X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1a0303d4cb6e1870e958f0121c26a79e0116af64..9daf42fbdb868939653c6e3ca8a4fffd1cf94e31:/services/datamanager/datamanager.go?ds=sidebyside diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go index 168dbcf126..a8e506eacb 100644 --- a/services/datamanager/datamanager.go +++ b/services/datamanager/datamanager.go @@ -3,15 +3,54 @@ package main import ( - //"git.curoverse.com/arvados.git/sdk/go/keepclient" + "flag" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/logger" "git.curoverse.com/arvados.git/sdk/go/util" "git.curoverse.com/arvados.git/services/datamanager/collection" "git.curoverse.com/arvados.git/services/datamanager/keep" + "git.curoverse.com/arvados.git/services/datamanager/loggerutil" "log" + "time" ) +var ( + logEventTypePrefix string + logFrequencySeconds int + minutesBetweenRuns int +) + +func init() { + flag.StringVar(&logEventTypePrefix, + "log-event-type-prefix", + "experimental-data-manager", + "Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging") + flag.IntVar(&logFrequencySeconds, + "log-frequency-seconds", + 20, + "How frequently we'll write log entries in seconds.") + flag.IntVar(&minutesBetweenRuns, + "minutes-between-runs", + 0, + "How many minutes we wait betwen data manager runs. 0 means run once and exit.") +} + func main() { + flag.Parse() + if minutesBetweenRuns == 0 { + singlerun() + } else { + waitTime := time.Minute * time.Duration(minutesBetweenRuns) + for { + log.Println("Beginning Run") + singlerun() + log.Printf("Sleeping for %d minutes", minutesBetweenRuns) + time.Sleep(waitTime) + } + } +} + +func singlerun() { arv, err := arvadosclient.MakeArvadosClient() if err != nil { log.Fatalf("Error setting up arvados client %s", err.Error()) @@ -23,24 +62,40 @@ func main() { log.Fatalf("Current user is not an admin. Datamanager can only be run by admins.") } - readCollections := collection.GetCollections( - collection.GetCollectionsParams{ - Client: arv, Limit: 50, LogEveryNthCollectionProcessed: 10}) + var arvLogger *logger.Logger + if logEventTypePrefix != "" { + arvLogger = logger.NewLogger(logger.LoggerParams{Client: arv, + EventTypePrefix: logEventTypePrefix, + WriteInterval: time.Second * time.Duration(logFrequencySeconds)}) + } + + loggerutil.LogRunInfo(arvLogger) + if arvLogger != nil { + arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc) + } + + collectionChannel := make(chan collection.ReadCollections) - log.Printf("Read Collections: %v", readCollections) + go func() { + collectionChannel <- collection.GetCollectionsAndSummarize( + collection.GetCollectionsParams{ + Client: arv, Logger: arvLogger, BatchSize: 50}) + }() - // TODO(misha): Add a "readonly" flag. If we're in readonly mode, - // lots of behaviors can become warnings (and obviously we can't - // write anything). - // if !readCollections.ReadAllCollections { - // log.Fatalf("Did not read all collections") - // } + keepServerInfo := keep.GetKeepServersAndSummarize( + keep.GetKeepServersParams{Client: arv, Logger: arvLogger, Limit: 1000}) - log.Printf("Read and processed %d collections", - len(readCollections.UuidToCollection)) + readCollections := <-collectionChannel - readServers := keep.GetKeepServers( - keep.GetKeepServersParams{Client: arv, Limit: 1000}) + // TODO(misha): Use these together to verify replication. + _ = readCollections + _ = keepServerInfo - log.Printf("Returned %d keep disks", len(readServers.AddressToContents)) + // Log that we're finished. We force the recording, since go will + // not wait for the timer before exiting. + if arvLogger != nil { + arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) { + p["run_info"].(map[string]interface{})["finished_at"] = time.Now() + }) + } }