import (
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/services/datamanager/manifest"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
"log"
)
OwnerUuid string
}
-type readCollections struct {
+type ReadCollections struct {
ReadAllCollections bool
UuidToCollection map[string]Collection
}
-func GetCollections(arv arvadosclient.ArvadosClient) (results readCollections) {
+type GetCollectionsParams struct {
+ Client arvadosclient.ArvadosClient
+ Limit int
+ LogEveryNthCollectionProcessed int // 0 means don't report any
+}
+
+// TODO(misha): Move this method somewhere more central
+func SdkListResponseContainsAllAvailableItems(response map[string]interface{}) (containsAll bool, numContained int, numAvailable int) {
+ if value, ok := response["items"]; ok {
+ items := value.([]interface{})
+ {
+ var itemsAvailable interface{}
+ if itemsAvailable, ok = response["items_available"]; !ok {
+ // TODO(misha): Consider returning an error here (and above if
+ // we can't find items) so that callers can recover.
+ log.Fatalf("API server did not return the number of items available")
+ }
+ numContained = len(items)
+ numAvailable = int(itemsAvailable.(float64))
+ // If we never entered this block, allAvailable would be false by
+ // default, which is what we want
+ containsAll = numContained == numAvailable
+ }
+ }
+ return
+}
+
+func GetCollections(params GetCollectionsParams) (results ReadCollections) {
+ if ¶ms.Client == nil {
+ log.Fatalf("Received params.Client passed to GetCollections() should " +
+ "contain a valid ArvadosClient, but instead it is nil.")
+ }
+
fieldsWanted := []string{"manifest_text",
"owner_uuid",
"uuid",
// TODO(misha): Start using the redundancy field.
"redundancy"}
- // TODO(misha): Set the limit param with a flag.
- params := arvadosclient.Dict{"limit": 1, "select": fieldsWanted}
+ sdkParams := arvadosclient.Dict{"select": fieldsWanted}
+ if params.Limit > 0 {
+ sdkParams["limit"] = params.Limit
+ }
var collections map[string]interface{}
- err := arv.List("collections", params, &collections)
+ err := params.Client.List("collections", sdkParams, &collections)
if err != nil {
log.Fatalf("error querying collections: %v", err)
}
- results.ReadAllCollections = false
+ {
+ var numReceived, numAvailable int
+ results.ReadAllCollections, numReceived, numAvailable =
+ SdkListResponseContainsAllAvailableItems(collections)
+
+ if (!results.ReadAllCollections) {
+ log.Printf("ERROR: Did not receive all collections.")
+ }
+ log.Printf("Received %d of %d available collections.",
+ numReceived,
+ numAvailable)
+ }
if value, ok := collections["items"]; ok {
items := value.([]interface{})
- {
- var itemsAvailable interface{}
- if itemsAvailable, ok = collections["items_available"]; !ok {
- log.Fatalf("API server did not return the number of items available")
- }
- numReceived := len(items)
- numAvailable := int(itemsAvailable.(float64))
- results.ReadAllCollections = numReceived == numAvailable
-
- if (!results.ReadAllCollections) {
- log.Printf(
- "ERROR: Did not receive all collections. " +
- "Received %d of %d available collections.",
- numReceived,
- numAvailable)
- }
- }
-
results.UuidToCollection = make(map[string]Collection)
- for _, item := range items {
+ for index, item := range items {
+ if m := params.LogEveryNthCollectionProcessed; m >0 && (index % m) == 0 {
+ log.Printf("Processing collection #%d", index)
+ }
item_map := item.(map[string]interface{})
collection := Collection{Uuid: item_map["uuid"].(string),
OwnerUuid: item_map["owner_uuid"].(string),