X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b14bc80d764b85a6ddfed32198b2144b5adf2637..4879e0e2f75fd387720b4f4b58ca6ae48a798c98:/services/datamanager/collection/collection.go?ds=sidebyside diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go index dd37377c06..f9e2a3dc82 100644 --- a/services/datamanager/collection/collection.go +++ b/services/datamanager/collection/collection.go @@ -4,6 +4,7 @@ package collection import ( "flag" + "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/blockdigest" "git.curoverse.com/arvados.git/sdk/go/logger" @@ -33,6 +34,7 @@ type Collection struct { type ReadCollections struct { ReadAllCollections bool UuidToCollection map[string]Collection + OwnerToCollectionSize map[string]int } type GetCollectionsParams struct { @@ -94,6 +96,31 @@ func WriteHeapProfile() { } +func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) { + results = GetCollections(params) + ComputeSizeOfOwnedCollections(&results) + + if params.Logger != nil { + properties,_ := params.Logger.Edit() + collectionInfo := properties["collection_info"].(map[string]interface{}) + collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize + params.Logger.Record() + } + + log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize) + log.Printf("Read and processed %d collections", + len(results.UuidToCollection)) + + // TODO(misha): Add a "readonly" flag. If we're in readonly mode, + // lots of behaviors can become warnings (and obviously we can't + // write anything). + // if !readCollections.ReadAllCollections { + // log.Fatalf("Did not read all collections") + // } + + return +} + func GetCollections(params GetCollectionsParams) (results ReadCollections) { if ¶ms.Client == nil { log.Fatalf("params.Client passed to GetCollections() should " + @@ -146,17 +173,15 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { var collections SdkCollectionList err := params.Client.List("collections", sdkParams, &collections) if err != nil { - if params.Logger != nil { - properties,_ := params.Logger.Edit() - properties["FATAL"] = err.Error() - params.Logger.Record() - } - log.Fatalf("error querying collections: %+v", err) + fatalWithMessage(params.Logger, + fmt.Sprintf("Error querying collections: %v", err)) } // Process collection and update our date filter. sdkParams["filters"].([][]string)[0][2] = - ProcessCollections(collections.Items, results.UuidToCollection).Format(time.RFC3339) + ProcessCollections(params.Logger, + collections.Items, + results.UuidToCollection).Format(time.RFC3339) // update counts previousTotalCollections = totalCollections @@ -199,7 +224,8 @@ func StrCopy(s string) string { } -func ProcessCollections(receivedCollections []SdkCollectionInfo, +func ProcessCollections(arvLogger *logger.Logger, + receivedCollections []SdkCollectionInfo, uuidToCollection map[string]Collection) (latestModificationDate time.Time) { for _, sdkCollection := range receivedCollections { collection := Collection{Uuid: StrCopy(sdkCollection.Uuid), @@ -208,13 +234,15 @@ func ProcessCollections(receivedCollections []SdkCollectionInfo, BlockDigestToSize: make(map[blockdigest.BlockDigest]int)} if sdkCollection.ModifiedAt.IsZero() { - log.Fatalf( - "Arvados SDK collection returned with unexpected zero modifcation " + - "date. This probably means that either we failed to parse the " + - "modification date or the API server has changed how it returns " + - "modification dates: %v", - collection) + fatalWithMessage(arvLogger, + fmt.Sprintf( + "Arvados SDK collection returned with unexpected zero " + + "modifcation date. This probably means that either we failed to " + + "parse the modification date or the API server has changed how " + + "it returns modification dates: %v", + collection)) } + if sdkCollection.ModifiedAt.After(latestModificationDate) { latestModificationDate = sdkCollection.ModifiedAt } @@ -232,12 +260,13 @@ func ProcessCollections(receivedCollections []SdkCollectionInfo, for block := range blockChannel { if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size { - log.Fatalf( + message := fmt.Sprintf( "Collection %s contains multiple sizes (%d and %d) for block %s", collection.Uuid, stored_size, block.Size, block.Digest) + fatalWithMessage(arvLogger, message) } collection.BlockDigestToSize[block.Digest] = block.Size } @@ -267,3 +296,29 @@ func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) (int) { return collections.ItemsAvailable } + + +func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) { + readCollections.OwnerToCollectionSize = make(map[string]int) + for _, coll := range readCollections.UuidToCollection { + readCollections.OwnerToCollectionSize[coll.OwnerUuid] = + readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize + } + + return +} + + +// Assumes you haven't already called arvLogger.Edit()! +// If you have called arvLogger.Edit() this method will hang waiting +// for the lock you're already holding. +func fatalWithMessage(arvLogger *logger.Logger, message string) { + if arvLogger != nil { + properties,_ := arvLogger.Edit() + properties["FATAL"] = message + properties["run_info"].(map[string]interface{})["end_time"] = time.Now() + arvLogger.ForceRecord() + } + + log.Fatalf(message) +}