X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4879e0e2f75fd387720b4f4b58ca6ae48a798c98..bbd85b230ccd208ef942792109fed1c9161f9429:/services/datamanager/collection/collection.go diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go index f9e2a3dc82..f0a0f3f1aa 100644 --- a/services/datamanager/collection/collection.go +++ b/services/datamanager/collection/collection.go @@ -1,4 +1,4 @@ -/* Deals with parsing Collection responses from API Server. */ +// Deals with parsing Collection responses from API Server. package collection @@ -9,107 +9,100 @@ import ( "git.curoverse.com/arvados.git/sdk/go/blockdigest" "git.curoverse.com/arvados.git/sdk/go/logger" "git.curoverse.com/arvados.git/sdk/go/manifest" + "git.curoverse.com/arvados.git/sdk/go/util" "log" "os" - "runtime" "runtime/pprof" "time" ) var ( - heap_profile_filename string - // globals for debugging - totalManifestSize uint64 - maxManifestSize uint64 + HeapProfileFilename string ) +// Collection representation type Collection struct { - Uuid string - OwnerUuid string - ReplicationLevel int + UUID string + OwnerUUID string + ReplicationLevel int BlockDigestToSize map[blockdigest.BlockDigest]int - TotalSize int + TotalSize int } +// ReadCollections holds information about collections from API server type ReadCollections struct { - ReadAllCollections bool - UuidToCollection map[string]Collection - OwnerToCollectionSize map[string]int + ReadAllCollections bool + UUIDToCollection map[string]Collection + OwnerToCollectionSize map[string]int + BlockToDesiredReplication map[blockdigest.DigestWithSize]int + CollectionUUIDToIndex map[string]int + CollectionIndexToUUID []string + BlockToCollectionIndices map[blockdigest.DigestWithSize][]int } +// GetCollectionsParams params type GetCollectionsParams struct { - Client arvadosclient.ArvadosClient - Logger *logger.Logger + Client arvadosclient.ArvadosClient + Logger *logger.Logger BatchSize int } +// SdkCollectionInfo holds collection info from api type SdkCollectionInfo struct { - Uuid string `json:"uuid"` - OwnerUuid string `json:"owner_uuid"` - Redundancy int `json:"redundancy"` - ModifiedAt time.Time `json:"modified_at"` - ManifestText string `json:"manifest_text"` + UUID string `json:"uuid"` + OwnerUUID string `json:"owner_uuid"` + Redundancy int `json:"redundancy"` + ModifiedAt time.Time `json:"modified_at"` + ManifestText string `json:"manifest_text"` } +// SdkCollectionList lists collections from api type SdkCollectionList struct { - ItemsAvailable int `json:"items_available"` - Items []SdkCollectionInfo `json:"items"` + ItemsAvailable int `json:"items_available"` + Items []SdkCollectionInfo `json:"items"` } func init() { - flag.StringVar(&heap_profile_filename, + flag.StringVar(&HeapProfileFilename, "heap-profile", "", "File to write the heap profiles to. Leave blank to skip profiling.") } -// // Methods to implement util.SdkListResponse Interface -// func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) { -// return s.ItemsAvailable, nil -// } - -// func (s SdkCollectionList) NumItemsContained() (numContained int, err error) { -// return len(s.Items), nil -// } - -// Write the heap profile to a file for later review. +// WriteHeapProfile writes the heap profile to a file for later review. // Since a file is expected to only contain a single heap profile this // function overwrites the previously written profile, so it is safe // to call multiple times in a single run. // Otherwise we would see cumulative numbers as explained here: // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ -func WriteHeapProfile() { - if heap_profile_filename != "" { - - heap_profile, err := os.Create(heap_profile_filename) +func WriteHeapProfile() error { + if HeapProfileFilename != "" { + heapProfile, err := os.Create(HeapProfileFilename) if err != nil { - log.Fatal(err) + return err } - defer heap_profile.Close() + defer heapProfile.Close() - err = pprof.WriteHeapProfile(heap_profile) - if err != nil { - log.Fatal(err) - } + err = pprof.WriteHeapProfile(heapProfile) + return err } -} - -func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) { - results = GetCollections(params) - ComputeSizeOfOwnedCollections(&results) + return nil +} - if params.Logger != nil { - properties,_ := params.Logger.Edit() - collectionInfo := properties["collection_info"].(map[string]interface{}) - collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize - params.Logger.Record() +// GetCollectionsAndSummarize gets collections from api and summarizes +func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections, err error) { + results, err = GetCollections(params) + if err != nil { + return } + results.Summarize(params.Logger) + log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize) log.Printf("Read and processed %d collections", - len(results.UuidToCollection)) + len(results.UUIDToCollection)) // TODO(misha): Add a "readonly" flag. If we're in readonly mode, // lots of behaviors can become warnings (and obviously we can't @@ -121,101 +114,145 @@ func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollec return } -func GetCollections(params GetCollectionsParams) (results ReadCollections) { +// GetCollections gets collections from api +func GetCollections(params GetCollectionsParams) (results ReadCollections, err error) { if ¶ms.Client == nil { - log.Fatalf("params.Client passed to GetCollections() should " + + err = fmt.Errorf("params.Client passed to GetCollections() should " + "contain a valid ArvadosClient, but instead it is nil.") + return } fieldsWanted := []string{"manifest_text", "owner_uuid", "uuid", - // TODO(misha): Start using the redundancy field. "redundancy", "modified_at"} sdkParams := arvadosclient.Dict{ - "select": fieldsWanted, - "order": []string{"modified_at ASC"}, - "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}} + "select": fieldsWanted, + "order": []string{"modified_at ASC", "uuid ASC"}, + "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}, + "offset": 0} if params.BatchSize > 0 { sdkParams["limit"] = params.BatchSize } - initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client) + var defaultReplicationLevel int + { + var value interface{} + value, err = params.Client.Discovery("defaultCollectionReplication") + if err != nil { + return + } + + defaultReplicationLevel = int(value.(float64)) + if defaultReplicationLevel <= 0 { + err = fmt.Errorf("Default collection replication returned by arvados SDK "+ + "should be a positive integer but instead it was %d.", + defaultReplicationLevel) + return + } + } + + initialNumberOfCollectionsAvailable, err := + util.NumberItemsAvailable(params.Client, "collections") + if err != nil { + return + } // Include a 1% margin for collections added while we're reading so // that we don't have to grow the map in most cases. maxExpectedCollections := int( float64(initialNumberOfCollectionsAvailable) * 1.01) - results.UuidToCollection = make(map[string]Collection, maxExpectedCollections) + results.UUIDToCollection = make(map[string]Collection, maxExpectedCollections) if params.Logger != nil { - properties,_ := params.Logger.Edit() - collectionInfo := make(map[string]interface{}) - collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable - collectionInfo["batch_size"] = params.BatchSize - properties["collection_info"] = collectionInfo - params.Logger.Record() + params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { + collectionInfo := logger.GetOrCreateMap(p, "collection_info") + collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable + collectionInfo["batch_size"] = params.BatchSize + collectionInfo["default_replication_level"] = defaultReplicationLevel + }) } // These values are just for getting the loop to run the first time, // afterwards they'll be set to real values. - previousTotalCollections := -1 - totalCollections := 0 - for totalCollections > previousTotalCollections { + remainingCollections := 1 + var totalCollections int + var previousTotalCollections int + for remainingCollections > 0 { // We're still finding new collections // Write the heap profile for examining memory usage - WriteHeapProfile() + err = WriteHeapProfile() + if err != nil { + return + } // Get next batch of collections. var collections SdkCollectionList - err := params.Client.List("collections", sdkParams, &collections) + err = params.Client.List("collections", sdkParams, &collections) if err != nil { - fatalWithMessage(params.Logger, - fmt.Sprintf("Error querying collections: %v", err)) + return + } + batchCollections := len(collections.Items) + + // We must always have at least one collection in the batch + if batchCollections < 1 { + err = fmt.Errorf("API query returned no collections for %+v", sdkParams) + return } + // Update count of remaining collections + remainingCollections = collections.ItemsAvailable - sdkParams["offset"].(int) - batchCollections + // Process collection and update our date filter. - sdkParams["filters"].([][]string)[0][2] = - ProcessCollections(params.Logger, + latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger, collections.Items, - results.UuidToCollection).Format(time.RFC3339) + defaultReplicationLevel, + results.UUIDToCollection) + if err != nil { + return results, err + } + if sdkParams["filters"].([][]string)[0][2] != latestModificationDate.Format(time.RFC3339) { + sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339) + sdkParams["offset"] = 0 + } else { + sdkParams["offset"] = sdkParams["offset"].(int) + batchCollections + } // update counts previousTotalCollections = totalCollections - totalCollections = len(results.UuidToCollection) + totalCollections = len(results.UUIDToCollection) - log.Printf("%d collections read, %d new in last batch, " + + log.Printf("%d collections read, %d (%d new) in last batch, "+ + "%d remaining, "+ "%s latest modified date, %.0f %d %d avg,max,total manifest size", totalCollections, - totalCollections - previousTotalCollections, + batchCollections, + totalCollections-previousTotalCollections, + remainingCollections, sdkParams["filters"].([][]string)[0][2], float32(totalManifestSize)/float32(totalCollections), maxManifestSize, totalManifestSize) if params.Logger != nil { - properties,_ := params.Logger.Edit() - collectionInfo := properties["collection_info"].(map[string]interface{}) - collectionInfo["collections_read"] = totalCollections - collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2] - collectionInfo["total_manifest_size"] = totalManifestSize - collectionInfo["max_manifest_size"] = maxManifestSize - params.Logger.Record() + params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { + collectionInfo := logger.GetOrCreateMap(p, "collection_info") + collectionInfo["collections_read"] = totalCollections + collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2] + collectionInfo["total_manifest_size"] = totalManifestSize + collectionInfo["max_manifest_size"] = maxManifestSize + }) } } - // Just in case this lowers the numbers reported in the heap profile. - runtime.GC() - // Write the heap profile for examining memory usage - WriteHeapProfile() + err = WriteHeapProfile() return } - // StrCopy returns a newly allocated string. // It is useful to copy slices so that the garbage collector can reuse // the memory of the longer strings they came from. @@ -223,58 +260,72 @@ func StrCopy(s string) string { return string([]byte(s)) } - +// ProcessCollections read from api server func ProcessCollections(arvLogger *logger.Logger, receivedCollections []SdkCollectionInfo, - uuidToCollection map[string]Collection) (latestModificationDate time.Time) { + defaultReplicationLevel int, + UUIDToCollection map[string]Collection, +) ( + latestModificationDate time.Time, + maxManifestSize, totalManifestSize uint64, + err error, +) { for _, sdkCollection := range receivedCollections { - collection := Collection{Uuid: StrCopy(sdkCollection.Uuid), - OwnerUuid: StrCopy(sdkCollection.OwnerUuid), - ReplicationLevel: sdkCollection.Redundancy, + collection := Collection{UUID: StrCopy(sdkCollection.UUID), + OwnerUUID: StrCopy(sdkCollection.OwnerUUID), + ReplicationLevel: sdkCollection.Redundancy, BlockDigestToSize: make(map[blockdigest.BlockDigest]int)} if sdkCollection.ModifiedAt.IsZero() { - fatalWithMessage(arvLogger, - fmt.Sprintf( - "Arvados SDK collection returned with unexpected zero " + - "modifcation date. This probably means that either we failed to " + - "parse the modification date or the API server has changed how " + - "it returns modification dates: %v", - collection)) + err = fmt.Errorf( + "Arvados SDK collection returned with unexpected zero "+ + "modification date. This probably means that either we failed to "+ + "parse the modification date or the API server has changed how "+ + "it returns modification dates: %+v", + collection) + return } if sdkCollection.ModifiedAt.After(latestModificationDate) { latestModificationDate = sdkCollection.ModifiedAt } - manifest := manifest.Manifest{sdkCollection.ManifestText} + + if collection.ReplicationLevel == 0 { + collection.ReplicationLevel = defaultReplicationLevel + } + + manifest := manifest.Manifest{Text: sdkCollection.ManifestText} manifestSize := uint64(len(sdkCollection.ManifestText)) - if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen { + if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen { totalManifestSize += manifestSize } if manifestSize > maxManifestSize { maxManifestSize = manifestSize } - + blockChannel := manifest.BlockIterWithDuplicates() for block := range blockChannel { - if stored_size, stored := collection.BlockDigestToSize[block.Digest]; - stored && stored_size != block.Size { - message := fmt.Sprintf( + if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size { + log.Printf( "Collection %s contains multiple sizes (%d and %d) for block %s", - collection.Uuid, - stored_size, + collection.UUID, + storedSize, block.Size, block.Digest) - fatalWithMessage(arvLogger, message) } collection.BlockDigestToSize[block.Digest] = block.Size } + if manifest.Err != nil { + err = manifest.Err + return + } + collection.TotalSize = 0 for _, size := range collection.BlockDigestToSize { collection.TotalSize += size } - uuidToCollection[collection.Uuid] = collection + UUIDToCollection[collection.UUID] = collection // Clear out all the manifest strings that we don't need anymore. // These hopefully form the bulk of our memory usage. @@ -285,40 +336,49 @@ func ProcessCollections(arvLogger *logger.Logger, return } - -func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) (int) { - var collections SdkCollectionList - sdkParams := arvadosclient.Dict{"limit": 0} - err := client.List("collections", sdkParams, &collections) - if err != nil { - log.Fatalf("error querying collections for items available: %v", err) - } - - return collections.ItemsAvailable -} - - -func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) { +// Summarize the collections read +func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) { readCollections.OwnerToCollectionSize = make(map[string]int) - for _, coll := range readCollections.UuidToCollection { - readCollections.OwnerToCollectionSize[coll.OwnerUuid] = - readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize + readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int) + numCollections := len(readCollections.UUIDToCollection) + readCollections.CollectionUUIDToIndex = make(map[string]int, numCollections) + readCollections.CollectionIndexToUUID = make([]string, 0, numCollections) + readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int) + + for _, coll := range readCollections.UUIDToCollection { + collectionIndex := len(readCollections.CollectionIndexToUUID) + readCollections.CollectionIndexToUUID = + append(readCollections.CollectionIndexToUUID, coll.UUID) + readCollections.CollectionUUIDToIndex[coll.UUID] = collectionIndex + + readCollections.OwnerToCollectionSize[coll.OwnerUUID] = + readCollections.OwnerToCollectionSize[coll.OwnerUUID] + coll.TotalSize + + for block, size := range coll.BlockDigestToSize { + locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)} + readCollections.BlockToCollectionIndices[locator] = + append(readCollections.BlockToCollectionIndices[locator], + collectionIndex) + storedReplication := readCollections.BlockToDesiredReplication[locator] + if coll.ReplicationLevel > storedReplication { + readCollections.BlockToDesiredReplication[locator] = + coll.ReplicationLevel + } + } } - return -} - - -// Assumes you haven't already called arvLogger.Edit()! -// If you have called arvLogger.Edit() this method will hang waiting -// for the lock you're already holding. -func fatalWithMessage(arvLogger *logger.Logger, message string) { if arvLogger != nil { - properties,_ := arvLogger.Edit() - properties["FATAL"] = message - properties["run_info"].(map[string]interface{})["end_time"] = time.Now() - arvLogger.ForceRecord() + arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) { + collectionInfo := logger.GetOrCreateMap(p, "collection_info") + // Since maps are shallow copied, we run a risk of concurrent + // updates here. By copying results.OwnerToCollectionSize into + // the log, we're assuming that it won't be updated. + collectionInfo["owner_to_collection_size"] = + readCollections.OwnerToCollectionSize + collectionInfo["distinct_blocks_named"] = + len(readCollections.BlockToDesiredReplication) + }) } - log.Fatalf(message) + return }