X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1a0303d4cb6e1870e958f0121c26a79e0116af64..88c382d13b3d6e6f3b03ba0d5139ad9552c3c359:/services/datamanager/datamanager.go

diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index 168dbcf126..4a3b5627dd 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -3,44 +3,218 @@
 package main
 
 import (
-	//"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"errors"
+	"flag"
+	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"git.curoverse.com/arvados.git/sdk/go/logger"
 	"git.curoverse.com/arvados.git/sdk/go/util"
 	"git.curoverse.com/arvados.git/services/datamanager/collection"
 	"git.curoverse.com/arvados.git/services/datamanager/keep"
+	"git.curoverse.com/arvados.git/services/datamanager/loggerutil"
+	"git.curoverse.com/arvados.git/services/datamanager/summary"
 	"log"
+	"time"
 )
 
+var (
+	logEventTypePrefix  string
+	logFrequencySeconds int
+	minutesBetweenRuns  int
+	collectionBatchSize int
+	dryRun              bool
+)
+
+func init() {
+	flag.StringVar(&logEventTypePrefix,
+		"log-event-type-prefix",
+		"experimental-data-manager",
+		"Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging")
+	flag.IntVar(&logFrequencySeconds,
+		"log-frequency-seconds",
+		20,
+		"How frequently we'll write log entries in seconds.")
+	flag.IntVar(&minutesBetweenRuns,
+		"minutes-between-runs",
+		0,
+		"How many minutes we wait between data manager runs. 0 means run once and exit.")
+	flag.IntVar(&collectionBatchSize,
+		"collection-batch-size",
+		1000,
+		"How many collections to request in each batch.")
+	flag.BoolVar(&dryRun,
+		"dry-run",
+		false,
+		"Perform a dry run. Log how many blocks would be deleted/moved, but do not issue any changes to keepstore.")
+}
+
 func main() {
-	arv, err := arvadosclient.MakeArvadosClient()
+	flag.Parse()
+
+	if minutesBetweenRuns == 0 {
+		arv, err := arvadosclient.MakeArvadosClient()
+		if err != nil {
+			loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err))
+		}
+		err = singlerun(arv)
+		if err != nil {
+			loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("singlerun: %v", err))
+		}
+	} else {
+		waitTime := time.Minute * time.Duration(minutesBetweenRuns)
+		for {
+			log.Println("Beginning Run")
+			arv, err := arvadosclient.MakeArvadosClient()
+			if err != nil {
+				loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err))
+			}
+			err = singlerun(arv)
+			if err != nil {
+				log.Printf("singlerun: %v", err)
+			}
+			log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
+			time.Sleep(waitTime)
+		}
+	}
+}
+
+var arvLogger *logger.Logger
+
+func singlerun(arv arvadosclient.ArvadosClient) error {
+	var err error
+	if isAdmin, err := util.UserIsAdmin(arv); err != nil {
+		return errors.New("Error verifying admin token: " + err.Error())
+	} else if !isAdmin {
+		return errors.New("Current user is not an admin. Datamanager requires a privileged token.")
+	}
+
+	if logEventTypePrefix != "" {
+		arvLogger, err = logger.NewLogger(logger.LoggerParams{
+			Client:          arv,
+			EventTypePrefix: logEventTypePrefix,
+			WriteInterval:   time.Second * time.Duration(logFrequencySeconds)})
+	}
+
+	loggerutil.LogRunInfo(arvLogger)
+	if arvLogger != nil {
+		arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
+	}
+
+	var (
+		dataFetcher     summary.DataFetcher
+		readCollections collection.ReadCollections
+		keepServerInfo  keep.ReadServers
+	)
+
+	if summary.ShouldReadData() {
+		dataFetcher = summary.ReadData
+	} else {
+		dataFetcher = BuildDataFetcher(arv)
+	}
+
+	err = dataFetcher(arvLogger, &readCollections, &keepServerInfo)
 	if err != nil {
-		log.Fatalf("Error setting up arvados client %s", err.Error())
+		return err
 	}
 
-	if is_admin, err := util.UserIsAdmin(arv); err != nil {
-		log.Fatalf("Error querying current arvados user %s", err.Error())
-	} else if !is_admin {
-		log.Fatalf("Current user is not an admin. Datamanager can only be run by admins.")
+	err = summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
+	if err != nil {
+		return err
 	}
 
-	readCollections := collection.GetCollections(
-		collection.GetCollectionsParams{
-			Client: arv, Limit: 50, LogEveryNthCollectionProcessed: 10})
+	buckets := summary.BucketReplication(readCollections, keepServerInfo)
+	bucketCounts := buckets.Counts()
 
-	log.Printf("Read Collections: %v", readCollections)
+	replicationSummary := buckets.SummarizeBuckets(readCollections)
+	replicationCounts := replicationSummary.ComputeCounts()
 
-	// TODO(misha): Add a "readonly" flag. If we're in readonly mode,
-	// lots of behaviors can become warnings (and obviously we can't
-	// write anything).
-	// if !readCollections.ReadAllCollections {
-	// 	log.Fatalf("Did not read all collections")
-	// }
+	log.Printf("Blocks In Collections: %d, "+
+		"\nBlocks In Keep: %d.",
+		len(readCollections.BlockToDesiredReplication),
+		len(keepServerInfo.BlockToServers))
+	log.Println(replicationCounts.PrettyPrint())
 
-	log.Printf("Read and processed %d collections",
-		len(readCollections.UuidToCollection))
+	log.Printf("Blocks Histogram:")
+	for _, rlbss := range bucketCounts {
+		log.Printf("%+v: %10d",
+			rlbss.Levels,
+			rlbss.Count)
+	}
 
-	readServers := keep.GetKeepServers(
-		keep.GetKeepServersParams{Client: arv, Limit: 1000})
+	kc, err := keepclient.MakeKeepClient(&arv)
+	if err != nil {
+		return fmt.Errorf("Error setting up keep client %v", err.Error())
+	}
+
+	// Log that we're finished. We force the recording, since go will
+	// not wait for the write timer before exiting.
+	if arvLogger != nil {
+		defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
+			summaryInfo := logger.GetOrCreateMap(p, "summary_info")
+			summaryInfo["block_replication_counts"] = bucketCounts
+			summaryInfo["replication_summary"] = replicationCounts
+			p["summary_info"] = summaryInfo
+
+			p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
+		})
+	}
 
