X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d7f6013f1e728a3a7b42d6736b78cc81ac7de127..8a504ad561c1ffbafee8a7bc8da551f9d4b9a29e:/services/datamanager/collection/collection.go diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go index 927b88819d..9a7a838f1b 100644 --- a/services/datamanager/collection/collection.go +++ b/services/datamanager/collection/collection.go @@ -4,82 +4,123 @@ package collection import ( "flag" + "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/blockdigest" + "git.curoverse.com/arvados.git/sdk/go/logger" "git.curoverse.com/arvados.git/sdk/go/manifest" - //"git.curoverse.com/arvados.git/sdk/go/util" + "git.curoverse.com/arvados.git/sdk/go/util" + "git.curoverse.com/arvados.git/services/datamanager/loggerutil" "log" "os" + "runtime" "runtime/pprof" + "time" ) var ( heap_profile_filename string - heap_profile *os.File + // globals for debugging + totalManifestSize uint64 + maxManifestSize uint64 ) type Collection struct { - Uuid string - OwnerUuid string - ReplicationLevel int + Uuid string + OwnerUuid string + ReplicationLevel int BlockDigestToSize map[blockdigest.BlockDigest]int - TotalSize int + TotalSize int } type ReadCollections struct { - ReadAllCollections bool - UuidToCollection map[string]Collection + ReadAllCollections bool + UuidToCollection map[string]Collection + OwnerToCollectionSize map[string]int } type GetCollectionsParams struct { - Client arvadosclient.ArvadosClient + Client arvadosclient.ArvadosClient + Logger *logger.Logger BatchSize int } type SdkCollectionInfo struct { - Uuid string `json:"uuid"` - OwnerUuid string `json:"owner_uuid"` - Redundancy int `json:"redundancy"` - ModifiedAt string `json:"modified_at"` - ManifestText string `json:"manifest_text"` + Uuid string `json:"uuid"` + OwnerUuid string `json:"owner_uuid"` + Redundancy int `json:"redundancy"` + ModifiedAt time.Time `json:"modified_at"` + ManifestText string `json:"manifest_text"` } type SdkCollectionList struct { - ItemsAvailable int `json:"items_available"` - Items []SdkCollectionInfo `json:"items"` + ItemsAvailable int `json:"items_available"` + Items []SdkCollectionInfo `json:"items"` } func init() { - flag.StringVar(&heap_profile_filename, + flag.StringVar(&heap_profile_filename, "heap-profile", "", - "File to write the heap profiles to.") + "File to write the heap profiles to. Leave blank to skip profiling.") } -// // Methods to implement util.SdkListResponse Interface -// func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) { -// return s.ItemsAvailable, nil -// } +// Write the heap profile to a file for later review. +// Since a file is expected to only contain a single heap profile this +// function overwrites the previously written profile, so it is safe +// to call multiple times in a single run. +// Otherwise we would see cumulative numbers as explained here: +// https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ +func WriteHeapProfile() { + if heap_profile_filename != "" { -// func (s SdkCollectionList) NumItemsContained() (numContained int, err error) { -// return len(s.Items), nil -// } + heap_profile, err := os.Create(heap_profile_filename) + if err != nil { + log.Fatal(err) + } -func GetCollections(params GetCollectionsParams) (results ReadCollections) { - if ¶ms.Client == nil { - log.Fatalf("params.Client passed to GetCollections() should " + - "contain a valid ArvadosClient, but instead it is nil.") - } + defer heap_profile.Close() - // TODO(misha): move this code somewhere better and make sure it's - // only run once - if heap_profile_filename != "" { - var err error - heap_profile, err = os.Create(heap_profile_filename) + err = pprof.WriteHeapProfile(heap_profile) if err != nil { log.Fatal(err) } } +} + +func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) { + results = GetCollections(params) + ComputeSizeOfOwnedCollections(&results) + + if params.Logger != nil { + params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { + collectionInfo := p["collection_info"].(map[string]interface{}) + // Since maps are shallow copied, we run a risk of concurrent + // updates here. By copying results.OwnerToCollectionSize into + // the log, we're assuming that it won't be updated. + collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize + }) + } + + log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize) + log.Printf("Read and processed %d collections", + len(results.UuidToCollection)) + + // TODO(misha): Add a "readonly" flag. If we're in readonly mode, + // lots of behaviors can become warnings (and obviously we can't + // write anything). + // if !readCollections.ReadAllCollections { + // log.Fatalf("Did not read all collections") + // } + + return +} + +func GetCollections(params GetCollectionsParams) (results ReadCollections) { + if ¶ms.Client == nil { + log.Fatalf("params.Client passed to GetCollections() should " + + "contain a valid ArvadosClient, but instead it is nil.") + } fieldsWanted := []string{"manifest_text", "owner_uuid", @@ -89,105 +130,140 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { "modified_at"} sdkParams := arvadosclient.Dict{ - "select": fieldsWanted, - "order": []string{"modified_at ASC"}, + "select": fieldsWanted, + "order": []string{"modified_at ASC"}, "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}} - // MISHA UNDO THIS TEMPORARY HACK TO FIND BUG! - //"filters": [][]string{[]string{"modified_at", ">=", "2014-11-05T20:44:50Z"}}} if params.BatchSize > 0 { sdkParams["limit"] = params.BatchSize } - // MISHA UNDO THIS TEMPORARY HACK TO FIND BUG! - sdkParams["limit"] = 50 - - // { - // var numReceived, numAvailable int - // results.ReadAllCollections, numReceived, numAvailable = - // util.ContainsAllAvailableItems(collections) - - // if (!results.ReadAllCollections) { - // log.Printf("ERROR: Did not receive all collections.") - // } - // log.Printf("Received %d of %d available collections.", - // numReceived, - // numAvailable) - // } - - initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client) + initialNumberOfCollectionsAvailable, err := + util.NumberItemsAvailable(params.Client, "collections") + if err != nil { + loggerutil.FatalWithMessage(params.Logger, + fmt.Sprintf("Error querying collection count: %v", err)) + } // Include a 1% margin for collections added while we're reading so // that we don't have to grow the map in most cases. maxExpectedCollections := int( float64(initialNumberOfCollectionsAvailable) * 1.01) results.UuidToCollection = make(map[string]Collection, maxExpectedCollections) + if params.Logger != nil { + params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { + collectionInfo := make(map[string]interface{}) + collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable + collectionInfo["batch_size"] = params.BatchSize + p["collection_info"] = collectionInfo + }) + } + + // These values are just for getting the loop to run the first time, + // afterwards they'll be set to real values. previousTotalCollections := -1 - for len(results.UuidToCollection) > previousTotalCollections { + totalCollections := 0 + for totalCollections > previousTotalCollections { // We're still finding new collections - log.Printf("previous, current: %d %d", previousTotalCollections, len(results.UuidToCollection)) - - // update count - previousTotalCollections = len(results.UuidToCollection) // Write the heap profile for examining memory usage - if heap_profile != nil { - err := pprof.WriteHeapProfile(heap_profile) - if err != nil { - log.Fatal(err) - } - } + WriteHeapProfile() // Get next batch of collections. var collections SdkCollectionList - log.Printf("Running with SDK Params: %v", sdkParams) err := params.Client.List("collections", sdkParams, &collections) if err != nil { - log.Fatalf("error querying collections: %+v", err) + loggerutil.FatalWithMessage(params.Logger, + fmt.Sprintf("Error querying collections: %v", err)) } // Process collection and update our date filter. - sdkParams["filters"].([][]string)[0][2] = ProcessCollections(collections.Items, results.UuidToCollection) - log.Printf("Latest date seen %s", sdkParams["filters"].([][]string)[0][2]) + sdkParams["filters"].([][]string)[0][2] = + ProcessCollections(params.Logger, + collections.Items, + results.UuidToCollection).Format(time.RFC3339) + + // update counts + previousTotalCollections = totalCollections + totalCollections = len(results.UuidToCollection) + + log.Printf("%d collections read, %d new in last batch, "+ + "%s latest modified date, %.0f %d %d avg,max,total manifest size", + totalCollections, + totalCollections-previousTotalCollections, + sdkParams["filters"].([][]string)[0][2], + float32(totalManifestSize)/float32(totalCollections), + maxManifestSize, totalManifestSize) + + if params.Logger != nil { + params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { + collectionInfo := p["collection_info"].(map[string]interface{}) + collectionInfo["collections_read"] = totalCollections + collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2] + collectionInfo["total_manifest_size"] = totalManifestSize + collectionInfo["max_manifest_size"] = maxManifestSize + }) + } } - log.Printf("previous, current: %d %d", previousTotalCollections, len(results.UuidToCollection)) + + // Just in case this lowers the numbers reported in the heap profile. + runtime.GC() // Write the heap profile for examining memory usage - if heap_profile != nil { - err := pprof.WriteHeapProfile(heap_profile) - if err != nil { - log.Fatal(err) - } - } + WriteHeapProfile() return } +// StrCopy returns a newly allocated string. +// It is useful to copy slices so that the garbage collector can reuse +// the memory of the longer strings they came from. +func StrCopy(s string) string { + return string([]byte(s)) +} -func ProcessCollections(receivedCollections []SdkCollectionInfo, - uuidToCollection map[string]Collection) (latestModificationDate string) { +func ProcessCollections(arvLogger *logger.Logger, + receivedCollections []SdkCollectionInfo, + uuidToCollection map[string]Collection) (latestModificationDate time.Time) { for _, sdkCollection := range receivedCollections { - collection := Collection{Uuid: sdkCollection.Uuid, - OwnerUuid: sdkCollection.OwnerUuid, - ReplicationLevel: sdkCollection.Redundancy, + collection := Collection{Uuid: StrCopy(sdkCollection.Uuid), + OwnerUuid: StrCopy(sdkCollection.OwnerUuid), + ReplicationLevel: sdkCollection.Redundancy, BlockDigestToSize: make(map[blockdigest.BlockDigest]int)} - // log.Printf("Seeing modification date, owner_uuid: %s %s", - // sdkCollection.ModifiedAt, - // sdkCollection.OwnerUuid) - if sdkCollection.ModifiedAt > latestModificationDate { + + if sdkCollection.ModifiedAt.IsZero() { + loggerutil.FatalWithMessage(arvLogger, + fmt.Sprintf( + "Arvados SDK collection returned with unexpected zero "+ + "modifcation date. This probably means that either we failed to "+ + "parse the modification date or the API server has changed how "+ + "it returns modification dates: %v", + collection)) + } + + if sdkCollection.ModifiedAt.After(latestModificationDate) { latestModificationDate = sdkCollection.ModifiedAt } manifest := manifest.Manifest{sdkCollection.ManifestText} + manifestSize := uint64(len(sdkCollection.ManifestText)) + + if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen { + totalManifestSize += manifestSize + } + if manifestSize > maxManifestSize { + maxManifestSize = manifestSize + } + blockChannel := manifest.BlockIterWithDuplicates() for block := range blockChannel { - if stored_size, stored := collection.BlockDigestToSize[block.Digest]; - stored && stored_size != block.Size { - log.Fatalf( + if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size { + message := fmt.Sprintf( "Collection %s contains multiple sizes (%d and %d) for block %s", collection.Uuid, stored_size, block.Size, block.Digest) + loggerutil.FatalWithMessage(arvLogger, message) } collection.BlockDigestToSize[block.Digest] = block.Size } @@ -206,14 +282,12 @@ func ProcessCollections(receivedCollections []SdkCollectionInfo, return } - -func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) (int) { - var collections SdkCollectionList - sdkParams := arvadosclient.Dict{"limit": 0} - err := client.List("collections", sdkParams, &collections) - if err != nil { - log.Fatalf("error querying collections for items available: %v", err) +func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) { + readCollections.OwnerToCollectionSize = make(map[string]int) + for _, coll := range readCollections.UuidToCollection { + readCollections.OwnerToCollectionSize[coll.OwnerUuid] = + readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize } - return collections.ItemsAvailable + return }