X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6100a423c0a75736238747224d72afb112793fdb..5e27876fa4d3faf3b973282bfb4f152c02345bdc:/services/datamanager/datamanager.go diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go index 5b45153007..91612668b5 100644 --- a/services/datamanager/datamanager.go +++ b/services/datamanager/datamanager.go @@ -4,19 +4,23 @@ package main import ( "flag" + "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/keepclient" "git.curoverse.com/arvados.git/sdk/go/logger" "git.curoverse.com/arvados.git/sdk/go/util" "git.curoverse.com/arvados.git/services/datamanager/collection" "git.curoverse.com/arvados.git/services/datamanager/keep" "git.curoverse.com/arvados.git/services/datamanager/loggerutil" + "git.curoverse.com/arvados.git/services/datamanager/summary" "log" "time" ) var ( - logEventTypePrefix string + logEventTypePrefix string logFrequencySeconds int + minutesBetweenRuns int ) func init() { @@ -28,11 +32,28 @@ func init() { "log-frequency-seconds", 20, "How frequently we'll write log entries in seconds.") + flag.IntVar(&minutesBetweenRuns, + "minutes-between-runs", + 0, + "How many minutes we wait betwen data manager runs. 0 means run once and exit.") } func main() { flag.Parse() + if minutesBetweenRuns == 0 { + singlerun() + } else { + waitTime := time.Minute * time.Duration(minutesBetweenRuns) + for { + log.Println("Beginning Run") + singlerun() + log.Printf("Sleeping for %d minutes", minutesBetweenRuns) + time.Sleep(waitTime) + } + } +} +func singlerun() { arv, err := arvadosclient.MakeArvadosClient() if err != nil { log.Fatalf("Error setting up arvados client %s", err.Error()) @@ -46,9 +67,10 @@ func main() { var arvLogger *logger.Logger if logEventTypePrefix != "" { - arvLogger = logger.NewLogger(logger.LoggerParams{Client: arv, - EventTypePrefix: logEventTypePrefix, - WriteInterval: time.Second * time.Duration(logFrequencySeconds)}) + arvLogger = logger.NewLogger(logger.LoggerParams{ + Client: arv, + EventTypePrefix: logEventTypePrefix, + WriteInterval: time.Second * time.Duration(logFrequencySeconds)}) } loggerutil.LogRunInfo(arvLogger) @@ -56,28 +78,91 @@ func main() { arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc) } - collectionChannel := make(chan collection.ReadCollections) + var ( + dataFetcher summary.DataFetcher + readCollections collection.ReadCollections + keepServerInfo keep.ReadServers + ) + + if summary.ShouldReadData() { + dataFetcher = summary.ReadData + } else { + dataFetcher = BuildDataFetcher(arv) + } + + dataFetcher(arvLogger, &readCollections, &keepServerInfo) + + summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo) + + buckets := summary.BucketReplication(readCollections, keepServerInfo) + bucketCounts := buckets.Counts() + + replicationSummary := buckets.SummarizeBuckets(readCollections) + replicationCounts := replicationSummary.ComputeCounts() + + log.Printf("Blocks In Collections: %d, "+ + "\nBlocks In Keep: %d.", + len(readCollections.BlockToDesiredReplication), + len(keepServerInfo.BlockToServers)) + log.Println(replicationCounts.PrettyPrint()) + + log.Printf("Blocks Histogram:") + for _, rlbss := range bucketCounts { + log.Printf("%+v: %10d", + rlbss.Levels, + rlbss.Count) + } - go func() { - collectionChannel <- collection.GetCollectionsAndSummarize( - collection.GetCollectionsParams{ - Client: arv, Logger: arvLogger, BatchSize: 50}) - }() + kc, err := keepclient.MakeKeepClient(&arv) + if err != nil { + loggerutil.FatalWithMessage(arvLogger, + fmt.Sprintf("Error setting up keep client %s", err.Error())) + } - keepServerInfo := keep.GetKeepServersAndSummarize( - keep.GetKeepServersParams{Client: arv, Logger: arvLogger, Limit: 1000}) + pullServers := summary.ComputePullServers(kc, + &keepServerInfo, + readCollections.BlockToDesiredReplication, + replicationSummary.UnderReplicatedBlocks) - readCollections := <-collectionChannel + pullLists := summary.BuildPullLists(pullServers) - // TODO(misha): Use these together to verify replication. - _ = readCollections - _ = keepServerInfo + summary.WritePullLists(arvLogger, pullLists) // Log that we're finished. We force the recording, since go will - // not wait for the timer before exiting. + // not wait for the write timer before exiting. if arvLogger != nil { arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) { + summaryInfo := logger.GetOrCreateMap(p, "summary_info") + summaryInfo["block_replication_counts"] = bucketCounts + summaryInfo["replication_summary"] = replicationCounts + p["summary_info"] = summaryInfo + p["run_info"].(map[string]interface{})["finished_at"] = time.Now() }) } } + +// Returns a data fetcher that fetches data from remote servers. +func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher { + return func(arvLogger *logger.Logger, + readCollections *collection.ReadCollections, + keepServerInfo *keep.ReadServers) { + collectionChannel := make(chan collection.ReadCollections) + + go func() { + collectionChannel <- collection.GetCollectionsAndSummarize( + collection.GetCollectionsParams{ + Client: arv, + Logger: arvLogger, + BatchSize: 50}) + }() + + *keepServerInfo = keep.GetKeepServersAndSummarize( + keep.GetKeepServersParams{ + Client: arv, + Logger: arvLogger, + Limit: 1000}) + + *readCollections = <-collectionChannel + } +}