X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0e57453d2b637a3d105d4e3d67031f3915f9d302..779e41aaaa1f4d3a696a63011c00648bfbfaae72:/services/datamanager/collection/collection.go diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go index 5519ad8670..df6852687a 100644 --- a/services/datamanager/collection/collection.go +++ b/services/datamanager/collection/collection.go @@ -10,7 +10,6 @@ import ( "git.curoverse.com/arvados.git/sdk/go/logger" "git.curoverse.com/arvados.git/sdk/go/manifest" "git.curoverse.com/arvados.git/sdk/go/util" - "git.curoverse.com/arvados.git/services/datamanager/loggerutil" "log" "os" "runtime/pprof" @@ -18,86 +17,97 @@ import ( ) var ( - heapProfileFilename string + HeapProfileFilename string // globals for debugging totalManifestSize uint64 maxManifestSize uint64 ) +// Collection representation type Collection struct { - Uuid string - OwnerUuid string + UUID string + OwnerUUID string ReplicationLevel int BlockDigestToSize map[blockdigest.BlockDigest]int TotalSize int } +// ReadCollections holds information about collections from API server type ReadCollections struct { ReadAllCollections bool - UuidToCollection map[string]Collection + UUIDToCollection map[string]Collection OwnerToCollectionSize map[string]int BlockToDesiredReplication map[blockdigest.DigestWithSize]int - CollectionUuidToIndex map[string]int - CollectionIndexToUuid []string + CollectionUUIDToIndex map[string]int + CollectionIndexToUUID []string BlockToCollectionIndices map[blockdigest.DigestWithSize][]int + Err error } +// GetCollectionsParams params type GetCollectionsParams struct { Client arvadosclient.ArvadosClient Logger *logger.Logger BatchSize int } +// SdkCollectionInfo holds collection info from api type SdkCollectionInfo struct { - Uuid string `json:"uuid"` - OwnerUuid string `json:"owner_uuid"` + UUID string `json:"uuid"` + OwnerUUID string `json:"owner_uuid"` Redundancy int `json:"redundancy"` ModifiedAt time.Time `json:"modified_at"` ManifestText string `json:"manifest_text"` } +// SdkCollectionList lists collections from api type SdkCollectionList struct { ItemsAvailable int `json:"items_available"` Items []SdkCollectionInfo `json:"items"` } func init() { - flag.StringVar(&heapProfileFilename, + flag.StringVar(&HeapProfileFilename, "heap-profile", "", "File to write the heap profiles to. Leave blank to skip profiling.") } -// Write the heap profile to a file for later review. +// WriteHeapProfile writes the heap profile to a file for later review. // Since a file is expected to only contain a single heap profile this // function overwrites the previously written profile, so it is safe // to call multiple times in a single run. // Otherwise we would see cumulative numbers as explained here: // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ -func WriteHeapProfile() { - if heapProfileFilename != "" { - - heap_profile, err := os.Create(heapProfileFilename) +func WriteHeapProfile() error { + if HeapProfileFilename != "" { + heapProfile, err := os.Create(HeapProfileFilename) if err != nil { - log.Fatal(err) + return err } - defer heap_profile.Close() + defer heapProfile.Close() - err = pprof.WriteHeapProfile(heap_profile) - if err != nil { - log.Fatal(err) - } + err = pprof.WriteHeapProfile(heapProfile) + return err } + + return nil } +// GetCollectionsAndSummarize gets collections from api and summarizes func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) { - results = GetCollections(params) + results, err := GetCollections(params) + if err != nil { + results.Err = err + return + } + results.Summarize(params.Logger) log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize) log.Printf("Read and processed %d collections", - len(results.UuidToCollection)) + len(results.UUIDToCollection)) // TODO(misha): Add a "readonly" flag. If we're in readonly mode, // lots of behaviors can become warnings (and obviously we can't @@ -109,10 +119,12 @@ func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollec return } -func GetCollections(params GetCollectionsParams) (results ReadCollections) { +// GetCollections gets collections from api +func GetCollections(params GetCollectionsParams) (results ReadCollections, err error) { if ¶ms.Client == nil { - log.Fatalf("params.Client passed to GetCollections() should " + + err = fmt.Errorf("params.Client passed to GetCollections() should " + "contain a valid ArvadosClient, but instead it is nil.") + return } fieldsWanted := []string{"manifest_text", @@ -132,32 +144,31 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { var defaultReplicationLevel int { - value, err := params.Client.Discovery("defaultCollectionReplication") + var value interface{} + value, err = params.Client.Discovery("defaultCollectionReplication") if err != nil { - loggerutil.FatalWithMessage(params.Logger, - fmt.Sprintf("Error querying default collection replication: %v", err)) + return } defaultReplicationLevel = int(value.(float64)) if defaultReplicationLevel <= 0 { - loggerutil.FatalWithMessage(params.Logger, - fmt.Sprintf("Default collection replication returned by arvados SDK "+ - "should be a positive integer but instead it was %d.", - defaultReplicationLevel)) + err = fmt.Errorf("Default collection replication returned by arvados SDK "+ + "should be a positive integer but instead it was %d.", + defaultReplicationLevel) + return } } initialNumberOfCollectionsAvailable, err := util.NumberItemsAvailable(params.Client, "collections") if err != nil { - loggerutil.FatalWithMessage(params.Logger, - fmt.Sprintf("Error querying collection count: %v", err)) + return } // Include a 1% margin for collections added while we're reading so // that we don't have to grow the map in most cases. maxExpectedCollections := int( float64(initialNumberOfCollectionsAvailable) * 1.01) - results.UuidToCollection = make(map[string]Collection, maxExpectedCollections) + results.UUIDToCollection = make(map[string]Collection, maxExpectedCollections) if params.Logger != nil { params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { @@ -176,26 +187,32 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { // We're still finding new collections // Write the heap profile for examining memory usage - WriteHeapProfile() + err = WriteHeapProfile() + if err != nil { + return + } // Get next batch of collections. var collections SdkCollectionList - err := params.Client.List("collections", sdkParams, &collections) + err = params.Client.List("collections", sdkParams, &collections) if err != nil { - loggerutil.FatalWithMessage(params.Logger, - fmt.Sprintf("Error querying collections: %v", err)) + return } // Process collection and update our date filter. - sdkParams["filters"].([][]string)[0][2] = - ProcessCollections(params.Logger, - collections.Items, - defaultReplicationLevel, - results.UuidToCollection).Format(time.RFC3339) + var latestModificationDate time.Time + latestModificationDate, err = ProcessCollections(params.Logger, + collections.Items, + defaultReplicationLevel, + results.UUIDToCollection) + if err != nil { + return + } + sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339) // update counts previousTotalCollections = totalCollections - totalCollections = len(results.UuidToCollection) + totalCollections = len(results.UUIDToCollection) log.Printf("%d collections read, %d new in last batch, "+ "%s latest modified date, %.0f %d %d avg,max,total manifest size", @@ -217,7 +234,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { } // Write the heap profile for examining memory usage - WriteHeapProfile() + err = WriteHeapProfile() return } @@ -229,24 +246,24 @@ func StrCopy(s string) string { return string([]byte(s)) } +// ProcessCollections read from api server func ProcessCollections(arvLogger *logger.Logger, receivedCollections []SdkCollectionInfo, defaultReplicationLevel int, - uuidToCollection map[string]Collection) (latestModificationDate time.Time) { + UUIDToCollection map[string]Collection) (latestModificationDate time.Time, err error) { for _, sdkCollection := range receivedCollections { - collection := Collection{Uuid: StrCopy(sdkCollection.Uuid), - OwnerUuid: StrCopy(sdkCollection.OwnerUuid), + collection := Collection{UUID: StrCopy(sdkCollection.UUID), + OwnerUUID: StrCopy(sdkCollection.OwnerUUID), ReplicationLevel: sdkCollection.Redundancy, BlockDigestToSize: make(map[blockdigest.BlockDigest]int)} if sdkCollection.ModifiedAt.IsZero() { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf( - "Arvados SDK collection returned with unexpected zero "+ - "modification date. This probably means that either we failed to "+ - "parse the modification date or the API server has changed how "+ - "it returns modification dates: %+v", - collection)) + return latestModificationDate, fmt.Errorf( + "Arvados SDK collection returned with unexpected zero "+ + "modification date. This probably means that either we failed to "+ + "parse the modification date or the API server has changed how "+ + "it returns modification dates: %+v", + collection) } if sdkCollection.ModifiedAt.After(latestModificationDate) { @@ -257,10 +274,10 @@ func ProcessCollections(arvLogger *logger.Logger, collection.ReplicationLevel = defaultReplicationLevel } - manifest := manifest.Manifest{sdkCollection.ManifestText} + manifest := manifest.Manifest{Text: sdkCollection.ManifestText} manifestSize := uint64(len(sdkCollection.ManifestText)) - if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen { + if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen { totalManifestSize += manifestSize } if manifestSize > maxManifestSize { @@ -269,22 +286,27 @@ func ProcessCollections(arvLogger *logger.Logger, blockChannel := manifest.BlockIterWithDuplicates() for block := range blockChannel { - if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size { - message := fmt.Sprintf( + if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size { + err = fmt.Errorf( "Collection %s contains multiple sizes (%d and %d) for block %s", - collection.Uuid, - stored_size, + collection.UUID, + storedSize, block.Size, block.Digest) - loggerutil.FatalWithMessage(arvLogger, message) + return } collection.BlockDigestToSize[block.Digest] = block.Size } + if manifest.Err != nil { + err = manifest.Err + return + } + collection.TotalSize = 0 for _, size := range collection.BlockDigestToSize { collection.TotalSize += size } - uuidToCollection[collection.Uuid] = collection + UUIDToCollection[collection.UUID] = collection // Clear out all the manifest strings that we don't need anymore. // These hopefully form the bulk of our memory usage. @@ -295,22 +317,23 @@ func ProcessCollections(arvLogger *logger.Logger, return } +// Summarize the collections read func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) { readCollections.OwnerToCollectionSize = make(map[string]int) readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int) - numCollections := len(readCollections.UuidToCollection) - readCollections.CollectionUuidToIndex = make(map[string]int, numCollections) - readCollections.CollectionIndexToUuid = make([]string, 0, numCollections) + numCollections := len(readCollections.UUIDToCollection) + readCollections.CollectionUUIDToIndex = make(map[string]int, numCollections) + readCollections.CollectionIndexToUUID = make([]string, 0, numCollections) readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int) - for _, coll := range readCollections.UuidToCollection { - collectionIndex := len(readCollections.CollectionIndexToUuid) - readCollections.CollectionIndexToUuid = - append(readCollections.CollectionIndexToUuid, coll.Uuid) - readCollections.CollectionUuidToIndex[coll.Uuid] = collectionIndex + for _, coll := range readCollections.UUIDToCollection { + collectionIndex := len(readCollections.CollectionIndexToUUID) + readCollections.CollectionIndexToUUID = + append(readCollections.CollectionIndexToUUID, coll.UUID) + readCollections.CollectionUUIDToIndex[coll.UUID] = collectionIndex - readCollections.OwnerToCollectionSize[coll.OwnerUuid] = - readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize + readCollections.OwnerToCollectionSize[coll.OwnerUUID] = + readCollections.OwnerToCollectionSize[coll.OwnerUUID] + coll.TotalSize for block, size := range coll.BlockDigestToSize { locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}