Created helper method to deal with iterating through list returned by SDK.
[arvados.git] / services / datamanager / collection / collection.go
// Package collection deals with parsing Collection responses from the API server.
2
3 package collection
4
5 import (
6         "errors"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/manifest"
9         "log"
10 )
11
// Collection holds the parts of a collection record that the data manager
// tracks.
type Collection struct {
	// BlockDigestToSize maps each block digest referenced by the
	// collection's manifest to that block's size.
	BlockDigestToSize map[string]int
	// ReplicationLevel is not populated by GetCollections in this file yet
	// (see the TODO about the "redundancy" field there).
	ReplicationLevel int
	// Uuid and OwnerUuid are copied from the collection record's "uuid" and
	// "owner_uuid" fields.
	Uuid, OwnerUuid string
}
18
19 type ReadCollections struct {
20         ReadAllCollections bool
21         UuidToCollection map[string]Collection
22 }
23
24 type GetCollectionsParams struct {
25         Client arvadosclient.ArvadosClient
26         Limit int
27         LogEveryNthCollectionProcessed int  // 0 means don't report any
28 }
29
// SdkListResponseContainsAllAvailableItems reports whether an SDK list
// response contains every item the API server says is available.
//
// numContained is the length of response["items"] (which must be a
// []interface{}), and numAvailable is response["items_available"] (which must
// be a float64, the type encoding/json produces for JSON numbers).
// containsAll is true when the two are equal.
//
// If response has no "items" field, all results are zero values, so
// containsAll is false. If "items" is present but "items_available" is
// missing, or either field has an unexpected type, the process exits via
// log.Fatalf.
//
// TODO(misha): Move this method somewhere more central.
func SdkListResponseContainsAllAvailableItems(response map[string]interface{}) (containsAll bool, numContained int, numAvailable int) {
	value, found := response["items"]
	if !found {
		// Without an "items" field we cannot claim to have everything, so
		// the zero values (containsAll == false) are what we want.
		return false, 0, 0
	}
	items, ok := value.([]interface{})
	if !ok {
		log.Fatalf("API server returned an \"items\" field of unexpected type %T", value)
	}

	itemsAvailable, found := response["items_available"]
	if !found {
		// TODO(misha): Consider returning an error here (and above if
		// we can't find items) so that callers can recover.
		log.Fatalf("API server did not return the number of items available")
	}
	available, ok := itemsAvailable.(float64)
	if !ok {
		log.Fatalf("API server returned an \"items_available\" field of unexpected type %T", itemsAvailable)
	}

	numContained = len(items)
	numAvailable = int(available)
	return numContained == numAvailable, numContained, numAvailable
}
50
// IterateSdkListItems returns a channel that yields, in order, each element of
// the "items" field of an SDK list response as a map[string]interface{}. The
// channel is closed after the last element.
//
// An error is returned, together with a nil channel, if response has no
// "items" field, if "items" is not a []interface{}, or if any element is not
// a map[string]interface{}.
//
// Every element is validated and queued before the function returns. The
// channel is buffered to hold all items, so no goroutine is started and
// nothing is left blocked if the caller stops reading early.
func IterateSdkListItems(response map[string]interface{}) (c <-chan map[string]interface{}, err error) {
	value, found := response["items"]
	if !found {
		return nil, errors.New("Could not find \"items\" field in response " +
			"passed to IterateSdkListItems()")
	}
	items, ok := value.([]interface{})
	if !ok {
		return nil, errors.New("\"items\" field in response passed to " +
			"IterateSdkListItems() is not a list")
	}

	ch := make(chan map[string]interface{}, len(items))
	for _, item := range items {
		itemMap, ok := item.(map[string]interface{})
		if !ok {
			// Report this to the caller rather than panicking in a
			// goroutine, where nobody could recover from it.
			return nil, errors.New("\"items\" field in response passed to " +
				"IterateSdkListItems() contains an element that is not an object")
		}
		ch <- itemMap
	}
	close(ch)
	return ch, nil
}
68
69
70
71 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
72         if &params.Client == nil {
73                 log.Fatalf("Received params.Client passed to GetCollections() should " +
74                         "contain a valid ArvadosClient, but instead it is nil.")
75         }
76
77         fieldsWanted := []string{"manifest_text",
78                 "owner_uuid",
79                 "uuid",
80                 // TODO(misha): Start using the redundancy field.
81                 "redundancy"}
82
83         sdkParams := arvadosclient.Dict{"select": fieldsWanted}
84         if params.Limit > 0 {
85                 sdkParams["limit"] = params.Limit
86         }
87
88         var collections map[string]interface{}
89         err := params.Client.List("collections", sdkParams, &collections)
90         if err != nil {
91                 log.Fatalf("error querying collections: %v", err)
92         }
93
94         {
95                 var numReceived, numAvailable int
96                 results.ReadAllCollections, numReceived, numAvailable =
97                         SdkListResponseContainsAllAvailableItems(collections)
98
99                 if (!results.ReadAllCollections) {
100                         log.Printf("ERROR: Did not receive all collections.")
101                 }
102                 log.Printf("Received %d of %d available collections.",
103                         numReceived,
104                         numAvailable)
105         }
106
107         if collectionChannel, err := IterateSdkListItems(collections); err != nil {
108                 log.Fatalf("Error trying to iterate collections returned by SDK: %v", err)
109         } else {
110                 index := 0
111                 results.UuidToCollection = make(map[string]Collection)
112                 for item_map := range collectionChannel {
113                         index += 1
114                         if m := params.LogEveryNthCollectionProcessed; m >0 && (index % m) == 0 {
115                                 log.Printf("Processing collection #%d", index)
116                         }
117                         collection := Collection{Uuid: item_map["uuid"].(string),
118                                 OwnerUuid: item_map["owner_uuid"].(string),
119                                 BlockDigestToSize: make(map[string]int)}
120                         manifest := manifest.Manifest{item_map["manifest_text"].(string)}
121                         blockChannel := manifest.BlockIterWithDuplicates()
122                         for block := range blockChannel {
123                                 if stored_size, stored := collection.BlockDigestToSize[block.Digest];
124                                 stored && stored_size != block.Size {
125                                         log.Fatalf(
126                                                 "Collection %s contains multiple sizes (%d and %d) for block %s",
127                                                 collection.Uuid,
128                                                 stored_size,
129                                                 block.Size,
130                                                 block.Digest)
131                                 }
132                                 collection.BlockDigestToSize[block.Digest] = block.Size
133                         }
134                         results.UuidToCollection[collection.Uuid] = collection
135                 }
136         }
137         return
138 }