Merge branch '8484-sanity-check-collection-count' closes #8484
author Tom Clegg <tom@curoverse.com>
Fri, 19 Feb 2016 22:44:52 +0000 (17:44 -0500)
committer Tom Clegg <tom@curoverse.com>
Fri, 19 Feb 2016 22:44:52 +0000 (17:44 -0500)
1  2 
services/datamanager/collection/collection.go

index f0a0f3f1aab62f1fa6367cff900eac261f22157b,7cc3ff720438254351c7cc9efe3e6f9c3b407b63..9b7eb7543a4ebed086aba2d409f44fcc789ef222
@@@ -130,9 -130,8 +130,9 @@@ func GetCollections(params GetCollectio
  
        sdkParams := arvadosclient.Dict{
                "select":  fieldsWanted,
 -              "order":   []string{"modified_at ASC"},
 -              "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
 +              "order":   []string{"modified_at ASC", "uuid ASC"},
 +              "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}},
 +              "offset": 0}
  
        if params.BatchSize > 0 {
                sdkParams["limit"] = params.BatchSize
  
        // These values are just for getting the loop to run the first time,
        // afterwards they'll be set to real values.
 -      previousTotalCollections := -1
 -      totalCollections := 0
 -      for totalCollections > previousTotalCollections {
 +      remainingCollections := 1
 +      var totalCollections int
 +      var previousTotalCollections int
 +      for remainingCollections > 0 {
                // We're still finding new collections
  
                // Write the heap profile for examining memory usage
                if err != nil {
                        return
                }
 +              batchCollections := len(collections.Items)
 +
 +              // We must always have at least one collection in the batch
 +              if batchCollections < 1 {
 +                      err = fmt.Errorf("API query returned no collections for %+v", sdkParams)
 +                      return
 +              }
 +
 +              // Update count of remaining collections
 +              remainingCollections = collections.ItemsAvailable - sdkParams["offset"].(int) - batchCollections
  
                // Process collection and update our date filter.
                latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger,
                if err != nil {
                        return results, err
                }
 -              sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
 +              if sdkParams["filters"].([][]string)[0][2] != latestModificationDate.Format(time.RFC3339) {
 +                      sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
 +                      sdkParams["offset"] = 0
 +              } else {
 +                      sdkParams["offset"] = sdkParams["offset"].(int) + batchCollections
 +              }
  
                // update counts
                previousTotalCollections = totalCollections
                totalCollections = len(results.UUIDToCollection)
  
 -              log.Printf("%d collections read, %d new in last batch, "+
 +              log.Printf("%d collections read, %d (%d new) in last batch, "+
 +                      "%d remaining, "+
                        "%s latest modified date, %.0f %d %d avg,max,total manifest size",
                        totalCollections,
 +                      batchCollections,
                        totalCollections-previousTotalCollections,
 +                      remainingCollections,
                        sdkParams["filters"].([][]string)[0][2],
                        float32(totalManifestSize)/float32(totalCollections),
                        maxManifestSize, totalManifestSize)
                }
        }
  
+       // Make one final API request to verify that we have processed all collections available up to the latest modification date
+       var collections SdkCollectionList
+       sdkParams["filters"].([][]string)[0][1] = "<="
+       sdkParams["limit"] = 0
+       err = params.Client.List("collections", sdkParams, &collections)
+       if err != nil {
+               return
+       }
+       finalNumberOfCollectionsAvailable, err :=
+               util.NumberItemsAvailable(params.Client, "collections")
+       if err != nil {
+               return
+       }
+       if totalCollections < finalNumberOfCollectionsAvailable {
+               err = fmt.Errorf("API server indicates a total of %d collections "+
+                               "available up to %v, but we only retrieved %d. "+
+                               "Refusing to continue as this could indicate an "+
+                               "otherwise undetected failure.",
+                               finalNumberOfCollectionsAvailable, 
+                               sdkParams["filters"].([][]string)[0][2],
+                               totalCollections)
+               return
+       }
        // Write the heap profile for examining memory usage
        err = WriteHeapProfile()