X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a50278e3d0e26bb5d513d0af5da2fb559b112388..2cf5dde4a6d56d0fb87a1dc79743753632d640a0:/services/datamanager/collection/collection.go diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go index ca03627405..c392ded35d 100644 --- a/services/datamanager/collection/collection.go +++ b/services/datamanager/collection/collection.go @@ -10,7 +10,6 @@ import ( "git.curoverse.com/arvados.git/sdk/go/logger" "git.curoverse.com/arvados.git/sdk/go/manifest" "git.curoverse.com/arvados.git/sdk/go/util" - "git.curoverse.com/arvados.git/services/datamanager/loggerutil" "log" "os" "runtime/pprof" @@ -18,10 +17,7 @@ import ( ) var ( - heapProfileFilename string - // globals for debugging - totalManifestSize uint64 - maxManifestSize uint64 + HeapProfileFilename string ) // Collection representation @@ -67,7 +63,7 @@ type SdkCollectionList struct { } func init() { - flag.StringVar(&heapProfileFilename, + flag.StringVar(&HeapProfileFilename, "heap-profile", "", "File to write the heap profiles to. Leave blank to skip profiling.") @@ -79,26 +75,29 @@ func init() { // to call multiple times in a single run. 
// Otherwise we would see cumulative numbers as explained here: // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ -func WriteHeapProfile() { - if heapProfileFilename != "" { - - heapProfile, err := os.Create(heapProfileFilename) +func WriteHeapProfile() error { + if HeapProfileFilename != "" { + heapProfile, err := os.Create(HeapProfileFilename) if err != nil { - log.Fatal(err) + return err } defer heapProfile.Close() err = pprof.WriteHeapProfile(heapProfile) - if err != nil { - log.Fatal(err) - } + return err } + + return nil } // GetCollectionsAndSummarize gets collections from api and summarizes -func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) { - results = GetCollections(params) +func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections, err error) { + results, err = GetCollections(params) + if err != nil { + return + } + results.Summarize(params.Logger) log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize) @@ -116,10 +115,11 @@ func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollec } // GetCollections gets collections from api -func GetCollections(params GetCollectionsParams) (results ReadCollections) { +func GetCollections(params GetCollectionsParams) (results ReadCollections, err error) { if &params.Client == nil { - log.Fatalf("params.Client passed to GetCollections() should " + + err = fmt.Errorf("params.Client passed to GetCollections() should " + "contain a valid ArvadosClient, but instead it is nil.") + return } fieldsWanted := []string{"manifest_text", @@ -130,8 +130,9 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { sdkParams := arvadosclient.Dict{ "select": fieldsWanted, - "order": []string{"modified_at ASC"}, - "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}} + "order": []string{"modified_at ASC", "uuid ASC"}, + "filters": [][]string{[]string{"modified_at", ">=", 
"1900-01-01T00:00:00Z"}}, + "offset": 0} if params.BatchSize > 0 { sdkParams["limit"] = params.BatchSize @@ -139,26 +140,25 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { var defaultReplicationLevel int { - value, err := params.Client.Discovery("defaultCollectionReplication") + var value interface{} + value, err = params.Client.Discovery("defaultCollectionReplication") if err != nil { - loggerutil.FatalWithMessage(params.Logger, - fmt.Sprintf("Error querying default collection replication: %v", err)) + return } defaultReplicationLevel = int(value.(float64)) if defaultReplicationLevel <= 0 { - loggerutil.FatalWithMessage(params.Logger, - fmt.Sprintf("Default collection replication returned by arvados SDK "+ - "should be a positive integer but instead it was %d.", - defaultReplicationLevel)) + err = fmt.Errorf("Default collection replication returned by arvados SDK "+ + "should be a positive integer but instead it was %d.", + defaultReplicationLevel) + return } } initialNumberOfCollectionsAvailable, err := util.NumberItemsAvailable(params.Client, "collections") if err != nil { - loggerutil.FatalWithMessage(params.Logger, - fmt.Sprintf("Error querying collection count: %v", err)) + return } // Include a 1% margin for collections added while we're reading so // that we don't have to grow the map in most cases. @@ -177,37 +177,61 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { // These values are just for getting the loop to run the first time, // afterwards they'll be set to real values. 
- previousTotalCollections := -1 - totalCollections := 0 - for totalCollections > previousTotalCollections { + remainingCollections := 1 + var totalCollections int + var previousTotalCollections int + for remainingCollections > 0 { // We're still finding new collections // Write the heap profile for examining memory usage - WriteHeapProfile() + err = WriteHeapProfile() + if err != nil { + return + } // Get next batch of collections. var collections SdkCollectionList - err := params.Client.List("collections", sdkParams, &collections) + err = params.Client.List("collections", sdkParams, &collections) if err != nil { - loggerutil.FatalWithMessage(params.Logger, - fmt.Sprintf("Error querying collections: %v", err)) + return + } + batchCollections := len(collections.Items) + + // We must always have at least one collection in the batch + if batchCollections < 1 { + err = fmt.Errorf("API query returned no collections for %+v", sdkParams) + return } + // Update count of remaining collections + remainingCollections = collections.ItemsAvailable - sdkParams["offset"].(int) - batchCollections + // Process collection and update our date filter. 
- sdkParams["filters"].([][]string)[0][2] = - ProcessCollections(params.Logger, - collections.Items, - defaultReplicationLevel, - results.UUIDToCollection).Format(time.RFC3339) + latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger, + collections.Items, + defaultReplicationLevel, + results.UUIDToCollection) + if err != nil { + return results, err + } + if sdkParams["filters"].([][]string)[0][2] != latestModificationDate.Format(time.RFC3339) { + sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339) + sdkParams["offset"] = 0 + } else { + sdkParams["offset"] = sdkParams["offset"].(int) + batchCollections + } // update counts previousTotalCollections = totalCollections totalCollections = len(results.UUIDToCollection) - log.Printf("%d collections read, %d new in last batch, "+ + log.Printf("%d collections read, %d (%d new) in last batch, "+ + "%d remaining, "+ "%s latest modified date, %.0f %d %d avg,max,total manifest size", totalCollections, + batchCollections, totalCollections-previousTotalCollections, + remainingCollections, sdkParams["filters"].([][]string)[0][2], float32(totalManifestSize)/float32(totalCollections), maxManifestSize, totalManifestSize) @@ -223,8 +247,32 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { } } + // Make one final API request to verify that we have processed all collections available up to the latest modification date + var collections SdkCollectionList + sdkParams["filters"].([][]string)[0][1] = "<=" + sdkParams["limit"] = 0 + err = params.Client.List("collections", sdkParams, &collections) + if err != nil { + return + } + finalNumberOfCollectionsAvailable, err := + util.NumberItemsAvailable(params.Client, "collections") + if err != nil { + return + } + if totalCollections < finalNumberOfCollectionsAvailable { + err = fmt.Errorf("API server indicates a total of %d collections "+ + "available up to %v, but we only retrieved %d. 
"+ + "Refusing to continue as this could indicate an "+ + "otherwise undetected failure.", + finalNumberOfCollectionsAvailable, + sdkParams["filters"].([][]string)[0][2], + totalCollections) + return + } + // Write the heap profile for examining memory usage - WriteHeapProfile() + err = WriteHeapProfile() return } @@ -240,7 +288,12 @@ func StrCopy(s string) string { func ProcessCollections(arvLogger *logger.Logger, receivedCollections []SdkCollectionInfo, defaultReplicationLevel int, - UUIDToCollection map[string]Collection) (latestModificationDate time.Time) { + UUIDToCollection map[string]Collection, +) ( + latestModificationDate time.Time, + maxManifestSize, totalManifestSize uint64, + err error, +) { for _, sdkCollection := range receivedCollections { collection := Collection{UUID: StrCopy(sdkCollection.UUID), OwnerUUID: StrCopy(sdkCollection.OwnerUUID), @@ -248,13 +301,13 @@ func ProcessCollections(arvLogger *logger.Logger, BlockDigestToSize: make(map[blockdigest.BlockDigest]int)} if sdkCollection.ModifiedAt.IsZero() { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf( - "Arvados SDK collection returned with unexpected zero "+ - "modification date. This probably means that either we failed to "+ - "parse the modification date or the API server has changed how "+ - "it returns modification dates: %+v", - collection)) + err = fmt.Errorf( + "Arvados SDK collection returned with unexpected zero "+ + "modification date. 
This probably means that either we failed to "+ + "parse the modification date or the API server has changed how "+ + "it returns modification dates: %+v", + collection) + return } if sdkCollection.ModifiedAt.After(latestModificationDate) { @@ -265,7 +318,7 @@ func ProcessCollections(arvLogger *logger.Logger, collection.ReplicationLevel = defaultReplicationLevel } - manifest := manifest.Manifest{sdkCollection.ManifestText} + manifest := manifest.Manifest{Text: sdkCollection.ManifestText} manifestSize := uint64(len(sdkCollection.ManifestText)) if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen { @@ -278,16 +331,20 @@ func ProcessCollections(arvLogger *logger.Logger, blockChannel := manifest.BlockIterWithDuplicates() for block := range blockChannel { if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size { - message := fmt.Sprintf( + log.Printf( "Collection %s contains multiple sizes (%d and %d) for block %s", collection.UUID, storedSize, block.Size, block.Digest) - loggerutil.FatalWithMessage(arvLogger, message) } collection.BlockDigestToSize[block.Digest] = block.Size } + if manifest.Err != nil { + err = manifest.Err + return + } + collection.TotalSize = 0 for _, size := range collection.BlockDigestToSize { collection.TotalSize += size