Added params struct for GetCollections
[arvados.git] / services / datamanager / collection / collection.go
index 87f89841a0389ce060388cb899202264e178cbf8..9dc5a702026bafcc124ef77e3f1c3ae0df7b8f68 100644 (file)
@@ -4,68 +4,110 @@ package collection
 
 import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/services/datamanager/manifest"
+       "git.curoverse.com/arvados.git/sdk/go/manifest"
        "log"
 )
 
 // Collection is the per-collection summary that GetCollections builds from
 // each item of the collections list response.
 type Collection struct {
-       // TODO(misha): Consider whether we need BlockLocator.hints, and if
-       // not, perhaps we should use a custom struct here.
-       Blocks []manifest.BlockLocator
        // BlockDigestToSize maps each block digest referenced by the manifest
        // to the size recorded for that block. GetCollections treats two
        // different sizes for the same digest as a fatal error.
+       BlockDigestToSize map[string]int
        // NOTE(review): ReplicationLevel is not set where GetCollections
        // constructs a Collection in the code shown here; the "redundancy"
        // field is requested but still marked TODO.
        ReplicationLevel int
        Uuid string
-       ownerUuid string
        // OwnerUuid is the collection's owner_uuid; now exported (was ownerUuid).
+       OwnerUuid string
 }
 
-type readCollections struct {
 // ReadCollections is the result returned by GetCollections.
+type ReadCollections struct {
        // ReadAllCollections is true only if the list response contained every
        // collection the API server reported as available.
        ReadAllCollections bool
        // UuidToCollection maps each received collection's uuid to its summary.
        UuidToCollection map[string]Collection
 }
 
-func GetCollections(arv arvadosclient.ArvadosClient) (results readCollections) {
+type GetCollectionsParams struct {
+       Client arvadosclient.ArvadosClient
+       Limit int
+       LogEveryNthCollectionProcessed int  // 0 means don't report any
+}
+
// SdkListResponseContainsAllAvailableItems inspects a decoded Arvados SDK
// list response and reports whether it holds every item the API server says
// is available.
//
// Results:
//   - containsAll is true only when the response has an "items" list whose
//     length equals its "items_available" count.
//   - numContained is the length of the "items" list.
//   - numAvailable is the "items_available" count reported by the server.
//
// If the response has no "items" key at all, it returns (false, 0, 0).
//
// The process exits via log.Fatalf if "items" is present but is not a list,
// or if "items_available" is missing or is not a number.
//
// TODO(misha): Move this method somewhere more central, and consider
// returning an error instead of calling log.Fatalf so callers can recover.
func SdkListResponseContainsAllAvailableItems(response map[string]interface{}) (containsAll bool, numContained int, numAvailable int) {
	value, ok := response["items"]
	if !ok {
		// Without an items list we cannot claim to hold everything.
		return false, 0, 0
	}

	// Checked assertion: a malformed response yields a clear message
	// instead of a runtime panic.
	items, ok := value.([]interface{})
	if !ok {
		log.Fatalf("API server returned items of unexpected type %T, expected a list", value)
	}

	itemsAvailable, ok := response["items_available"]
	if !ok {
		log.Fatalf("API server did not return the number of items available")
	}

	numContained = len(items)
	switch n := itemsAvailable.(type) {
	case float64:
		// Generic JSON decoding into interface{} yields float64 for numbers.
		numAvailable = int(n)
	case int:
		numAvailable = n
	case int64:
		numAvailable = int(n)
	default:
		log.Fatalf("API server returned items_available of unexpected type %T, expected a number", itemsAvailable)
	}

	containsAll = numContained == numAvailable
	return containsAll, numContained, numAvailable
}
+
+func GetCollections(params GetCollectionsParams) (results ReadCollections) {
+       if &params.Client == nil {
+               log.Fatalf("Received params.Client passed to GetCollections() should " +
+                       "contain a valid ArvadosClient, but instead it is nil.")
+       }
+
        fieldsWanted := []string{"manifest_text",
                "owner_uuid",
                "uuid",
+               // TODO(misha): Start using the redundancy field.
                "redundancy"}
 
-       // TODO(misha): Set the limit param with a flag.
-       params := arvadosclient.Dict{"limit": 1, "select": fieldsWanted}
+       sdkParams := arvadosclient.Dict{"select": fieldsWanted}
+       if params.Limit > 0 {
+               sdkParams["limit"] = params.Limit
+       }
 
        var collections map[string]interface{}
-       err := arv.List("collections", params, &collections)
+       err := params.Client.List("collections", sdkParams, &collections)
        if err != nil {
                log.Fatalf("error querying collections: %v", err)
        }
 
-       results.ReadAllCollections = false
+       {
+               var numReceived, numAvailable int
+               results.ReadAllCollections, numReceived, numAvailable =
+                       SdkListResponseContainsAllAvailableItems(collections)
+
+               if (!results.ReadAllCollections) {
+                       log.Printf("ERROR: Did not receive all collections.")
+               }
+               log.Printf("Received %d of %d available collections.",
+                       numReceived,
+                       numAvailable)
+       }
 
        if value, ok := collections["items"]; ok {
                items := value.([]interface{})
-               
-               {
-                       itemsAvailable, ok := collections["items_available"]
-                       if !ok {
-                               log.Fatalf("API server did not return the number of items available")
-                       }
-                       numReceived := len(items)
-                       numAvailable := int(itemsAvailable.(float64))
-                       results.ReadAllCollections = numReceived == numAvailable
-
-                       if (!results.ReadAllCollections) {
-                               log.Printf("ERROR: Did not receive all collections. Received %d of %d available collections.",
-                                       numReceived, numAvailable)
-                       }
-               }
 
                results.UuidToCollection = make(map[string]Collection)
-               for _, item := range items {
+               for index, item := range items {
+                       if m := params.LogEveryNthCollectionProcessed; m >0 && (index % m) == 0 {
+                               log.Printf("Processing collection #%d", index)
+                       }
                        item_map := item.(map[string]interface{})
                        collection := Collection{Uuid: item_map["uuid"].(string),
-                               ownerUuid: item_map["owner_uuid"].(string)}
+                               OwnerUuid: item_map["owner_uuid"].(string),
+                               BlockDigestToSize: make(map[string]int)}
                        manifest := manifest.Manifest{item_map["manifest_text"].(string)}
-                       blockChannel := manifest.BlockIter()
+                       blockChannel := manifest.BlockIterWithDuplicates()
                        for block := range blockChannel {
-                               collection.Blocks = append(collection.Blocks, block)
+                               if stored_size, stored := collection.BlockDigestToSize[block.Digest];
+                               stored && stored_size != block.Size {
+                                       log.Fatalf(
+                                               "Collection %s contains multiple sizes (%d and %d) for block %s",
+                                               collection.Uuid,
+                                               stored_size,
+                                               block.Size,
+                                               block.Digest)
+                               }
+                               collection.BlockDigestToSize[block.Digest] = block.Size
                        }
                        results.UuidToCollection[collection.Uuid] = collection
                }