1 /* Deals with parsing Collection responses from API Server. */
6 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7 "git.curoverse.com/arvados.git/sdk/go/manifest"
// Collection holds the data this package keeps for a single collection.
// NOTE(review): only part of this struct is visible in this excerpt;
// GetCollections also sets Uuid and OwnerUuid fields on it — confirm against
// the full definition.
11 type Collection struct {
// BlockDigestToSize maps a block's digest to that block's size, as filled in
// by GetCollections from the blocks of the collection's manifest.
12 BlockDigestToSize map[string]int
// ReadCollections is the result type returned by GetCollections.
18 type ReadCollections struct {
// ReadAllCollections is the containsAll result of
// SdkListResponseContainsAllAvailableItems: true when the response held as
// many items as the server reported available.
19 ReadAllCollections bool
// UuidToCollection indexes each parsed Collection by its Uuid.
20 UuidToCollection map[string]Collection
// GetCollectionsParams bundles the inputs to GetCollections.
// NOTE(review): GetCollections also reads params.Limit, whose declaration is
// not visible in this excerpt — confirm against the full definition.
23 type GetCollectionsParams struct {
// Client is the API client GetCollections uses to list "collections".
24 Client arvadosclient.ArvadosClient
// LogEveryNthCollectionProcessed, when positive, makes GetCollections log a
// progress line whenever the item index is a multiple of this value.
26 LogEveryNthCollectionProcessed int // 0 means don't report any
// SdkListResponseContainsAllAvailableItems compares the number of entries in
// response["items"] with the count in response["items_available"].
//
// Results:
//   containsAll  - true when the two counts are equal.
//   numContained - length of the "items" list.
//   numAvailable - "items_available" converted from float64 to int.
//
// When "items" is present but "items_available" is missing, the process is
// terminated via log.Fatalf. Both type assertions use the single-result form,
// so a value of an unexpected dynamic type panics instead of being reported.
//
29 // TODO(misha): Move this method somewhere more central
30 func SdkListResponseContainsAllAvailableItems(response map[string]interface{}) (containsAll bool, numContained int, numAvailable int) {
// Everything below only runs when the response carries an "items" key.
31 if value, ok := response["items"]; ok {
// Unchecked assertion: "items" must be a []interface{}.
32 items := value.([]interface{})
// Declared separately so the lookup below can reuse the outer ok with "=".
34 var itemsAvailable interface{}
35 if itemsAvailable, ok = response["items_available"]; !ok {
36 // TODO(misha): Consider returning an error here (and above if
37 // we can't find items) so that callers can recover.
38 log.Fatalf("API server did not return the number of items available")
40 numContained = len(items)
// The count is asserted as float64 and then truncated to int.
// NOTE(review): presumably float64 because the response was decoded from
// JSON into interface{} values — confirm against the client.
41 numAvailable = int(itemsAvailable.(float64))
42 // If we never entered this block, allAvailable would be false by
43 // default, which is what we want
44 containsAll = numContained == numAvailable
// GetCollections lists collections through params.Client and converts the
// response into a ReadCollections value.
//
// For every entry of the response's "items" list it builds a Collection from
// the "uuid", "owner_uuid" and "manifest_text" fields, records the size of
// each block named by the manifest in BlockDigestToSize, and stores the
// Collection in results.UuidToCollection under its Uuid.
// results.ReadAllCollections is set from
// SdkListResponseContainsAllAvailableItems.
//
// The error paths visible here call log.Fatalf, which ends the process,
// instead of returning an error to the caller.
50 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
// NOTE(review): "¶ms" looks like a mis-encoded "&params" (the HTML entity
// "&para;" swallowing the start of the identifier). If so, this compares the
// address of a struct field with nil, which is never equal in Go, so the
// guard could not fire — confirm against the original source.
51 if ¶ms.Client == nil {
52 log.Fatalf("Received params.Client passed to GetCollections() should " +
53 "contain a valid ArvadosClient, but instead it is nil.")
// Fields requested from the API server through the "select" parameter; the
// rest of this list is not visible in this excerpt.
56 fieldsWanted := []string{"manifest_text",
59 // TODO(misha): Start using the redundancy field.
62 sdkParams := arvadosclient.Dict{"select": fieldsWanted}
// NOTE(review): any condition guarding this assignment is not visible here.
64 sdkParams["limit"] = params.Limit
// The raw list response is decoded into a generic map and inspected by key.
67 var collections map[string]interface{}
68 err := params.Client.List("collections", sdkParams, &collections)
70 log.Fatalf("error querying collections: %v", err)
74 var numReceived, numAvailable int
75 results.ReadAllCollections, numReceived, numAvailable =
76 SdkListResponseContainsAllAvailableItems(collections)
// A partial listing is only logged; processing continues with what arrived.
78 if (!results.ReadAllCollections) {
79 log.Printf("ERROR: Did not receive all collections.")
81 log.Printf("Received %d of %d available collections.",
86 if value, ok := collections["items"]; ok {
// Unchecked assertion: "items" must be a []interface{}.
87 items := value.([]interface{})
89 results.UuidToCollection = make(map[string]Collection)
90 for index, item := range items {
// Progress logging; m == 0 disables it, and index 0 always matches when m > 0.
91 if m := params.LogEveryNthCollectionProcessed; m >0 && (index % m) == 0 {
92 log.Printf("Processing collection #%d", index)
// Each item is asserted to be a map, and its "uuid", "owner_uuid" and
// "manifest_text" values are asserted to be strings, all unchecked.
94 item_map := item.(map[string]interface{})
95 collection := Collection{Uuid: item_map["uuid"].(string),
96 OwnerUuid: item_map["owner_uuid"].(string),
97 BlockDigestToSize: make(map[string]int)}
// This local variable shadows the imported manifest package for the rest of
// the enclosing scope.
98 manifest := manifest.Manifest{item_map["manifest_text"].(string)}
// NOTE(review): judging by its name, the iterator yields repeated blocks too,
// which is what makes the size-conflict check below meaningful — confirm.
99 blockChannel := manifest.BlockIterWithDuplicates()
100 for block := range blockChannel {
// True when this digest was already recorded with a different size.
101 if stored_size, stored := collection.BlockDigestToSize[block.Digest];
102 stored && stored_size != block.Size {
104 "Collection %s contains multiple sizes (%d and %d) for block %s",
// Record (or overwrite) the size for this digest.
110 collection.BlockDigestToSize[block.Digest] = block.Size
// Collection is stored by value, keyed by its Uuid.
112 results.UuidToCollection[collection.Uuid] = collection