Merge branch '8484-sanity-check-collection-count' closes #8484
[arvados.git] / services / datamanager / collection / collection.go
index 5519ad8670ec93611740268d34f845e5c104fffe..9b7eb7543a4ebed086aba2d409f44fcc789ef222 100644 (file)
@@ -10,7 +10,6 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/logger"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
        "git.curoverse.com/arvados.git/sdk/go/util"
-       "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
        "log"
        "os"
        "runtime/pprof"
@@ -18,86 +17,92 @@ import (
 )
 
 var (
-       heapProfileFilename string
-       // globals for debugging
-       totalManifestSize uint64
-       maxManifestSize   uint64
+       HeapProfileFilename string
 )
 
+// Collection describes a single collection: its UUID, owner, replication level, and block sizes.
 type Collection struct {
-       Uuid              string
-       OwnerUuid         string
+       UUID              string
+       OwnerUUID         string
        ReplicationLevel  int
        BlockDigestToSize map[blockdigest.BlockDigest]int
        TotalSize         int
 }
 
+// ReadCollections holds information about collections from API server
 type ReadCollections struct {
        ReadAllCollections        bool
-       UuidToCollection          map[string]Collection
+       UUIDToCollection          map[string]Collection
        OwnerToCollectionSize     map[string]int
        BlockToDesiredReplication map[blockdigest.DigestWithSize]int
-       CollectionUuidToIndex     map[string]int
-       CollectionIndexToUuid     []string
+       CollectionUUIDToIndex     map[string]int
+       CollectionIndexToUUID     []string
        BlockToCollectionIndices  map[blockdigest.DigestWithSize][]int
 }
 
+// GetCollectionsParams holds the client, logger, and batch size passed to GetCollections.
 type GetCollectionsParams struct {
        Client    arvadosclient.ArvadosClient
        Logger    *logger.Logger
        BatchSize int
 }
 
+// SdkCollectionInfo holds collection info from api
 type SdkCollectionInfo struct {
-       Uuid         string    `json:"uuid"`
-       OwnerUuid    string    `json:"owner_uuid"`
+       UUID         string    `json:"uuid"`
+       OwnerUUID    string    `json:"owner_uuid"`
        Redundancy   int       `json:"redundancy"`
        ModifiedAt   time.Time `json:"modified_at"`
        ManifestText string    `json:"manifest_text"`
 }
 
+// SdkCollectionList lists collections from api
 type SdkCollectionList struct {
        ItemsAvailable int                 `json:"items_available"`
        Items          []SdkCollectionInfo `json:"items"`
 }
 
 func init() {
-       flag.StringVar(&heapProfileFilename,
+       flag.StringVar(&HeapProfileFilename,
                "heap-profile",
                "",
                "File to write the heap profiles to. Leave blank to skip profiling.")
 }
 
-// Write the heap profile to a file for later review.
+// WriteHeapProfile writes the heap profile to a file for later review.
 // Since a file is expected to only contain a single heap profile this
 // function overwrites the previously written profile, so it is safe
 // to call multiple times in a single run.
 // Otherwise we would see cumulative numbers as explained here:
 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
-func WriteHeapProfile() {
-       if heapProfileFilename != "" {
-
-               heap_profile, err := os.Create(heapProfileFilename)
+func WriteHeapProfile() error {
+       if HeapProfileFilename != "" {
+               heapProfile, err := os.Create(HeapProfileFilename)
                if err != nil {
-                       log.Fatal(err)
+                       return err
                }
 
-               defer heap_profile.Close()
+               defer heapProfile.Close()
 
-               err = pprof.WriteHeapProfile(heap_profile)
-               if err != nil {
-                       log.Fatal(err)
-               }
+               err = pprof.WriteHeapProfile(heapProfile)
+               return err
        }
+
+       return nil
 }
 
-func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
-       results = GetCollections(params)
+// GetCollectionsAndSummarize gets collections from the API server and summarizes them.
+func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections, err error) {
+       results, err = GetCollections(params)
+       if err != nil {
+               return
+       }
+
        results.Summarize(params.Logger)
 
        log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
        log.Printf("Read and processed %d collections",
-               len(results.UuidToCollection))
+               len(results.UUIDToCollection))
 
        // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
        // lots of behaviors can become warnings (and obviously we can't
@@ -109,10 +114,12 @@ func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollec
        return
 }
 
-func GetCollections(params GetCollectionsParams) (results ReadCollections) {
+// GetCollections gets collections from api
+func GetCollections(params GetCollectionsParams) (results ReadCollections, err error) {
        if &params.Client == nil {
-               log.Fatalf("params.Client passed to GetCollections() should " +
+               err = fmt.Errorf("params.Client passed to GetCollections() should " +
                        "contain a valid ArvadosClient, but instead it is nil.")
+               return
        }
 
        fieldsWanted := []string{"manifest_text",
@@ -123,8 +130,9 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
 
        sdkParams := arvadosclient.Dict{
                "select":  fieldsWanted,
-               "order":   []string{"modified_at ASC"},
-               "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
+               "order":   []string{"modified_at ASC", "uuid ASC"},
+               "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}},
+               "offset": 0}
 
        if params.BatchSize > 0 {
                sdkParams["limit"] = params.BatchSize
@@ -132,32 +140,31 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
 
        var defaultReplicationLevel int
        {
-               value, err := params.Client.Discovery("defaultCollectionReplication")
+               var value interface{}
+               value, err = params.Client.Discovery("defaultCollectionReplication")
                if err != nil {
-                       loggerutil.FatalWithMessage(params.Logger,
-                               fmt.Sprintf("Error querying default collection replication: %v", err))
+                       return
                }
 
                defaultReplicationLevel = int(value.(float64))
                if defaultReplicationLevel <= 0 {
-                       loggerutil.FatalWithMessage(params.Logger,
-                               fmt.Sprintf("Default collection replication returned by arvados SDK "+
-                                       "should be a positive integer but instead it was %d.",
-                                       defaultReplicationLevel))
+                       err = fmt.Errorf("Default collection replication returned by arvados SDK "+
+                               "should be a positive integer but instead it was %d.",
+                               defaultReplicationLevel)
+                       return
                }
        }
 
        initialNumberOfCollectionsAvailable, err :=
                util.NumberItemsAvailable(params.Client, "collections")
        if err != nil {
-               loggerutil.FatalWithMessage(params.Logger,
-                       fmt.Sprintf("Error querying collection count: %v", err))
+               return
        }
        // Include a 1% margin for collections added while we're reading so
        // that we don't have to grow the map in most cases.
        maxExpectedCollections := int(
                float64(initialNumberOfCollectionsAvailable) * 1.01)
-       results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
+       results.UUIDToCollection = make(map[string]Collection, maxExpectedCollections)
 
        if params.Logger != nil {
                params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
@@ -170,37 +177,61 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
 
        // These values are just for getting the loop to run the first time,
        // afterwards they'll be set to real values.
-       previousTotalCollections := -1
-       totalCollections := 0
-       for totalCollections > previousTotalCollections {
+       remainingCollections := 1
+       var totalCollections int
+       var previousTotalCollections int
+       for remainingCollections > 0 {
                // We're still finding new collections
 
                // Write the heap profile for examining memory usage
-               WriteHeapProfile()
+               err = WriteHeapProfile()
+               if err != nil {
+                       return
+               }
 
                // Get next batch of collections.
                var collections SdkCollectionList
-               err := params.Client.List("collections", sdkParams, &collections)
+               err = params.Client.List("collections", sdkParams, &collections)
                if err != nil {
-                       loggerutil.FatalWithMessage(params.Logger,
-                               fmt.Sprintf("Error querying collections: %v", err))
+                       return
                }
+               batchCollections := len(collections.Items)
+
+               // We must always have at least one collection in the batch
+               if batchCollections < 1 {
+                       err = fmt.Errorf("API query returned no collections for %+v", sdkParams)
+                       return
+               }
+
+               // Update count of remaining collections
+               remainingCollections = collections.ItemsAvailable - sdkParams["offset"].(int) - batchCollections
 
                // Process collection and update our date filter.
-               sdkParams["filters"].([][]string)[0][2] =
-                       ProcessCollections(params.Logger,
-                               collections.Items,
-                               defaultReplicationLevel,
-                               results.UuidToCollection).Format(time.RFC3339)
+               latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger,
+                       collections.Items,
+                       defaultReplicationLevel,
+                       results.UUIDToCollection)
+               if err != nil {
+                       return results, err
+               }
+               if sdkParams["filters"].([][]string)[0][2] != latestModificationDate.Format(time.RFC3339) {
+                       sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
+                       sdkParams["offset"] = 0
+               } else {
+                       sdkParams["offset"] = sdkParams["offset"].(int) + batchCollections
+               }
 
                // update counts
                previousTotalCollections = totalCollections
-               totalCollections = len(results.UuidToCollection)
+               totalCollections = len(results.UUIDToCollection)
 
-               log.Printf("%d collections read, %d new in last batch, "+
+               log.Printf("%d collections read, %d (%d new) in last batch, "+
+                       "%d remaining, "+
                        "%s latest modified date, %.0f %d %d avg,max,total manifest size",
                        totalCollections,
+                       batchCollections,
                        totalCollections-previousTotalCollections,
+                       remainingCollections,
                        sdkParams["filters"].([][]string)[0][2],
                        float32(totalManifestSize)/float32(totalCollections),
                        maxManifestSize, totalManifestSize)
@@ -216,8 +247,32 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                }
        }
 
+       // Make one final API request to verify that we have processed all collections available up to the latest modification date
+       var collections SdkCollectionList
+       sdkParams["filters"].([][]string)[0][1] = "<="
+       sdkParams["limit"] = 0
+       err = params.Client.List("collections", sdkParams, &collections)
+       if err != nil {
+               return
+       }
+       finalNumberOfCollectionsAvailable, err :=
+               util.NumberItemsAvailable(params.Client, "collections")
+       if err != nil {
+               return
+       }
+       if totalCollections < finalNumberOfCollectionsAvailable {
+               err = fmt.Errorf("API server indicates a total of %d collections "+
+                               "available up to %v, but we only retrieved %d. "+
+                               "Refusing to continue as this could indicate an "+
+                               "otherwise undetected failure.",
+                               finalNumberOfCollectionsAvailable, 
+                               sdkParams["filters"].([][]string)[0][2],
+                               totalCollections)
+               return
+       }
+
        // Write the heap profile for examining memory usage
-       WriteHeapProfile()
+       err = WriteHeapProfile()
 
        return
 }
@@ -229,24 +284,30 @@ func StrCopy(s string) string {
        return string([]byte(s))
 }
 
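+// ProcessCollections records each received collection in UUIDToCollection and returns the latest modification date and the max and total manifest sizes.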
 func ProcessCollections(arvLogger *logger.Logger,
        receivedCollections []SdkCollectionInfo,
        defaultReplicationLevel int,
-       uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
+       UUIDToCollection map[string]Collection,
+) (
+       latestModificationDate time.Time,
+       maxManifestSize, totalManifestSize uint64,
+       err error,
+) {
        for _, sdkCollection := range receivedCollections {
-               collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
-                       OwnerUuid:         StrCopy(sdkCollection.OwnerUuid),
+               collection := Collection{UUID: StrCopy(sdkCollection.UUID),
+                       OwnerUUID:         StrCopy(sdkCollection.OwnerUUID),
                        ReplicationLevel:  sdkCollection.Redundancy,
                        BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
 
                if sdkCollection.ModifiedAt.IsZero() {
-                       loggerutil.FatalWithMessage(arvLogger,
-                               fmt.Sprintf(
-                                       "Arvados SDK collection returned with unexpected zero "+
-                                               "modification date. This probably means that either we failed to "+
-                                               "parse the modification date or the API server has changed how "+
-                                               "it returns modification dates: %+v",
-                                       collection))
+                       err = fmt.Errorf(
+                               "Arvados SDK collection returned with unexpected zero "+
+                                       "modification date. This probably means that either we failed to "+
+                                       "parse the modification date or the API server has changed how "+
+                                       "it returns modification dates: %+v",
+                               collection)
+                       return
                }
 
                if sdkCollection.ModifiedAt.After(latestModificationDate) {
@@ -257,10 +318,10 @@ func ProcessCollections(arvLogger *logger.Logger,
                        collection.ReplicationLevel = defaultReplicationLevel
                }
 
-               manifest := manifest.Manifest{sdkCollection.ManifestText}
+               manifest := manifest.Manifest{Text: sdkCollection.ManifestText}
                manifestSize := uint64(len(sdkCollection.ManifestText))
 
-               if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
+               if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
                        totalManifestSize += manifestSize
                }
                if manifestSize > maxManifestSize {
@@ -269,22 +330,26 @@ func ProcessCollections(arvLogger *logger.Logger,
 
                blockChannel := manifest.BlockIterWithDuplicates()
                for block := range blockChannel {
-                       if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size {
-                               message := fmt.Sprintf(
+                       if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
+                               log.Printf(
                                        "Collection %s contains multiple sizes (%d and %d) for block %s",
-                                       collection.Uuid,
-                                       stored_size,
+                                       collection.UUID,
+                                       storedSize,
                                        block.Size,
                                        block.Digest)
-                               loggerutil.FatalWithMessage(arvLogger, message)
                        }
                        collection.BlockDigestToSize[block.Digest] = block.Size
                }
+               if manifest.Err != nil {
+                       err = manifest.Err
+                       return
+               }
+
                collection.TotalSize = 0
                for _, size := range collection.BlockDigestToSize {
                        collection.TotalSize += size
                }
-               uuidToCollection[collection.Uuid] = collection
+               UUIDToCollection[collection.UUID] = collection
 
                // Clear out all the manifest strings that we don't need anymore.
                // These hopefully form the bulk of our memory usage.
@@ -295,22 +360,23 @@ func ProcessCollections(arvLogger *logger.Logger,
        return
 }
 
+// Summarize the collections read
 func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
        readCollections.OwnerToCollectionSize = make(map[string]int)
        readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
-       numCollections := len(readCollections.UuidToCollection)
-       readCollections.CollectionUuidToIndex = make(map[string]int, numCollections)
-       readCollections.CollectionIndexToUuid = make([]string, 0, numCollections)
+       numCollections := len(readCollections.UUIDToCollection)
+       readCollections.CollectionUUIDToIndex = make(map[string]int, numCollections)
+       readCollections.CollectionIndexToUUID = make([]string, 0, numCollections)
        readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
 
-       for _, coll := range readCollections.UuidToCollection {
-               collectionIndex := len(readCollections.CollectionIndexToUuid)
-               readCollections.CollectionIndexToUuid =
-                       append(readCollections.CollectionIndexToUuid, coll.Uuid)
-               readCollections.CollectionUuidToIndex[coll.Uuid] = collectionIndex
+       for _, coll := range readCollections.UUIDToCollection {
+               collectionIndex := len(readCollections.CollectionIndexToUUID)
+               readCollections.CollectionIndexToUUID =
+                       append(readCollections.CollectionIndexToUUID, coll.UUID)
+               readCollections.CollectionUUIDToIndex[coll.UUID] = collectionIndex
 
-               readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
-                       readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
+               readCollections.OwnerToCollectionSize[coll.OwnerUUID] =
+                       readCollections.OwnerToCollectionSize[coll.OwnerUUID] + coll.TotalSize
 
                for block, size := range coll.BlockDigestToSize {
                        locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}