Started reading collections and keep data in parallel. Moved some logic from datamanager.go into collection/collection.go.
author mishaz <misha@curoverse.com>
Mon, 12 Jan 2015 23:20:50 +0000 (23:20 +0000)
committer Tom Clegg <tom@curoverse.com>
Fri, 13 Feb 2015 21:25:30 +0000 (16:25 -0500)
services/datamanager/collection/collection.go
services/datamanager/datamanager.go

index d8352dbc6e52a630ab5951f5d17f640d38bcd6a0..f9e2a3dc82110a35c254e6ea6475bbebdd941dfa 100644 (file)
@@ -34,6 +34,7 @@ type Collection struct {
 type ReadCollections struct {
        ReadAllCollections bool
        UuidToCollection map[string]Collection
+       OwnerToCollectionSize map[string]int  // Sum of TotalSize over collections, keyed by OwnerUuid; filled in by ComputeSizeOfOwnedCollections.
 }
 
 type GetCollectionsParams struct {
@@ -95,6 +96,31 @@ func WriteHeapProfile() {
 }
 
 
+func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
+       // Read all collections, then total their sizes per owner uuid.
+       results = GetCollections(params)
+       ComputeSizeOfOwnedCollections(&results)
+       if params.Logger != nil {
+               properties, _ := params.Logger.Edit()
+               // Comma-ok: create collection_info if absent instead of panicking.
+               collectionInfo, ok := properties["collection_info"].(map[string]interface{})
+               if !ok {
+                       collectionInfo = make(map[string]interface{})
+                       properties["collection_info"] = collectionInfo
+               }
+               collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize
+               params.Logger.Record()
+       }
+       log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
+       log.Printf("Read and processed %d collections", len(results.UuidToCollection))
+       // TODO(misha): Add a "readonly" flag. If we're in readonly mode, lots of
+       // behaviors can become warnings (and obviously we can't write anything).
+       // if !results.ReadAllCollections {
+       //      log.Fatalf("Did not read all collections")
+       // }
+       return
+}
+
 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
        if &params.Client == nil {
                log.Fatalf("params.Client passed to GetCollections() should " +
@@ -272,6 +298,17 @@ func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) (int) {
 }
 
 
+func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) {
+       // Total the sizes of all read collections, keyed by owner uuid, and
+       // publish the result as readCollections.OwnerToCollectionSize.
+       sizeByOwner := make(map[string]int)
+       for _, coll := range readCollections.UuidToCollection {
+               sizeByOwner[coll.OwnerUuid] += coll.TotalSize
+       }
+       readCollections.OwnerToCollectionSize = sizeByOwner
+}
+
+
 // Assumes you haven't already called arvLogger.Edit()!
 // If you have called arvLogger.Edit() this method will hang waiting
 // for the lock you're already holding.
index 87a71a9a4b8162e2c101d8e3961b0024ad11bb4f..6bd9ee8afa8fc0de7df8bc17e93be3c3caacb2dd 100644 (file)
@@ -71,30 +71,24 @@ func main() {
                arvLogger.Record()
        }
 
-       // TODO(misha): Read Collections and Keep Contents concurrently as goroutines.
-       // This requires waiting on them to finish before you let main() exit.
+       collectionChannel := make(chan collection.ReadCollections)
 
-       RunCollections(collection.GetCollectionsParams{
-               Client: arv, Logger: arvLogger, BatchSize: 50})
+       go func() { collectionChannel <- collection.GetCollectionsAndSummarize(
+               collection.GetCollectionsParams{
+                       Client: arv, Logger: arvLogger, BatchSize: 50}) }()
 
        RunKeep(keep.GetKeepServersParams{Client: arv, Limit: 1000})
-}
-
-func RunCollections(params collection.GetCollectionsParams) {
-       readCollections := collection.GetCollections(params)
 
-       UserUsage := ComputeSizeOfOwnedCollections(readCollections)
-       log.Printf("Uuid to Size used: %v", UserUsage)
+       readCollections := <-collectionChannel
+       _ = readCollections  // Make compiler happy.
 
-       // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
-       // lots of behaviors can become warnings (and obviously we can't
-       // write anything).
-       // if !readCollections.ReadAllCollections {
-       //      log.Fatalf("Did not read all collections")
-       // }
-
-       log.Printf("Read and processed %d collections",
-               len(readCollections.UuidToCollection))
+       // Log that we're finished
+       if arvLogger != nil {
+               properties,_ := arvLogger.Edit()
+               properties["run_info"].(map[string]interface{})["end_time"] = time.Now()
+               // Force the recording, since go will not wait for the timer before exiting.
+               arvLogger.ForceRecord()
+       }
 }
 
 func RunKeep(params keep.GetKeepServersParams) {
@@ -111,15 +105,6 @@ func RunKeep(params keep.GetKeepServersParams) {
        log.Printf("Replication level distribution: %v", blockReplicationCounts)
 }
 
-func ComputeSizeOfOwnedCollections(readCollections collection.ReadCollections) (
-       results map[string]int) {
-       results = make(map[string]int)
-       for _, coll := range readCollections.UuidToCollection {
-               results[coll.OwnerUuid] = results[coll.OwnerUuid] + coll.TotalSize
-       }
-       return
-}
-
 func LogMemoryAlloc(properties map[string]interface{}, entry map[string]interface{}) {
        _ = entry  // keep the compiler from complaining
        runInfo := properties["run_info"].(map[string]interface{})