10383: Added 2 tests to check that no resuming takes place when using either --no...
[arvados.git] / services / datamanager / collection / collection.go
index 7102ec3c054bb24df5e3e0979784b9c08b3949ed..05e7a5f2313d6b5a9138a90d7b1f9f7bfd2ad88c 100644 (file)
@@ -18,9 +18,6 @@ import (
 
 var (
        HeapProfileFilename string
-       // globals for debugging
-       totalManifestSize uint64
-       maxManifestSize   uint64
 )
 
 // Collection representation
@@ -41,23 +38,22 @@ type ReadCollections struct {
        CollectionUUIDToIndex     map[string]int
        CollectionIndexToUUID     []string
        BlockToCollectionIndices  map[blockdigest.DigestWithSize][]int
-       Err                       error
 }
 
 // GetCollectionsParams params
 type GetCollectionsParams struct {
-       Client    arvadosclient.ArvadosClient
+       Client    *arvadosclient.ArvadosClient
        Logger    *logger.Logger
        BatchSize int
 }
 
 // SdkCollectionInfo holds collection info from api
 type SdkCollectionInfo struct {
-       UUID         string    `json:"uuid"`
-       OwnerUUID    string    `json:"owner_uuid"`
-       Redundancy   int       `json:"redundancy"`
-       ModifiedAt   time.Time `json:"modified_at"`
-       ManifestText string    `json:"manifest_text"`
+       UUID               string    `json:"uuid"`
+       OwnerUUID          string    `json:"owner_uuid"`
+       ReplicationDesired int       `json:"replication_desired"`
+       ModifiedAt         time.Time `json:"modified_at"`
+       ManifestText       string    `json:"manifest_text"`
 }
 
 // SdkCollectionList lists collections from api
@@ -96,10 +92,9 @@ func WriteHeapProfile() error {
 }
 
 // GetCollectionsAndSummarize gets collections from api and summarizes
-func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
-       results, err := GetCollections(params)
+func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections, err error) {
+       results, err = GetCollections(params)
        if err != nil {
-               results.Err = err
                return
        }
 
@@ -130,13 +125,14 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e
        fieldsWanted := []string{"manifest_text",
                "owner_uuid",
                "uuid",
-               "redundancy",
+               "replication_desired",
                "modified_at"}
 
        sdkParams := arvadosclient.Dict{
                "select":  fieldsWanted,
-               "order":   []string{"modified_at ASC"},
-               "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
+               "order":   []string{"modified_at ASC", "uuid ASC"},
+               "filters": [][]string{{"modified_at", ">=", "1900-01-01T00:00:00Z"}},
+               "offset":  0}
 
        if params.BatchSize > 0 {
                sdkParams["limit"] = params.BatchSize
@@ -181,9 +177,10 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e
 
        // These values are just for getting the loop to run the first time,
        // afterwards they'll be set to real values.
-       previousTotalCollections := -1
-       totalCollections := 0
-       for totalCollections > previousTotalCollections {
+       remainingCollections := 1
+       var totalCollections int
+       var previousTotalCollections int
+       for remainingCollections > 0 {
                // We're still finding new collections
 
                // Write the heap profile for examining memory usage
@@ -198,26 +195,43 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e
                if err != nil {
                        return
                }
+               batchCollections := len(collections.Items)
+
+               // We must always have at least one collection in the batch
+               if batchCollections < 1 {
+                       err = fmt.Errorf("API query returned no collections for %+v", sdkParams)
+                       return
+               }
+
+               // Update count of remaining collections
+               remainingCollections = collections.ItemsAvailable - sdkParams["offset"].(int) - batchCollections
 
                // Process collection and update our date filter.
-               var latestModificationDate time.Time
-               latestModificationDate, err = ProcessCollections(params.Logger,
+               latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger,
                        collections.Items,
                        defaultReplicationLevel,
                        results.UUIDToCollection)
                if err != nil {
-                       return
+                       return results, err
+               }
+               if sdkParams["filters"].([][]string)[0][2] != latestModificationDate.Format(time.RFC3339) {
+                       sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
+                       sdkParams["offset"] = 0
+               } else {
+                       sdkParams["offset"] = sdkParams["offset"].(int) + batchCollections
                }
-               sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
 
                // update counts
                previousTotalCollections = totalCollections
                totalCollections = len(results.UUIDToCollection)
 
-               log.Printf("%d collections read, %d new in last batch, "+
+               log.Printf("%d collections read, %d (%d new) in last batch, "+
+                       "%d remaining, "+
                        "%s latest modified date, %.0f %d %d avg,max,total manifest size",
                        totalCollections,
+                       batchCollections,
                        totalCollections-previousTotalCollections,
+                       remainingCollections,
                        sdkParams["filters"].([][]string)[0][2],
                        float32(totalManifestSize)/float32(totalCollections),
                        maxManifestSize, totalManifestSize)
@@ -233,6 +247,30 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e
                }
        }
 
+       // Make one final API request to verify that we have processed all collections available up to the latest modification date
+       var collections SdkCollectionList
+       sdkParams["filters"].([][]string)[0][1] = "<="
+       sdkParams["limit"] = 0
+       err = params.Client.List("collections", sdkParams, &collections)
+       if err != nil {
+               return
+       }
+       finalNumberOfCollectionsAvailable, err :=
+               util.NumberItemsAvailable(params.Client, "collections")
+       if err != nil {
+               return
+       }
+       if totalCollections < finalNumberOfCollectionsAvailable {
+               err = fmt.Errorf("API server indicates a total of %d collections "+
+                       "available up to %v, but we only retrieved %d. "+
+                       "Refusing to continue as this could indicate an "+
+                       "otherwise undetected failure.",
+                       finalNumberOfCollectionsAvailable,
+                       sdkParams["filters"].([][]string)[0][2],
+                       totalCollections)
+               return
+       }
+
        // Write the heap profile for examining memory usage
        err = WriteHeapProfile()
 
@@ -250,20 +288,26 @@ func StrCopy(s string) string {
 func ProcessCollections(arvLogger *logger.Logger,
        receivedCollections []SdkCollectionInfo,
        defaultReplicationLevel int,
-       UUIDToCollection map[string]Collection) (latestModificationDate time.Time, err error) {
+       UUIDToCollection map[string]Collection,
+) (
+       latestModificationDate time.Time,
+       maxManifestSize, totalManifestSize uint64,
+       err error,
+) {
        for _, sdkCollection := range receivedCollections {
                collection := Collection{UUID: StrCopy(sdkCollection.UUID),
                        OwnerUUID:         StrCopy(sdkCollection.OwnerUUID),
-                       ReplicationLevel:  sdkCollection.Redundancy,
+                       ReplicationLevel:  sdkCollection.ReplicationDesired,
                        BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
 
                if sdkCollection.ModifiedAt.IsZero() {
-                       return latestModificationDate, fmt.Errorf(
+                       err = fmt.Errorf(
                                "Arvados SDK collection returned with unexpected zero "+
                                        "modification date. This probably means that either we failed to "+
                                        "parse the modification date or the API server has changed how "+
                                        "it returns modification dates: %+v",
                                collection)
+                       return
                }
 
                if sdkCollection.ModifiedAt.After(latestModificationDate) {
@@ -274,7 +318,7 @@ func ProcessCollections(arvLogger *logger.Logger,
                        collection.ReplicationLevel = defaultReplicationLevel
                }
 
-               manifest := manifest.Manifest{sdkCollection.ManifestText}
+               manifest := manifest.Manifest{Text: sdkCollection.ManifestText}
                manifestSize := uint64(len(sdkCollection.ManifestText))
 
                if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
@@ -287,16 +331,20 @@ func ProcessCollections(arvLogger *logger.Logger,
                blockChannel := manifest.BlockIterWithDuplicates()
                for block := range blockChannel {
                        if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
-                               err = fmt.Errorf(
+                               log.Printf(
                                        "Collection %s contains multiple sizes (%d and %d) for block %s",
                                        collection.UUID,
                                        storedSize,
                                        block.Size,
                                        block.Digest)
-                               return
                        }
                        collection.BlockDigestToSize[block.Digest] = block.Size
                }
+               if manifest.Err != nil {
+                       err = manifest.Err
+                       return
+               }
+
                collection.TotalSize = 0
                for _, size := range collection.BlockDigestToSize {
                        collection.TotalSize += size