1 /* Deals with parsing Collection responses from the API server. */
7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "git.curoverse.com/arvados.git/sdk/go/manifest"
// Collection holds the data GetCollections extracts for one collection:
// its UUID, its owner's UUID, and the blocks referenced by its manifest.
12 type Collection struct {
// BlockDigestToSize maps each block digest found in the collection's
// manifest to that block's size as reported by the manifest parser.
13 BlockDigestToSize map[string]int
// ReadCollections is the result returned by GetCollections.
19 type ReadCollections struct {
// ReadAllCollections is true only when the list response contained exactly
// as many collections as the server reported available ("items_available").
20 ReadAllCollections bool
// UuidToCollection maps each received collection's UUID to its Collection.
21 UuidToCollection map[string]Collection
// GetCollectionsParams configures a GetCollections call.
24 type GetCollectionsParams struct {
// Client is the API client used to list collections. It is held by value,
// not as a pointer.
25 Client arvadosclient.ArvadosClient
// LogEveryNthCollectionProcessed controls progress logging. When it is
// positive, a "Processing collection #N" line is logged for each collection
// whose processing index is a multiple of this value.
27 LogEveryNthCollectionProcessed int // values <= 0 mean don't report any
30 // TODO(misha): Move this method somewhere more central
//
// SdkListResponseContainsAllAvailableItems inspects a decoded SDK list
// response. It reports whether the response's "items" list holds every item
// the server says is available.
//
// Results:
//   containsAll  - true when len(items) equals the "items_available" count.
//   numContained - the number of entries in "items".
//   numAvailable - the "items_available" count from the response.
//
// If "items" is missing, the results are never assigned, so they keep their
// zero values (false, 0, 0). If "items" is present but "items_available"
// is not, the process exits via log.Fatalf.
//
// The type assertions are unchecked: a non-list "items" value, or an
// "items_available" value that is not a float64, causes a panic.
31 func SdkListResponseContainsAllAvailableItems(response map[string]interface{}) (containsAll bool, numContained int, numAvailable int) {
32 if value, ok := response["items"]; ok {
33 items := value.([]interface{})
35 var itemsAvailable interface{}
36 if itemsAvailable, ok = response["items_available"]; !ok {
37 // TODO(misha): Consider returning an error here (and above if
38 // we can't find items) so that callers can recover.
39 log.Fatalf("API server did not return the number of items available")
41 numContained = len(items)
// NOTE(review): assumes the count was decoded from JSON into interface{},
// where numbers become float64. Any other dynamic type panics here.
42 numAvailable = int(itemsAvailable.(float64))
43 // If we never entered this block, containsAll would be false by
44 // default, which is what we want
45 containsAll = numContained == numAvailable
// IterateSdkListItems returns a receive-only channel that yields each
// element of response["items"] as a map[string]interface{}.
//
// If response has no "items" key, err is set to a non-nil error.
// Elements are type-asserted without a check, so a non-list "items" value
// or a non-map element causes a panic.
//
// NOTE(review): ch is unbuffered. The sends below must therefore run in a
// separate goroutine. Callers that range over the result also need ch to
// be closed after the last item. Confirm both are handled.
51 func IterateSdkListItems(response map[string]interface{}) (c <-chan map[string]interface{}, err error) {
52 if value, ok := response["items"]; ok {
53 ch := make(chan map[string]interface{})
55 items := value.([]interface{})
57 for _, item := range items {
58 ch <- item.(map[string]interface{})
63 err = errors.New("Could not find \"items\" field in response " +
64 "passed to IterateSdkListItems()")
// GetCollections lists collections through params.Client and indexes them
// by UUID.
//
// Request: it asks for the fields in fieldsWanted (including manifest_text)
// and passes params.Limit as the list "limit".
//
// Per collection: it records the UUID and owner UUID. It also builds a
// BlockDigestToSize map by walking every block in the collection's manifest.
//
// Incomplete responses: results.ReadAllCollections is false when the
// response did not include every available collection, and an "ERROR:" line
// is then logged.
//
// Fatal errors: failures to list collections, or to iterate the response,
// terminate the process via log.Fatalf.
//
// Unchecked assertions: "uuid", "owner_uuid" and "manifest_text" are
// asserted to be strings without a check, so any other type panics.
71 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
// NOTE(review): this condition looks garbled: "¶ms" appears to be an
// HTML-entity mangling of "&params". Even restored, &params.Client is the
// address of a field and can never be nil, so this check never fires.
// Client is not a pointer, so a meaningful validity check needs a
// different test.
72 if ¶ms.Client == nil {
73 log.Fatalf("Received params.Client passed to GetCollections() should " +
74 "contain a valid ArvadosClient, but instead it is nil.")
77 fieldsWanted := []string{"manifest_text",
80 // TODO(misha): Start using the redundancy field.
83 sdkParams := arvadosclient.Dict{"select": fieldsWanted}
85 sdkParams["limit"] = params.Limit
88 var collections map[string]interface{}
89 err := params.Client.List("collections", sdkParams, &collections)
91 log.Fatalf("error querying collections: %v", err)
95 var numReceived, numAvailable int
96 results.ReadAllCollections, numReceived, numAvailable =
97 SdkListResponseContainsAllAvailableItems(collections)
99 if (!results.ReadAllCollections) {
100 log.Printf("ERROR: Did not receive all collections.")
102 log.Printf("Received %d of %d available collections.",
107 if collectionChannel, err := IterateSdkListItems(collections); err != nil {
108 log.Fatalf("Error trying to iterate collections returned by SDK: %v", err)
111 results.UuidToCollection = make(map[string]Collection)
112 for item_map := range collectionChannel {
114 if m := params.LogEveryNthCollectionProcessed; m >0 && (index % m) == 0 {
115 log.Printf("Processing collection #%d", index)
117 collection := Collection{Uuid: item_map["uuid"].(string),
118 OwnerUuid: item_map["owner_uuid"].(string),
119 BlockDigestToSize: make(map[string]int)}
// NOTE(review): this local variable shadows the manifest package for the
// rest of the loop body; a different name would avoid confusion.
120 manifest := manifest.Manifest{item_map["manifest_text"].(string)}
// Presumably yields every block reference, including repeats. That lets
// each occurrence of a digest be checked for a consistent size below.
121 blockChannel := manifest.BlockIterWithDuplicates()
122 for block := range blockChannel {
// A digest already recorded with a different size means the manifest is
// inconsistent. NOTE(review): confirm whether the report below is fatal.
123 if stored_size, stored := collection.BlockDigestToSize[block.Digest];
124 stored && stored_size != block.Size {
126 "Collection %s contains multiple sizes (%d and %d) for block %s",
132 collection.BlockDigestToSize[block.Digest] = block.Size
134 results.UuidToCollection[collection.Uuid] = collection