Moved Keep code to its own package.
[arvados.git] / services / datamanager / collection / collection.go
1 /* Deals with parsing Collection responses from API Server. */
2
3 package collection
4
5 import (
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/manifest"
8         "git.curoverse.com/arvados.git/sdk/go/util"
9         "log"
10 )
11
12 type Collection struct {
13         BlockDigestToSize map[string]int
14         ReplicationLevel int
15         Uuid string
16         OwnerUuid string
17 }
18
19 type ReadCollections struct {
20         ReadAllCollections bool
21         UuidToCollection map[string]Collection
22 }
23
24 type GetCollectionsParams struct {
25         Client arvadosclient.ArvadosClient
26         Limit int
27         LogEveryNthCollectionProcessed int  // 0 means don't report any
28 }
29
30
31
32
33 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
34         if &params.Client == nil {
35                 log.Fatalf("params.Client passed to GetCollections() should " +
36                         "contain a valid ArvadosClient, but instead it is nil.")
37         }
38
39         fieldsWanted := []string{"manifest_text",
40                 "owner_uuid",
41                 "uuid",
42                 // TODO(misha): Start using the redundancy field.
43                 "redundancy"}
44
45         sdkParams := arvadosclient.Dict{"select": fieldsWanted}
46         if params.Limit > 0 {
47                 sdkParams["limit"] = params.Limit
48         }
49
50         var collections map[string]interface{}
51         err := params.Client.List("collections", sdkParams, &collections)
52         if err != nil {
53                 log.Fatalf("error querying collections: %v", err)
54         }
55
56         {
57                 var numReceived, numAvailable int
58                 results.ReadAllCollections, numReceived, numAvailable =
59                         util.SdkListResponseContainsAllAvailableItems(collections)
60
61                 if (!results.ReadAllCollections) {
62                         log.Printf("ERROR: Did not receive all collections.")
63                 }
64                 log.Printf("Received %d of %d available collections.",
65                         numReceived,
66                         numAvailable)
67         }
68
69         if collectionChannel, err := util.IterateSdkListItems(collections); err != nil {
70                 log.Fatalf("Error trying to iterate collections returned by SDK: %v", err)
71         } else {
72                 index := 0
73                 results.UuidToCollection = make(map[string]Collection)
74                 for item_map := range collectionChannel {
75                         index += 1
76                         if m := params.LogEveryNthCollectionProcessed; m >0 && (index % m) == 0 {
77                                 log.Printf("Processing collection #%d", index)
78                         }
79                         collection := Collection{Uuid: item_map["uuid"].(string),
80                                 OwnerUuid: item_map["owner_uuid"].(string),
81                                 BlockDigestToSize: make(map[string]int)}
82                         manifest := manifest.Manifest{item_map["manifest_text"].(string)}
83                         blockChannel := manifest.BlockIterWithDuplicates()
84                         for block := range blockChannel {
85                                 if stored_size, stored := collection.BlockDigestToSize[block.Digest];
86                                 stored && stored_size != block.Size {
87                                         log.Fatalf(
88                                                 "Collection %s contains multiple sizes (%d and %d) for block %s",
89                                                 collection.Uuid,
90                                                 stored_size,
91                                                 block.Size,
92                                                 block.Digest)
93                                 }
94                                 collection.BlockDigestToSize[block.Digest] = block.Size
95                         }
96                         results.UuidToCollection[collection.Uuid] = collection
97                 }
98         }
99         return
100 }