Changes GetCollections loop to more reliably fetch all collections
author Joshua C. Randall <jcrandall@alum.mit.edu>
Thu, 18 Feb 2016 14:36:53 +0000 (14:36 +0000)
committer Joshua C. Randall <jcrandall@alum.mit.edu>
Thu, 18 Feb 2016 14:36:53 +0000 (14:36 +0000)
Modifies the loop termination condition in GetCollections so that
it continues until there are no more items available according to
the API server (returned in collections.ItemsAvailable).

Modifies the query code so that it uses an offset to page through
results in case an entire batch has equal modified_at timestamps.

services/datamanager/collection/collection.go

index 1229f2917e21f9b17d897db8fe85e7adb4e43429..1d41a7087a3fc1e73d0668c6c01533624c045d9a 100644 (file)
@@ -131,7 +131,8 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e
        sdkParams := arvadosclient.Dict{
                "select":  fieldsWanted,
                "order":   []string{"modified_at ASC"},
-               "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
+               "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}},
+               "offset": 0}
 
        if params.BatchSize > 0 {
                sdkParams["limit"] = params.BatchSize
@@ -176,9 +177,10 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e
 
        // These values are just for getting the loop to run the first time,
        // afterwards they'll be set to real values.
-       previousTotalCollections := -1
-       totalCollections := 0
-       for totalCollections > previousTotalCollections {
+       remainingCollections := 1
+       var totalCollections int
+       var previousTotalCollections int
+       for remainingCollections > 0 {
                // We're still finding new collections
 
                // Write the heap profile for examining memory usage
@@ -194,6 +196,12 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e
                        return
                }
 
+               // Update count of remaining collections
+               remainingCollections = collections.ItemsAvailable - params.BatchSize - sdkParams["offset"].(int)
+               if remainingCollections < 0 {
+                       remainingCollections = 0
+               }
+
                // Process collection and update our date filter.
                latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger,
                        collections.Items,
@@ -202,16 +210,23 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e
                if err != nil {
                        return results, err
                }
-               sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
+               if sdkParams["filters"].([][]string)[0][2] != latestModificationDate.Format(time.RFC3339) {
+                       sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
+                       sdkParams["offset"] = 0
+               } else {
+                       sdkParams["offset"] = sdkParams["offset"].(int) + params.BatchSize
+               }
 
                // update counts
                previousTotalCollections = totalCollections
                totalCollections = len(results.UUIDToCollection)
 
                log.Printf("%d collections read, %d new in last batch, "+
+                       "%d remaining, "+
                        "%s latest modified date, %.0f %d %d avg,max,total manifest size",
                        totalCollections,
                        totalCollections-previousTotalCollections,
+                       remainingCollections,
                        sdkParams["filters"].([][]string)[0][2],
                        float32(totalManifestSize)/float32(totalCollections),
                        maxManifestSize, totalManifestSize)