Added params struct for GetCollections
[arvados.git] / services / datamanager / collection / collection.go
1 /* Deals with parsing Collection responses from API Server. */
2
3 package collection
4
5 import (
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/manifest"
8         "log"
9 )
10
11 type Collection struct {
12         BlockDigestToSize map[string]int
13         ReplicationLevel int
14         Uuid string
15         OwnerUuid string
16 }
17
18 type ReadCollections struct {
19         ReadAllCollections bool
20         UuidToCollection map[string]Collection
21 }
22
23 type GetCollectionsParams struct {
24         Client arvadosclient.ArvadosClient
25         Limit int
26         LogEveryNthCollectionProcessed int  // 0 means don't report any
27 }
28
29 // TODO(misha): Move this method somewhere more central
30 func SdkListResponseContainsAllAvailableItems(response map[string]interface{}) (containsAll bool, numContained int, numAvailable int) {
31         if value, ok := response["items"]; ok {
32                 items := value.([]interface{})
33                 {
34                         var itemsAvailable interface{}
35                         if itemsAvailable, ok = response["items_available"]; !ok {
36                                 // TODO(misha): Consider returning an error here (and above if
37                                 // we can't find items) so that callers can recover.
38                                 log.Fatalf("API server did not return the number of items available")
39                         }
40                         numContained = len(items)
41                         numAvailable = int(itemsAvailable.(float64))
42                         // If we never entered this block, allAvailable would be false by
43                         // default, which is what we want
44                         containsAll = numContained == numAvailable
45                 }
46         }
47         return
48 }
49
50 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
51         if &params.Client == nil {
52                 log.Fatalf("Received params.Client passed to GetCollections() should " +
53                         "contain a valid ArvadosClient, but instead it is nil.")
54         }
55
56         fieldsWanted := []string{"manifest_text",
57                 "owner_uuid",
58                 "uuid",
59                 // TODO(misha): Start using the redundancy field.
60                 "redundancy"}
61
62         sdkParams := arvadosclient.Dict{"select": fieldsWanted}
63         if params.Limit > 0 {
64                 sdkParams["limit"] = params.Limit
65         }
66
67         var collections map[string]interface{}
68         err := params.Client.List("collections", sdkParams, &collections)
69         if err != nil {
70                 log.Fatalf("error querying collections: %v", err)
71         }
72
73         {
74                 var numReceived, numAvailable int
75                 results.ReadAllCollections, numReceived, numAvailable =
76                         SdkListResponseContainsAllAvailableItems(collections)
77
78                 if (!results.ReadAllCollections) {
79                         log.Printf("ERROR: Did not receive all collections.")
80                 }
81                 log.Printf("Received %d of %d available collections.",
82                         numReceived,
83                         numAvailable)
84         }
85
86         if value, ok := collections["items"]; ok {
87                 items := value.([]interface{})
88
89                 results.UuidToCollection = make(map[string]Collection)
90                 for index, item := range items {
91                         if m := params.LogEveryNthCollectionProcessed; m >0 && (index % m) == 0 {
92                                 log.Printf("Processing collection #%d", index)
93                         }
94                         item_map := item.(map[string]interface{})
95                         collection := Collection{Uuid: item_map["uuid"].(string),
96                                 OwnerUuid: item_map["owner_uuid"].(string),
97                                 BlockDigestToSize: make(map[string]int)}
98                         manifest := manifest.Manifest{item_map["manifest_text"].(string)}
99                         blockChannel := manifest.BlockIterWithDuplicates()
100                         for block := range blockChannel {
101                                 if stored_size, stored := collection.BlockDigestToSize[block.Digest];
102                                 stored && stored_size != block.Size {
103                                         log.Fatalf(
104                                                 "Collection %s contains multiple sizes (%d and %d) for block %s",
105                                                 collection.Uuid,
106                                                 stored_size,
107                                                 block.Size,
108                                                 block.Digest)
109                                 }
110                                 collection.BlockDigestToSize[block.Digest] = block.Size
111                         }
112                         results.UuidToCollection[collection.Uuid] = collection
113                 }
114         }
115         return
116 }