Warn, but do not error out, if a collection has conflicting block sizes.
[arvados.git] / services / datamanager / collection / collection.go
index ca03627405e0742ad419b1d1dd6daf64fba7a341..33970d8bc437222c5d8d1cede7919eb8f53e000e 100644 (file)
@@ -10,7 +10,6 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/logger"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
        "git.curoverse.com/arvados.git/sdk/go/util"
-       "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
        "log"
        "os"
        "runtime/pprof"
@@ -18,7 +17,7 @@ import (
 )
 
 var (
-       heapProfileFilename string
+       HeapProfileFilename string
        // globals for debugging
        totalManifestSize uint64
        maxManifestSize   uint64
@@ -42,6 +41,7 @@ type ReadCollections struct {
        CollectionUUIDToIndex     map[string]int
        CollectionIndexToUUID     []string
        BlockToCollectionIndices  map[blockdigest.DigestWithSize][]int
+       Err                       error
 }
 
 // GetCollectionsParams params
@@ -67,7 +67,7 @@ type SdkCollectionList struct {
 }
 
 func init() {
-       flag.StringVar(&heapProfileFilename,
+       flag.StringVar(&HeapProfileFilename,
                "heap-profile",
                "",
                "File to write the heap profiles to. Leave blank to skip profiling.")
@@ -79,26 +79,30 @@ func init() {
 // to call multiple times in a single run.
 // Otherwise we would see cumulative numbers as explained here:
 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
-func WriteHeapProfile() {
-       if heapProfileFilename != "" {
-
-               heapProfile, err := os.Create(heapProfileFilename)
+func WriteHeapProfile() error {
+       if HeapProfileFilename != "" {
+               heapProfile, err := os.Create(HeapProfileFilename)
                if err != nil {
-                       log.Fatal(err)
+                       return err
                }
 
                defer heapProfile.Close()
 
                err = pprof.WriteHeapProfile(heapProfile)
-               if err != nil {
-                       log.Fatal(err)
-               }
+               return err
        }
+
+       return nil
 }
 
 // GetCollectionsAndSummarize gets collections from api and summarizes
 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
-       results = GetCollections(params)
+       results, err := GetCollections(params)
+       if err != nil {
+               results.Err = err
+               return
+       }
+
        results.Summarize(params.Logger)
 
        log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
@@ -116,10 +120,11 @@ func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollec
 }
 
 // GetCollections gets collections from api
-func GetCollections(params GetCollectionsParams) (results ReadCollections) {
+func GetCollections(params GetCollectionsParams) (results ReadCollections, err error) {
        if &params.Client == nil {
-               log.Fatalf("params.Client passed to GetCollections() should " +
+               err = fmt.Errorf("params.Client passed to GetCollections() should " +
                        "contain a valid ArvadosClient, but instead it is nil.")
+               return
        }
 
        fieldsWanted := []string{"manifest_text",
@@ -139,26 +144,25 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
 
        var defaultReplicationLevel int
        {
-               value, err := params.Client.Discovery("defaultCollectionReplication")
+               var value interface{}
+               value, err = params.Client.Discovery("defaultCollectionReplication")
                if err != nil {
-                       loggerutil.FatalWithMessage(params.Logger,
-                               fmt.Sprintf("Error querying default collection replication: %v", err))
+                       return
                }
 
                defaultReplicationLevel = int(value.(float64))
                if defaultReplicationLevel <= 0 {
-                       loggerutil.FatalWithMessage(params.Logger,
-                               fmt.Sprintf("Default collection replication returned by arvados SDK "+
-                                       "should be a positive integer but instead it was %d.",
-                                       defaultReplicationLevel))
+                       err = fmt.Errorf("Default collection replication returned by arvados SDK "+
+                               "should be a positive integer but instead it was %d.",
+                               defaultReplicationLevel)
+                       return
                }
        }
 
        initialNumberOfCollectionsAvailable, err :=
                util.NumberItemsAvailable(params.Client, "collections")
        if err != nil {
-               loggerutil.FatalWithMessage(params.Logger,
-                       fmt.Sprintf("Error querying collection count: %v", err))
+               return
        }
        // Include a 1% margin for collections added while we're reading so
        // that we don't have to grow the map in most cases.
@@ -183,22 +187,28 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                // We're still finding new collections
 
                // Write the heap profile for examining memory usage
-               WriteHeapProfile()
+               err = WriteHeapProfile()
+               if err != nil {
+                       return
+               }
 
                // Get next batch of collections.
                var collections SdkCollectionList
-               err := params.Client.List("collections", sdkParams, &collections)
+               err = params.Client.List("collections", sdkParams, &collections)
                if err != nil {
-                       loggerutil.FatalWithMessage(params.Logger,
-                               fmt.Sprintf("Error querying collections: %v", err))
+                       return
                }
 
                // Process collection and update our date filter.
-               sdkParams["filters"].([][]string)[0][2] =
-                       ProcessCollections(params.Logger,
-                               collections.Items,
-                               defaultReplicationLevel,
-                               results.UUIDToCollection).Format(time.RFC3339)
+               var latestModificationDate time.Time
+               latestModificationDate, err = ProcessCollections(params.Logger,
+                       collections.Items,
+                       defaultReplicationLevel,
+                       results.UUIDToCollection)
+               if err != nil {
+                       return
+               }
+               sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
 
                // update counts
                previousTotalCollections = totalCollections
@@ -224,7 +234,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
        }
 
        // Write the heap profile for examining memory usage
-       WriteHeapProfile()
+       err = WriteHeapProfile()
 
        return
 }
@@ -240,7 +250,7 @@ func StrCopy(s string) string {
 func ProcessCollections(arvLogger *logger.Logger,
        receivedCollections []SdkCollectionInfo,
        defaultReplicationLevel int,
-       UUIDToCollection map[string]Collection) (latestModificationDate time.Time) {
+       UUIDToCollection map[string]Collection) (latestModificationDate time.Time, err error) {
        for _, sdkCollection := range receivedCollections {
                collection := Collection{UUID: StrCopy(sdkCollection.UUID),
                        OwnerUUID:         StrCopy(sdkCollection.OwnerUUID),
@@ -248,13 +258,12 @@ func ProcessCollections(arvLogger *logger.Logger,
                        BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
 
                if sdkCollection.ModifiedAt.IsZero() {
-                       loggerutil.FatalWithMessage(arvLogger,
-                               fmt.Sprintf(
-                                       "Arvados SDK collection returned with unexpected zero "+
-                                               "modification date. This probably means that either we failed to "+
-                                               "parse the modification date or the API server has changed how "+
-                                               "it returns modification dates: %+v",
-                                       collection))
+                       return latestModificationDate, fmt.Errorf(
+                               "Arvados SDK collection returned with unexpected zero "+
+                                       "modification date. This probably means that either we failed to "+
+                                       "parse the modification date or the API server has changed how "+
+                                       "it returns modification dates: %+v",
+                               collection)
                }
 
                if sdkCollection.ModifiedAt.After(latestModificationDate) {
@@ -265,7 +274,7 @@ func ProcessCollections(arvLogger *logger.Logger,
                        collection.ReplicationLevel = defaultReplicationLevel
                }
 
-               manifest := manifest.Manifest{sdkCollection.ManifestText}
+               manifest := manifest.Manifest{Text: sdkCollection.ManifestText}
                manifestSize := uint64(len(sdkCollection.ManifestText))
 
                if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
@@ -278,16 +287,20 @@ func ProcessCollections(arvLogger *logger.Logger,
                blockChannel := manifest.BlockIterWithDuplicates()
                for block := range blockChannel {
                        if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
-                               message := fmt.Sprintf(
+                               log.Printf(
                                        "Collection %s contains multiple sizes (%d and %d) for block %s",
                                        collection.UUID,
                                        storedSize,
                                        block.Size,
                                        block.Digest)
-                               loggerutil.FatalWithMessage(arvLogger, message)
                        }
                        collection.BlockDigestToSize[block.Digest] = block.Size
                }
+               if manifest.Err != nil {
+                       err = manifest.Err
+                       return
+               }
+
                collection.TotalSize = 0
                for _, size := range collection.BlockDigestToSize {
                        collection.TotalSize += size