gofmt. refs #8754
[arvados.git] / services / datamanager / collection / collection.go
index 42e0cc429054fbb8da0831f117881682f4be225f..c392ded35dc70968a8a844de0181cfc41457b120 100644 (file)
@@ -130,8 +130,9 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e
 
        sdkParams := arvadosclient.Dict{
                "select":  fieldsWanted,
-               "order":   []string{"modified_at ASC"},
-               "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
+               "order":   []string{"modified_at ASC", "uuid ASC"},
+               "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}},
+               "offset":  0}
 
        if params.BatchSize > 0 {
                sdkParams["limit"] = params.BatchSize
@@ -176,9 +177,10 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e
 
        // These values are just for getting the loop to run the first time,
        // afterwards they'll be set to real values.
-       previousTotalCollections := -1
-       totalCollections := 0
-       for totalCollections > previousTotalCollections {
+       remainingCollections := 1
+       var totalCollections int
+       var previousTotalCollections int
+       for remainingCollections > 0 {
                // We're still finding new collections
 
                // Write the heap profile for examining memory usage
@@ -193,6 +195,16 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e
                if err != nil {
                        return
                }
+               batchCollections := len(collections.Items)
+
+               // We must always have at least one collection in the batch
+               if batchCollections < 1 {
+                       err = fmt.Errorf("API query returned no collections for %+v", sdkParams)
+                       return
+               }
+
+               // Update count of remaining collections
+               remainingCollections = collections.ItemsAvailable - sdkParams["offset"].(int) - batchCollections
 
                // Process collection and update our date filter.
                latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger,
@@ -202,16 +214,24 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e
                if err != nil {
                        return results, err
                }
-               sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
+               if sdkParams["filters"].([][]string)[0][2] != latestModificationDate.Format(time.RFC3339) {
+                       sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
+                       sdkParams["offset"] = 0
+               } else {
+                       sdkParams["offset"] = sdkParams["offset"].(int) + batchCollections
+               }
 
                // update counts
                previousTotalCollections = totalCollections
                totalCollections = len(results.UUIDToCollection)
 
-               log.Printf("%d collections read, %d new in last batch, "+
+               log.Printf("%d collections read, %d (%d new) in last batch, "+
+                       "%d remaining, "+
                        "%s latest modified date, %.0f %d %d avg,max,total manifest size",
                        totalCollections,
+                       batchCollections,
                        totalCollections-previousTotalCollections,
+                       remainingCollections,
                        sdkParams["filters"].([][]string)[0][2],
                        float32(totalManifestSize)/float32(totalCollections),
                        maxManifestSize, totalManifestSize)
@@ -228,6 +248,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e
        }
 
        // Make one final API request to verify that we have processed all collections available up to the latest modification date
+       var collections SdkCollectionList
        sdkParams["filters"].([][]string)[0][1] = "<="
        sdkParams["limit"] = 0
        err = params.Client.List("collections", sdkParams, &collections)
@@ -241,12 +262,12 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e
        }
        if totalCollections < finalNumberOfCollectionsAvailable {
                err = fmt.Errorf("API server indicates a total of %d collections "+
-                               "available up to %v, but we only retrieved %d. "+
-                               "Refusing to continue as this could indicate an "+
-                               "otherwise undetected failure.",
-                               finalNumberOfCollectionsAvailable, 
-                               sdkParams["filters"].([][]string)[0][2],
-                               totalCollections)
+                       "available up to %v, but we only retrieved %d. "+
+                       "Refusing to continue as this could indicate an "+
+                       "otherwise undetected failure.",
+                       finalNumberOfCollectionsAvailable,
+                       sdkParams["filters"].([][]string)[0][2],
+                       totalCollections)
                return
        }