X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a5a4f79e91aa8bba1794394646808f6d4c444661..ef42f0396f5b51fb8f87b2f7e605d50f32f256b4:/services/datamanager/collection/collection.go diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go index a3f1e311f2..ca03627405 100644 --- a/services/datamanager/collection/collection.go +++ b/services/datamanager/collection/collection.go @@ -1,4 +1,4 @@ -/* Deals with parsing Collection responses from API Server. */ +// Deals with parsing Collection responses from API Server. package collection @@ -13,7 +13,6 @@ import ( "git.curoverse.com/arvados.git/services/datamanager/loggerutil" "log" "os" - "runtime" "runtime/pprof" "time" ) @@ -25,42 +24,43 @@ var ( maxManifestSize uint64 ) -const ( - DefaultReplicationLevel = 2 -) - +// Collection representation type Collection struct { - Uuid string - OwnerUuid string + UUID string + OwnerUUID string ReplicationLevel int BlockDigestToSize map[blockdigest.BlockDigest]int TotalSize int } +// ReadCollections holds information about collections from API server type ReadCollections struct { - ReadAllCollections bool - UuidToCollection map[string]Collection - OwnerToCollectionSize map[string]int - BlockToReplication map[blockdigest.BlockDigest]int - CollectionUuidToIndex map[string]int - CollectionIndexToUuid []string - BlockToCollectionIndex map[blockdigest.BlockDigest]int + ReadAllCollections bool + UUIDToCollection map[string]Collection + OwnerToCollectionSize map[string]int + BlockToDesiredReplication map[blockdigest.DigestWithSize]int + CollectionUUIDToIndex map[string]int + CollectionIndexToUUID []string + BlockToCollectionIndices map[blockdigest.DigestWithSize][]int } +// GetCollectionsParams params type GetCollectionsParams struct { Client arvadosclient.ArvadosClient Logger *logger.Logger BatchSize int } +// SdkCollectionInfo holds collection info from api type SdkCollectionInfo struct { - Uuid string `json:"uuid"` - 
OwnerUuid string `json:"owner_uuid"` + UUID string `json:"uuid"` + OwnerUUID string `json:"owner_uuid"` Redundancy int `json:"redundancy"` ModifiedAt time.Time `json:"modified_at"` ManifestText string `json:"manifest_text"` } +// SdkCollectionList lists collections from api type SdkCollectionList struct { ItemsAvailable int `json:"items_available"` Items []SdkCollectionInfo `json:"items"` @@ -73,7 +73,7 @@ func init() { "File to write the heap profiles to. Leave blank to skip profiling.") } -// Write the heap profile to a file for later review. +// WriteHeapProfile writes the heap profile to a file for later review. // Since a file is expected to only contain a single heap profile this // function overwrites the previously written profile, so it is safe // to call multiple times in a single run. @@ -82,37 +82,28 @@ func init() { func WriteHeapProfile() { if heapProfileFilename != "" { - heap_profile, err := os.Create(heapProfileFilename) + heapProfile, err := os.Create(heapProfileFilename) if err != nil { log.Fatal(err) } - defer heap_profile.Close() + defer heapProfile.Close() - err = pprof.WriteHeapProfile(heap_profile) + err = pprof.WriteHeapProfile(heapProfile) if err != nil { log.Fatal(err) } } } +// GetCollectionsAndSummarize gets collections from api and summarizes func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) { results = GetCollections(params) - Summarize(&results) - - if params.Logger != nil { - params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { - collectionInfo := p["collection_info"].(map[string]interface{}) - // Since maps are shallow copied, we run a risk of concurrent - // updates here. By copying results.OwnerToCollectionSize into - // the log, we're assuming that it won't be updated. 
- collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize - }) - } + results.Summarize(params.Logger) log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize) log.Printf("Read and processed %d collections", - len(results.UuidToCollection)) + len(results.UUIDToCollection)) // TODO(misha): Add a "readonly" flag. If we're in readonly mode, // lots of behaviors can become warnings (and obviously we can't @@ -124,6 +115,7 @@ func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollec return } +// GetCollections gets collections from api func GetCollections(params GetCollectionsParams) (results ReadCollections) { if &params.Client == nil { log.Fatalf("params.Client passed to GetCollections() should " + @@ -133,7 +125,6 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { fieldsWanted := []string{"manifest_text", "owner_uuid", "uuid", - // TODO(misha): Start using the redundancy field. "redundancy", "modified_at"} @@ -146,6 +137,23 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { sdkParams["limit"] = params.BatchSize } + var defaultReplicationLevel int + { + value, err := params.Client.Discovery("defaultCollectionReplication") + if err != nil { + loggerutil.FatalWithMessage(params.Logger, + fmt.Sprintf("Error querying default collection replication: %v", err)) + } + + defaultReplicationLevel = int(value.(float64)) + if defaultReplicationLevel <= 0 { + loggerutil.FatalWithMessage(params.Logger, + fmt.Sprintf("Default collection replication returned by arvados SDK "+ + "should be a positive integer but instead it was %d.", + defaultReplicationLevel)) + } + } + initialNumberOfCollectionsAvailable, err := util.NumberItemsAvailable(params.Client, "collections") if err != nil { @@ -156,14 +164,14 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { // that we don't have to grow the map in most cases. 
maxExpectedCollections := int( float64(initialNumberOfCollectionsAvailable) * 1.01) - results.UuidToCollection = make(map[string]Collection, maxExpectedCollections) + results.UUIDToCollection = make(map[string]Collection, maxExpectedCollections) if params.Logger != nil { params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { - collectionInfo := make(map[string]interface{}) + collectionInfo := logger.GetOrCreateMap(p, "collection_info") collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable collectionInfo["batch_size"] = params.BatchSize - p["collection_info"] = collectionInfo + collectionInfo["default_replication_level"] = defaultReplicationLevel }) } @@ -189,11 +197,12 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { sdkParams["filters"].([][]string)[0][2] = ProcessCollections(params.Logger, collections.Items, - results.UuidToCollection).Format(time.RFC3339) + defaultReplicationLevel, + results.UUIDToCollection).Format(time.RFC3339) // update counts previousTotalCollections = totalCollections - totalCollections = len(results.UuidToCollection) + totalCollections = len(results.UUIDToCollection) log.Printf("%d collections read, %d new in last batch, "+ "%s latest modified date, %.0f %d %d avg,max,total manifest size", @@ -205,7 +214,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { if params.Logger != nil { params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { - collectionInfo := p["collection_info"].(map[string]interface{}) + collectionInfo := logger.GetOrCreateMap(p, "collection_info") collectionInfo["collections_read"] = totalCollections collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2] collectionInfo["total_manifest_size"] = totalManifestSize @@ -214,9 +223,6 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { } } - // Just in case this lowers the numbers 
reported in the heap profile. - runtime.GC() - // Write the heap profile for examining memory usage WriteHeapProfile() @@ -230,12 +236,14 @@ func StrCopy(s string) string { return string([]byte(s)) } +// ProcessCollections read from api server func ProcessCollections(arvLogger *logger.Logger, receivedCollections []SdkCollectionInfo, - uuidToCollection map[string]Collection) (latestModificationDate time.Time) { + defaultReplicationLevel int, + UUIDToCollection map[string]Collection) (latestModificationDate time.Time) { for _, sdkCollection := range receivedCollections { - collection := Collection{Uuid: StrCopy(sdkCollection.Uuid), - OwnerUuid: StrCopy(sdkCollection.OwnerUuid), + collection := Collection{UUID: StrCopy(sdkCollection.UUID), + OwnerUUID: StrCopy(sdkCollection.OwnerUUID), ReplicationLevel: sdkCollection.Redundancy, BlockDigestToSize: make(map[blockdigest.BlockDigest]int)} @@ -243,7 +251,7 @@ func ProcessCollections(arvLogger *logger.Logger, loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf( "Arvados SDK collection returned with unexpected zero "+ - "modifcation date. This probably means that either we failed to "+ + "modification date. 
This probably means that either we failed to "+ "parse the modification date or the API server has changed how "+ "it returns modification dates: %+v", collection)) @@ -254,13 +262,13 @@ func ProcessCollections(arvLogger *logger.Logger, } if collection.ReplicationLevel == 0 { - collection.ReplicationLevel = DefaultReplicationLevel + collection.ReplicationLevel = defaultReplicationLevel } manifest := manifest.Manifest{sdkCollection.ManifestText} manifestSize := uint64(len(sdkCollection.ManifestText)) - if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen { + if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen { totalManifestSize += manifestSize } if manifestSize > maxManifestSize { @@ -269,11 +277,11 @@ func ProcessCollections(arvLogger *logger.Logger, blockChannel := manifest.BlockIterWithDuplicates() for block := range blockChannel { - if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size { + if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size { message := fmt.Sprintf( "Collection %s contains multiple sizes (%d and %d) for block %s", - collection.Uuid, - stored_size, + collection.UUID, + storedSize, block.Size, block.Digest) loggerutil.FatalWithMessage(arvLogger, message) @@ -284,7 +292,7 @@ func ProcessCollections(arvLogger *logger.Logger, for _, size := range collection.BlockDigestToSize { collection.TotalSize += size } - uuidToCollection[collection.Uuid] = collection + UUIDToCollection[collection.UUID] = collection // Clear out all the manifest strings that we don't need anymore. // These hopefully form the bulk of our memory usage. 
@@ -295,31 +303,49 @@ func ProcessCollections(arvLogger *logger.Logger, return } -func Summarize(readCollections *ReadCollections) { +// Summarize the collections read +func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) { readCollections.OwnerToCollectionSize = make(map[string]int) - readCollections.BlockToReplication = make(map[blockdigest.BlockDigest]int) - numCollections := len(readCollections.UuidToCollection) - readCollections.CollectionUuidToIndex = make(map[string]int, numCollections) - readCollections.CollectionIndexToUuid = make([]string, 0, numCollections) - readCollections.BlockToCollectionIndex = make(map[blockdigest.BlockDigest]int) - - for _, coll := range readCollections.UuidToCollection { - collectionIndex := len(readCollections.CollectionIndexToUuid) - readCollections.CollectionIndexToUuid = - append(readCollections.CollectionIndexToUuid, coll.Uuid) - readCollections.CollectionUuidToIndex[coll.Uuid] = collectionIndex - - readCollections.OwnerToCollectionSize[coll.OwnerUuid] = - readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize - - for block, _ := range coll.BlockDigestToSize { - readCollections.BlockToCollectionIndex[block] = collectionIndex - storedReplication := readCollections.BlockToReplication[block] + readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int) + numCollections := len(readCollections.UUIDToCollection) + readCollections.CollectionUUIDToIndex = make(map[string]int, numCollections) + readCollections.CollectionIndexToUUID = make([]string, 0, numCollections) + readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int) + + for _, coll := range readCollections.UUIDToCollection { + collectionIndex := len(readCollections.CollectionIndexToUUID) + readCollections.CollectionIndexToUUID = + append(readCollections.CollectionIndexToUUID, coll.UUID) + readCollections.CollectionUUIDToIndex[coll.UUID] = collectionIndex + + 
readCollections.OwnerToCollectionSize[coll.OwnerUUID] = + readCollections.OwnerToCollectionSize[coll.OwnerUUID] + coll.TotalSize + + for block, size := range coll.BlockDigestToSize { + locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)} + readCollections.BlockToCollectionIndices[locator] = + append(readCollections.BlockToCollectionIndices[locator], + collectionIndex) + storedReplication := readCollections.BlockToDesiredReplication[locator] if coll.ReplicationLevel > storedReplication { - readCollections.BlockToReplication[block] = coll.ReplicationLevel + readCollections.BlockToDesiredReplication[locator] = + coll.ReplicationLevel } } } + if arvLogger != nil { + arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) { + collectionInfo := logger.GetOrCreateMap(p, "collection_info") + // Since maps are shallow copied, we run a risk of concurrent + // updates here. By copying results.OwnerToCollectionSize into + // the log, we're assuming that it won't be updated. + collectionInfo["owner_to_collection_size"] = + readCollections.OwnerToCollectionSize + collectionInfo["distinct_blocks_named"] = + len(readCollections.BlockToDesiredReplication) + }) + } + return }