-	log.Printf("Returned %d keep disks", len(readServers.AddressToContents))
+	pullServers := summary.ComputePullServers(kc,
+		&keepServerInfo,
+		readCollections.BlockToDesiredReplication,
+		replicationSummary.UnderReplicatedBlocks)
+
+	pullLists := summary.BuildPullLists(pullServers)
+
+	trashLists, trashErr := summary.BuildTrashLists(kc,
+		&keepServerInfo,
+		replicationSummary.KeepBlocksNotInCollections)
+
+	err = summary.WritePullLists(arvLogger, pullLists, dryRun)
+	if err != nil {
+		return err
+	}
+
+	if trashErr != nil {
+		return trashErr
+	}
+	keep.SendTrashLists(arvLogger, kc, trashLists, dryRun)
+
+	return nil
+}
+
+// BuildDataFetcher returns a data fetcher that fetches data from remote servers.
+func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
+	return func(
+		arvLogger *logger.Logger,
+		readCollections *collection.ReadCollections,
+		keepServerInfo *keep.ReadServers,
+	) error {
+		collDone := make(chan struct{})
+		var collErr error
+		go func() {
+			*readCollections, collErr = collection.GetCollectionsAndSummarize(
+				collection.GetCollectionsParams{
+					Client:    arv,
+					Logger:    arvLogger,
+					BatchSize: collectionBatchSize})
+			collDone <- struct{}{}
+		}()
+
+		var keepErr error
+		*keepServerInfo, keepErr = keep.GetKeepServersAndSummarize(
+			keep.GetKeepServersParams{
+				Client: arv,
+				Logger: arvLogger,
+				Limit:  1000})
+
+		<-collDone
+
+		// Return a nil error only if both parts succeeded.
+		if collErr != nil {
+			return collErr
+		}
+		return keepErr
+	}
 }