X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/970095751e2e836ed296152ae3e9ccb6caa62f62..5a662d84f00c0c2693c18d333bab9d0fdda7e28e:/services/datamanager/datamanager.go

diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index bf989026c4..d3efe62173 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -9,31 +9,49 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/util"
 	"git.curoverse.com/arvados.git/services/datamanager/collection"
 	"git.curoverse.com/arvados.git/services/datamanager/keep"
+	"git.curoverse.com/arvados.git/services/datamanager/loggerutil"
+	"git.curoverse.com/arvados.git/services/datamanager/summary"
 	"log"
-	"os"
-	"runtime"
 	"time"
 )
 
 var (
-	logEventType        string
+	logEventTypePrefix  string
 	logFrequencySeconds int
+	minutesBetweenRuns  int
 )
 
 func init() {
-	flag.StringVar(&logEventType,
-		"log-event-type",
-		"experimental-data-manager-report",
-		"event_type to use in our arvados log entries. Set to empty to turn off logging")
-	flag.IntVar(&logFrequencySeconds,
+	flag.StringVar(&logEventTypePrefix,
+		"log-event-type-prefix",
+		"experimental-data-manager",
+		"Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging")
+	flag.IntVar(&logFrequencySeconds,
 		"log-frequency-seconds",
 		20,
 		"How frequently we'll write log entries in seconds.")
+	flag.IntVar(&minutesBetweenRuns,
+		"minutes-between-runs",
+		0,
+		"How many minutes we wait betwen data manager runs. 0 means run once and exit.")
 }
 
 func main() {
 	flag.Parse()
+	if minutesBetweenRuns == 0 {
+		singlerun()
+	} else {
+		waitTime := time.Minute * time.Duration(minutesBetweenRuns)
+		for {
+			log.Println("Beginning Run")
+			singlerun()
+			log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
+			time.Sleep(waitTime)
+		}
+	}
+}
 
+func singlerun() {
 	arv, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
 		log.Fatalf("Error setting up arvados client %s", err.Error())
@@ -46,84 +64,74 @@ func main() {
 	}
 
 	var arvLogger *logger.Logger
-	if logEventType != "" {
-		arvLogger = logger.NewLogger(logger.LoggerParams{Client: arv,
-			EventType:            logEventType,
-			MinimumWriteInterval: time.Second * time.Duration(logFrequencySeconds)})
+	if logEventTypePrefix != "" {
+		arvLogger = logger.NewLogger(logger.LoggerParams{
+			Client:          arv,
+			EventTypePrefix: logEventTypePrefix,
+			WriteInterval:   time.Second * time.Duration(logFrequencySeconds)})
 	}
 
+	loggerutil.LogRunInfo(arvLogger)
 	if arvLogger != nil {
-		properties, _ := arvLogger.Edit()
-		runInfo := make(map[string]interface{})
-		runInfo["start_time"] = time.Now()
-		runInfo["args"] = os.Args
-		hostname, err := os.Hostname()
-		if err != nil {
-			runInfo["hostname_error"] = err.Error()
-		} else {
-			runInfo["hostname"] = hostname
-		}
-		runInfo["pid"] = os.Getpid()
-		properties["run_info"] = runInfo
-
-		arvLogger.AddEditHook(LogMemoryAlloc)
-
-		arvLogger.Record()
+		arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
 	}
 
-	// TODO(misha): Read Collections and Keep Contents concurrently as goroutines.
-	// This requires waiting on them to finish before you let main() exit.
-
-	RunCollections(collection.GetCollectionsParams{
-		Client: arv, Logger: arvLogger, BatchSize: 100})
-
-	RunKeep(keep.GetKeepServersParams{Client: arv, Limit: 1000})
-}
-
-func RunCollections(params collection.GetCollectionsParams) {
-	readCollections := collection.GetCollections(params)
+	var (
+		readCollections collection.ReadCollections
+		keepServerInfo  keep.ReadServers
+	)
+
+	if !summary.MaybeReadData(arvLogger, &readCollections, &keepServerInfo) {
+		collectionChannel := make(chan collection.ReadCollections)
+
+		go func() {
+			collectionChannel <- collection.GetCollectionsAndSummarize(
+				collection.GetCollectionsParams{
+					Client:    arv,
+					Logger:    arvLogger,
+					BatchSize: 50})
+		}()
+
+		keepServerInfo = keep.GetKeepServersAndSummarize(
+			keep.GetKeepServersParams{
+				Client: arv,
+				Logger: arvLogger,
+				Limit:  1000})
+
+		readCollections = <-collectionChannel
+	}
 
-	UserUsage := ComputeSizeOfOwnedCollections(readCollections)
-	log.Printf("Uuid to Size used: %v", UserUsage)
+	summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
 
-	// TODO(misha): Add a "readonly" flag. If we're in readonly mode,
-	// lots of behaviors can become warnings (and obviously we can't
-	// write anything).
-	// if !readCollections.ReadAllCollections {
-	// 	log.Fatalf("Did not read all collections")
-	// }
+	buckets := summary.BucketReplication(readCollections, keepServerInfo)
+	bucketCounts := buckets.Counts()
 
-	log.Printf("Read and processed %d collections",
-		len(readCollections.UuidToCollection))
-}
+	replicationSummary := buckets.SummarizeBuckets(readCollections)
+	replicationCounts := replicationSummary.ComputeCounts()
 
-func RunKeep(params keep.GetKeepServersParams) {
-	readServers := keep.GetKeepServers(params)
+	log.Printf("Blocks In Collections: %d, "+
+		"\nBlocks In Keep: %d.",
+		len(readCollections.BlockToReplication),
+		len(keepServerInfo.BlockToServers))
+	log.Println(replicationCounts.PrettyPrint())
 
-	log.Printf("Returned %d keep disks", len(readServers.ServerToContents))
-
-	blockReplicationCounts := make(map[int]int)
-	for _, infos := range readServers.BlockToServers {
-		replication := len(infos)
-		blockReplicationCounts[replication] += 1
+	log.Printf("Blocks Histogram:")
+	for _, rlbss := range bucketCounts {
+		log.Printf("%+v: %10d",
+			rlbss.Levels,
+			rlbss.Count)
 	}
 
-	log.Printf("Replication level distribution: %v", blockReplicationCounts)
-}
-
-func ComputeSizeOfOwnedCollections(readCollections collection.ReadCollections) (
-	results map[string]int) {
-	results = make(map[string]int)
-	for _, coll := range readCollections.UuidToCollection {
-		results[coll.OwnerUuid] = results[coll.OwnerUuid] + coll.TotalSize
+	// Log that we're finished. We force the recording, since go will
+	// not wait for the write timer before exiting.
+	if arvLogger != nil {
+		arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
+			summaryInfo := logger.GetOrCreateMap(p, "summary_info")
+			summaryInfo["block_replication_counts"] = bucketCounts
+			summaryInfo["replication_summary"] = replicationCounts
+			p["summary_info"] = summaryInfo
+
+			p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
+		})
 	}
-	return
-}
-
-func LogMemoryAlloc(properties map[string]interface{}, entry map[string]interface{}) {
-	_ = entry // keep the compiler from complaining
-	runInfo := properties["run_info"].(map[string]interface{})
-	var memStats runtime.MemStats
-	runtime.ReadMemStats(&memStats)
-	runInfo["alloc_bytes_in_use"] = memStats.Alloc
 }
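
A note on the new -minutes-between-runs flag introduced in this diff: a value of 0 keeps the old run-once behaviour, while any other value repeats singlerun() indefinitely with a fixed sleep between passes. The standalone sketch below mirrors that control flow; doWork is a hypothetical placeholder for singlerun(), not part of the Arvados code.

package main

import (
	"flag"
	"log"
	"time"
)

var minutesBetweenRuns int

func init() {
	flag.IntVar(&minutesBetweenRuns,
		"minutes-between-runs",
		0,
		"How many minutes we wait between runs. 0 means run once and exit.")
}

// doWork is a hypothetical stand-in for singlerun().
func doWork() {
	log.Println("one data manager pass")
}

func main() {
	flag.Parse()
	if minutesBetweenRuns == 0 {
		doWork()
		return
	}
	// Repeat forever, sleeping a fixed interval between passes.
	waitTime := time.Minute * time.Duration(minutesBetweenRuns)
	for {
		log.Println("Beginning Run")
		doWork()
		log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
		time.Sleep(waitTime)
	}
}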
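
A note on the concurrency pattern in the new singlerun(): the collection scan runs in a goroutine while the Keep server scan runs on the calling goroutine, and receiving from the channel both joins the goroutine and delivers its result, so execution cannot continue until both reads have finished. The sketch below shows only that pattern; fetchCollections, fetchKeepServers, and the two result types are hypothetical placeholders, not Arvados APIs.

package main

import (
	"fmt"
	"time"
)

// Placeholder result types standing in for collection.ReadCollections
// and keep.ReadServers.
type collectionData struct{ collections int }
type keepData struct{ servers int }

// fetchCollections is a hypothetical stand-in for the slow collection scan.
func fetchCollections() collectionData {
	time.Sleep(100 * time.Millisecond) // simulate a slow API scan
	return collectionData{collections: 42}
}

// fetchKeepServers is a hypothetical stand-in for the Keep index pull.
func fetchKeepServers() keepData {
	time.Sleep(100 * time.Millisecond)
	return keepData{servers: 3}
}

func main() {
	// Run the collection read in its own goroutine; it hands its
	// result back over an unbuffered channel.
	collectionChannel := make(chan collectionData)
	go func() {
		collectionChannel <- fetchCollections()
	}()

	// Meanwhile, read the Keep servers on this goroutine.
	keepInfo := fetchKeepServers()

	// The receive waits for the goroutine to finish and yields the
	// collection results.
	collections := <-collectionChannel

	fmt.Printf("collections: %d, keep servers: %d\n",
		collections.collections, keepInfo.servers)
}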