Merge branch '8484-sanity-check-collection-count' closes #8484
[arvados.git] / services / datamanager / collection / collection.go
index 1d41a7087a3fc1e73d0668c6c01533624c045d9a..9b7eb7543a4ebed086aba2d409f44fcc789ef222 100644 (file)
@@ -130,7 +130,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e
 
        sdkParams := arvadosclient.Dict{
                "select":  fieldsWanted,
-               "order":   []string{"modified_at ASC"},
+               "order":   []string{"modified_at ASC", "uuid ASC"},
                "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}},
                "offset": 0}
 
@@ -195,13 +195,17 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e
                if err != nil {
                        return
                }
+               batchCollections := len(collections.Items)
 
-               // Update count of remaining collections
-               remainingCollections = collections.ItemsAvailable - params.BatchSize - sdkParams["offset"].(int)
-               if remainingCollections < 0 {
-                       remainingCollections = 0
+               // We must always have at least one collection in the batch
+               if batchCollections < 1 {
+                       err = fmt.Errorf("API query returned no collections for %+v", sdkParams)
+                       return
                }
 
+               // Update count of remaining collections
+               remainingCollections = collections.ItemsAvailable - sdkParams["offset"].(int) - batchCollections
+
                // Process collection and update our date filter.
                latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger,
                        collections.Items,
@@ -214,17 +218,18 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e
                        sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
                        sdkParams["offset"] = 0
                } else {
-                       sdkParams["offset"] = sdkParams["offset"].(int) + params.BatchSize
+                       sdkParams["offset"] = sdkParams["offset"].(int) + batchCollections
                }
 
                // update counts
                previousTotalCollections = totalCollections
                totalCollections = len(results.UUIDToCollection)
 
-               log.Printf("%d collections read, %d new in last batch, "+
+               log.Printf("%d collections read, %d (%d new) in last batch, "+
                        "%d remaining, "+
                        "%s latest modified date, %.0f %d %d avg,max,total manifest size",
                        totalCollections,
+                       batchCollections,
                        totalCollections-previousTotalCollections,
                        remainingCollections,
                        sdkParams["filters"].([][]string)[0][2],
@@ -242,6 +247,30 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e
                }
        }
 
+       // Make one final API request to verify that we have processed all collections available up to the latest modification date
+       var collections SdkCollectionList
+       sdkParams["filters"].([][]string)[0][1] = "<="
+       sdkParams["limit"] = 0
+       err = params.Client.List("collections", sdkParams, &collections)
+       if err != nil {
+               return
+       }
+       finalNumberOfCollectionsAvailable, err :=
+               util.NumberItemsAvailable(params.Client, "collections")
+       if err != nil {
+               return
+       }
+       if totalCollections < finalNumberOfCollectionsAvailable {
+               err = fmt.Errorf("API server indicates a total of %d collections "+
+                               "available up to %v, but we only retrieved %d. "+
+                               "Refusing to continue as this could indicate an "+
+                               "otherwise undetected failure.",
+                               finalNumberOfCollectionsAvailable, 
+                               sdkParams["filters"].([][]string)[0][2],
+                               totalCollections)
+               return
+       }
+
        // Write the heap profile for examining memory usage
        err = WriteHeapProfile()