X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2255281ed7406e7c10027ed778f53ee28e6869fa..b3b9aeee4dba20bcddd8cb4ee2cdcd3c8a34eaec:/services/datamanager/collection/collection.go diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go index 55b0b3711a..1229f2917e 100644 --- a/services/datamanager/collection/collection.go +++ b/services/datamanager/collection/collection.go @@ -10,7 +10,6 @@ import ( "git.curoverse.com/arvados.git/sdk/go/logger" "git.curoverse.com/arvados.git/sdk/go/manifest" "git.curoverse.com/arvados.git/sdk/go/util" - "git.curoverse.com/arvados.git/services/datamanager/loggerutil" "log" "os" "runtime/pprof" @@ -18,10 +17,7 @@ import ( ) var ( - heapProfileFilename string - // globals for debugging - totalManifestSize uint64 - maxManifestSize uint64 + HeapProfileFilename string ) // Collection representation @@ -67,7 +63,7 @@ type SdkCollectionList struct { } func init() { - flag.StringVar(&heapProfileFilename, + flag.StringVar(&HeapProfileFilename, "heap-profile", "", "File to write the heap profiles to. 
Leave blank to skip profiling.") @@ -80,8 +76,8 @@ func init() { // Otherwise we would see cumulative numbers as explained here: // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ func WriteHeapProfile() error { - if heapProfileFilename != "" { - heapProfile, err := os.Create(heapProfileFilename) + if HeapProfileFilename != "" { + heapProfile, err := os.Create(HeapProfileFilename) if err != nil { return err } @@ -96,10 +92,10 @@ func WriteHeapProfile() error { } // GetCollectionsAndSummarize gets collections from api and summarizes -func GetCollectionsAndSummarize(arvLogger *logger.Logger, params GetCollectionsParams) (results ReadCollections) { - results, err := GetCollections(params) +func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections, err error) { + results, err = GetCollections(params) if err != nil { - loggerutil.LogErrorMessage(arvLogger, fmt.Sprintf("Error during GetCollections with params %v: %v", params, err)) + return } results.Summarize(params.Logger) @@ -123,6 +119,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e if &params.Client == nil { err = fmt.Errorf("params.Client passed to GetCollections() should " + "contain a valid ArvadosClient, but instead it is nil.") + return } fieldsWanted := []string{"manifest_text", @@ -198,13 +195,12 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e } // Process collection and update our date filter. 
- var latestModificationDate time.Time - latestModificationDate, err = ProcessCollections(params.Logger, + latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger, collections.Items, defaultReplicationLevel, results.UUIDToCollection) if err != nil { - return + return results, err } sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339) @@ -232,7 +228,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e } // Write the heap profile for examining memory usage - WriteHeapProfile() + err = WriteHeapProfile() return } @@ -248,7 +244,12 @@ func StrCopy(s string) string { func ProcessCollections(arvLogger *logger.Logger, receivedCollections []SdkCollectionInfo, defaultReplicationLevel int, - UUIDToCollection map[string]Collection) (latestModificationDate time.Time, err error) { + UUIDToCollection map[string]Collection, +) ( + latestModificationDate time.Time, + maxManifestSize, totalManifestSize uint64, + err error, +) { for _, sdkCollection := range receivedCollections { collection := Collection{UUID: StrCopy(sdkCollection.UUID), OwnerUUID: StrCopy(sdkCollection.OwnerUUID), @@ -256,12 +257,13 @@ func ProcessCollections(arvLogger *logger.Logger, BlockDigestToSize: make(map[blockdigest.BlockDigest]int)} if sdkCollection.ModifiedAt.IsZero() { - return latestModificationDate, fmt.Errorf( + err = fmt.Errorf( "Arvados SDK collection returned with unexpected zero "+ "modification date. 
This probably means that either we failed to "+ "parse the modification date or the API server has changed how "+ "it returns modification dates: %+v", collection) + return } if sdkCollection.ModifiedAt.After(latestModificationDate) { @@ -272,7 +274,7 @@ func ProcessCollections(arvLogger *logger.Logger, collection.ReplicationLevel = defaultReplicationLevel } - manifest := manifest.Manifest{sdkCollection.ManifestText} + manifest := manifest.Manifest{Text: sdkCollection.ManifestText} manifestSize := uint64(len(sdkCollection.ManifestText)) if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen { @@ -285,16 +287,20 @@ func ProcessCollections(arvLogger *logger.Logger, blockChannel := manifest.BlockIterWithDuplicates() for block := range blockChannel { if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size { - err = fmt.Errorf( + log.Printf( "Collection %s contains multiple sizes (%d and %d) for block %s", collection.UUID, storedSize, block.Size, block.Digest) - return } collection.BlockDigestToSize[block.Digest] = block.Size } + if manifest.Err != nil { + err = manifest.Err + return + } + collection.TotalSize = 0 for _, size := range collection.BlockDigestToSize { collection.TotalSize += size