X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/83ea6e36b19db2c9a45be87c900efbbd9ea8bdb9..5a662d84f00c0c2693c18d333bab9d0fdda7e28e:/services/datamanager/datamanager.go diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go index a8e506eacb..d3efe62173 100644 --- a/services/datamanager/datamanager.go +++ b/services/datamanager/datamanager.go @@ -10,6 +10,7 @@ import ( "git.curoverse.com/arvados.git/services/datamanager/collection" "git.curoverse.com/arvados.git/services/datamanager/keep" "git.curoverse.com/arvados.git/services/datamanager/loggerutil" + "git.curoverse.com/arvados.git/services/datamanager/summary" "log" "time" ) @@ -64,7 +65,8 @@ func singlerun() { var arvLogger *logger.Logger if logEventTypePrefix != "" { - arvLogger = logger.NewLogger(logger.LoggerParams{Client: arv, + arvLogger = logger.NewLogger(logger.LoggerParams{ + Client: arv, EventTypePrefix: logEventTypePrefix, WriteInterval: time.Second * time.Duration(logFrequencySeconds)}) } @@ -74,27 +76,61 @@ func singlerun() { arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc) } - collectionChannel := make(chan collection.ReadCollections) + var ( + readCollections collection.ReadCollections + keepServerInfo keep.ReadServers + ) - go func() { - collectionChannel <- collection.GetCollectionsAndSummarize( - collection.GetCollectionsParams{ - Client: arv, Logger: arvLogger, BatchSize: 50}) - }() + if !summary.MaybeReadData(arvLogger, &readCollections, &keepServerInfo) { + collectionChannel := make(chan collection.ReadCollections) - keepServerInfo := keep.GetKeepServersAndSummarize( - keep.GetKeepServersParams{Client: arv, Logger: arvLogger, Limit: 1000}) + go func() { + collectionChannel <- collection.GetCollectionsAndSummarize( + collection.GetCollectionsParams{ + Client: arv, + Logger: arvLogger, + BatchSize: 50}) + }() - readCollections := <-collectionChannel + keepServerInfo = keep.GetKeepServersAndSummarize( + keep.GetKeepServersParams{ + Client: arv, + Logger: arvLogger, + Limit: 1000}) - // TODO(misha): Use these together to verify replication. - _ = readCollections - _ = keepServerInfo + readCollections = <-collectionChannel + } + + summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo) + + buckets := summary.BucketReplication(readCollections, keepServerInfo) + bucketCounts := buckets.Counts() + + replicationSummary := buckets.SummarizeBuckets(readCollections) + replicationCounts := replicationSummary.ComputeCounts() + + log.Printf("Blocks In Collections: %d, "+ + "\nBlocks In Keep: %d.", + len(readCollections.BlockToReplication), + len(keepServerInfo.BlockToServers)) + log.Println(replicationCounts.PrettyPrint()) + + log.Printf("Blocks Histogram:") + for _, rlbss := range bucketCounts { + log.Printf("%+v: %10d", + rlbss.Levels, + rlbss.Count) + } // Log that we're finished. We force the recording, since go will - // not wait for the timer before exiting. + // not wait for the write timer before exiting. if arvLogger != nil { arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) { + summaryInfo := logger.GetOrCreateMap(p, "summary_info") + summaryInfo["block_replication_counts"] = bucketCounts + summaryInfo["replication_summary"] = replicationCounts + p["summary_info"] = summaryInfo + p["run_info"].(map[string]interface{})["finished_at"] = time.Now() }) }