From 4879e0e2f75fd387720b4f4b58ca6ae48a798c98 Mon Sep 17 00:00:00 2001 From: mishaz Date: Mon, 12 Jan 2015 23:20:50 +0000 Subject: [PATCH] Started reading collections and keep data in parallel. Moved some logic from datamanager.go to collection.go. Added logging to end of run. --- services/datamanager/collection/collection.go | 37 +++++++++++++++++ services/datamanager/datamanager.go | 41 ++++++------------- 2 files changed, 50 insertions(+), 28 deletions(-) diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go index d8352dbc6e..f9e2a3dc82 100644 --- a/services/datamanager/collection/collection.go +++ b/services/datamanager/collection/collection.go @@ -34,6 +34,7 @@ type Collection struct { type ReadCollections struct { ReadAllCollections bool UuidToCollection map[string]Collection + OwnerToCollectionSize map[string]int } type GetCollectionsParams struct { @@ -95,6 +96,31 @@ func WriteHeapProfile() { } +func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) { + results = GetCollections(params) + ComputeSizeOfOwnedCollections(&results) + + if params.Logger != nil { + properties,_ := params.Logger.Edit() + collectionInfo := properties["collection_info"].(map[string]interface{}) + collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize + params.Logger.Record() + } + + log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize) + log.Printf("Read and processed %d collections", + len(results.UuidToCollection)) + + // TODO(misha): Add a "readonly" flag. If we're in readonly mode, + // lots of behaviors can become warnings (and obviously we can't + // write anything).
+ // if !readCollections.ReadAllCollections { + // log.Fatalf("Did not read all collections") + // } + + return +} + func GetCollections(params GetCollectionsParams) (results ReadCollections) { if &params.Client == nil { log.Fatalf("params.Client passed to GetCollections() should " + @@ -272,6 +298,17 @@ func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) (int) { } +func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) { + readCollections.OwnerToCollectionSize = make(map[string]int) + for _, coll := range readCollections.UuidToCollection { + readCollections.OwnerToCollectionSize[coll.OwnerUuid] = + readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize + } + + return +} + + // Assumes you haven't already called arvLogger.Edit()! // If you have called arvLogger.Edit() this method will hang waiting // for the lock you're already holding. diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go index 87a71a9a4b..6bd9ee8afa 100644 --- a/services/datamanager/datamanager.go +++ b/services/datamanager/datamanager.go @@ -71,30 +71,24 @@ func main() { arvLogger.Record() } - // TODO(misha): Read Collections and Keep Contents concurrently as goroutines. - // This requires waiting on them to finish before you let main() exit.
+ collectionChannel := make(chan collection.ReadCollections) - RunCollections(collection.GetCollectionsParams{ - Client: arv, Logger: arvLogger, BatchSize: 50}) + go func() { collectionChannel <- collection.GetCollectionsAndSummarize( + collection.GetCollectionsParams{ + Client: arv, Logger: arvLogger, BatchSize: 50}) }() RunKeep(keep.GetKeepServersParams{Client: arv, Limit: 1000}) -} - -func RunCollections(params collection.GetCollectionsParams) { - readCollections := collection.GetCollections(params) - UserUsage := ComputeSizeOfOwnedCollections(readCollections) - log.Printf("Uuid to Size used: %v", UserUsage) + readCollections := <-collectionChannel + _ = readCollections // Make compiler happy. - // TODO(misha): Add a "readonly" flag. If we're in readonly mode, - // lots of behaviors can become warnings (and obviously we can't - // write anything). - // if !readCollections.ReadAllCollections { - // log.Fatalf("Did not read all collections") - // } - - log.Printf("Read and processed %d collections", - len(readCollections.UuidToCollection)) + // Log that we're finished + if arvLogger != nil { + properties,_ := arvLogger.Edit() + properties["run_info"].(map[string]interface{})["end_time"] = time.Now() + // Force the recording, since go will not wait for the timer before exiting. 
+ arvLogger.ForceRecord() + } } func RunKeep(params keep.GetKeepServersParams) { @@ -111,15 +105,6 @@ func RunKeep(params keep.GetKeepServersParams) { log.Printf("Replication level distribution: %v", blockReplicationCounts) } -func ComputeSizeOfOwnedCollections(readCollections collection.ReadCollections) ( - results map[string]int) { - results = make(map[string]int) - for _, coll := range readCollections.UuidToCollection { - results[coll.OwnerUuid] = results[coll.OwnerUuid] + coll.TotalSize - } - return -} - func LogMemoryAlloc(properties map[string]interface{}, entry map[string]interface{}) { _ = entry // keep the compiler from complaining runInfo := properties["run_info"].(map[string]interface{}) -- 2.30.2