X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4879e0e2f75fd387720b4f4b58ca6ae48a798c98..0d63ac0c2486a43198eb1015ba8d1028239139ee:/services/datamanager/collection/collection.go diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go index f9e2a3dc82..9a7a838f1b 100644 --- a/services/datamanager/collection/collection.go +++ b/services/datamanager/collection/collection.go @@ -9,6 +9,8 @@ import ( "git.curoverse.com/arvados.git/sdk/go/blockdigest" "git.curoverse.com/arvados.git/sdk/go/logger" "git.curoverse.com/arvados.git/sdk/go/manifest" + "git.curoverse.com/arvados.git/sdk/go/util" + "git.curoverse.com/arvados.git/services/datamanager/loggerutil" "log" "os" "runtime" @@ -20,58 +22,49 @@ var ( heap_profile_filename string // globals for debugging totalManifestSize uint64 - maxManifestSize uint64 + maxManifestSize uint64 ) type Collection struct { - Uuid string - OwnerUuid string - ReplicationLevel int + Uuid string + OwnerUuid string + ReplicationLevel int BlockDigestToSize map[blockdigest.BlockDigest]int - TotalSize int + TotalSize int } type ReadCollections struct { - ReadAllCollections bool - UuidToCollection map[string]Collection + ReadAllCollections bool + UuidToCollection map[string]Collection OwnerToCollectionSize map[string]int } type GetCollectionsParams struct { - Client arvadosclient.ArvadosClient - Logger *logger.Logger + Client arvadosclient.ArvadosClient + Logger *logger.Logger BatchSize int } type SdkCollectionInfo struct { - Uuid string `json:"uuid"` - OwnerUuid string `json:"owner_uuid"` - Redundancy int `json:"redundancy"` - ModifiedAt time.Time `json:"modified_at"` - ManifestText string `json:"manifest_text"` + Uuid string `json:"uuid"` + OwnerUuid string `json:"owner_uuid"` + Redundancy int `json:"redundancy"` + ModifiedAt time.Time `json:"modified_at"` + ManifestText string `json:"manifest_text"` } type SdkCollectionList struct { - ItemsAvailable int `json:"items_available"` - Items []SdkCollectionInfo `json:"items"` + ItemsAvailable int `json:"items_available"` + Items []SdkCollectionInfo `json:"items"` } func init() { - flag.StringVar(&heap_profile_filename, + flag.StringVar(&heap_profile_filename, "heap-profile", "", "File to write the heap profiles to. Leave blank to skip profiling.") } -// // Methods to implement util.SdkListResponse Interface -// func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) { -// return s.ItemsAvailable, nil -// } - -// func (s SdkCollectionList) NumItemsContained() (numContained int, err error) { -// return len(s.Items), nil -// } - // Write the heap profile to a file for later review. // Since a file is expected to only contain a single heap profile this // function overwrites the previously written profile, so it is safe @@ -95,16 +88,18 @@ func WriteHeapProfile() { } } - func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) { results = GetCollections(params) ComputeSizeOfOwnedCollections(&results) if params.Logger != nil { - properties,_ := params.Logger.Edit() - collectionInfo := properties["collection_info"].(map[string]interface{}) - collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize - params.Logger.Record() + params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { + collectionInfo := p["collection_info"].(map[string]interface{}) + // Since maps are shallow copied, we run a risk of concurrent + // updates here. By copying results.OwnerToCollectionSize into + // the log, we're assuming that it won't be updated. + collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize + }) } log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize) @@ -135,15 +130,20 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { "modified_at"} sdkParams := arvadosclient.Dict{ - "select": fieldsWanted, - "order": []string{"modified_at ASC"}, + "select": fieldsWanted, + "order": []string{"modified_at ASC"}, "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}} if params.BatchSize > 0 { sdkParams["limit"] = params.BatchSize } - initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client) + initialNumberOfCollectionsAvailable, err := + util.NumberItemsAvailable(params.Client, "collections") + if err != nil { + loggerutil.FatalWithMessage(params.Logger, + fmt.Sprintf("Error querying collection count: %v", err)) + } // Include a 1% margin for collections added while we're reading so // that we don't have to grow the map in most cases. maxExpectedCollections := int( @@ -151,12 +151,12 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { results.UuidToCollection = make(map[string]Collection, maxExpectedCollections) if params.Logger != nil { - properties,_ := params.Logger.Edit() - collectionInfo := make(map[string]interface{}) - collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable - collectionInfo["batch_size"] = params.BatchSize - properties["collection_info"] = collectionInfo - params.Logger.Record() + params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { + collectionInfo := make(map[string]interface{}) + collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable + collectionInfo["batch_size"] = params.BatchSize + p["collection_info"] = collectionInfo + }) } // These values are just for getting the loop to run the first time, @@ -173,36 +173,36 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { var collections SdkCollectionList err := params.Client.List("collections", sdkParams, &collections) if err != nil { - fatalWithMessage(params.Logger, + loggerutil.FatalWithMessage(params.Logger, fmt.Sprintf("Error querying collections: %v", err)) } // Process collection and update our date filter. sdkParams["filters"].([][]string)[0][2] = ProcessCollections(params.Logger, - collections.Items, - results.UuidToCollection).Format(time.RFC3339) + collections.Items, + results.UuidToCollection).Format(time.RFC3339) // update counts previousTotalCollections = totalCollections totalCollections = len(results.UuidToCollection) - log.Printf("%d collections read, %d new in last batch, " + + log.Printf("%d collections read, %d new in last batch, "+ "%s latest modified date, %.0f %d %d avg,max,total manifest size", totalCollections, - totalCollections - previousTotalCollections, + totalCollections-previousTotalCollections, sdkParams["filters"].([][]string)[0][2], float32(totalManifestSize)/float32(totalCollections), maxManifestSize, totalManifestSize) if params.Logger != nil { - properties,_ := params.Logger.Edit() - collectionInfo := properties["collection_info"].(map[string]interface{}) - collectionInfo["collections_read"] = totalCollections - collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2] - collectionInfo["total_manifest_size"] = totalManifestSize - collectionInfo["max_manifest_size"] = maxManifestSize - params.Logger.Record() + params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { + collectionInfo := p["collection_info"].(map[string]interface{}) + collectionInfo["collections_read"] = totalCollections + collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2] + collectionInfo["total_manifest_size"] = totalManifestSize + collectionInfo["max_manifest_size"] = maxManifestSize + }) } } @@ -215,7 +215,6 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { return } - // StrCopy returns a newly allocated string. // It is useful to copy slices so that the garbage collector can reuse // the memory of the longer strings they came from. @@ -223,22 +222,21 @@ func StrCopy(s string) string { return string([]byte(s)) } - func ProcessCollections(arvLogger *logger.Logger, receivedCollections []SdkCollectionInfo, uuidToCollection map[string]Collection) (latestModificationDate time.Time) { for _, sdkCollection := range receivedCollections { collection := Collection{Uuid: StrCopy(sdkCollection.Uuid), - OwnerUuid: StrCopy(sdkCollection.OwnerUuid), - ReplicationLevel: sdkCollection.Redundancy, + OwnerUuid: StrCopy(sdkCollection.OwnerUuid), + ReplicationLevel: sdkCollection.Redundancy, BlockDigestToSize: make(map[blockdigest.BlockDigest]int)} if sdkCollection.ModifiedAt.IsZero() { - fatalWithMessage(arvLogger, + loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf( - "Arvados SDK collection returned with unexpected zero " + - "modifcation date. This probably means that either we failed to " + - "parse the modification date or the API server has changed how " + + "Arvados SDK collection returned with unexpected zero "+ + "modifcation date. This probably means that either we failed to "+ + "parse the modification date or the API server has changed how "+ "it returns modification dates: %v", collection)) } @@ -255,18 +253,17 @@ func ProcessCollections(arvLogger *logger.Logger, if manifestSize > maxManifestSize { maxManifestSize = manifestSize } - + blockChannel := manifest.BlockIterWithDuplicates() for block := range blockChannel { - if stored_size, stored := collection.BlockDigestToSize[block.Digest]; - stored && stored_size != block.Size { + if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size { message := fmt.Sprintf( "Collection %s contains multiple sizes (%d and %d) for block %s", collection.Uuid, stored_size, block.Size, block.Digest) - fatalWithMessage(arvLogger, message) + loggerutil.FatalWithMessage(arvLogger, message) } collection.BlockDigestToSize[block.Digest] = block.Size } @@ -285,19 +282,6 @@ func ProcessCollections(arvLogger *logger.Logger, return } - -func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) (int) { - var collections SdkCollectionList - sdkParams := arvadosclient.Dict{"limit": 0} - err := client.List("collections", sdkParams, &collections) - if err != nil { - log.Fatalf("error querying collections for items available: %v", err) - } - - return collections.ItemsAvailable -} - - func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) { readCollections.OwnerToCollectionSize = make(map[string]int) for _, coll := range readCollections.UuidToCollection { @@ -307,18 +291,3 @@ func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) { return } - - -// Assumes you haven't already called arvLogger.Edit()! -// If you have called arvLogger.Edit() this method will hang waiting -// for the lock you're already holding. -func fatalWithMessage(arvLogger *logger.Logger, message string) { - if arvLogger != nil { - properties,_ := arvLogger.Edit() - properties["FATAL"] = message - properties["run_info"].(map[string]interface{})["end_time"] = time.Now() - arvLogger.ForceRecord() - } - - log.Fatalf(message) -}