1 /* Keep Datamanager. Responsible for checking and reporting on Keep storage. */
7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "git.curoverse.com/arvados.git/sdk/go/logger"
9 "git.curoverse.com/arvados.git/sdk/go/util"
10 "git.curoverse.com/arvados.git/services/datamanager/collection"
11 "git.curoverse.com/arvados.git/services/datamanager/keep"
12 "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
13 "git.curoverse.com/arvados.git/services/datamanager/summary"
// Settings bound to command-line flags by the flag.StringVar / flag.IntVar
// registrations in this file.
// logEventTypePrefix: prefix for the event_type of arvados log entries; an
// empty value turns logging off (no logger is created).
19 logEventTypePrefix string
// logFrequencySeconds: interval, in seconds, used as the logger's WriteInterval.
20 logFrequencySeconds int
// minutesBetweenRuns: wait between data manager runs, in minutes; 0 means run
// once and exit.
21 minutesBetweenRuns int
// Register the command-line flags that populate the package-level settings.
// -log-event-type-prefix defaults to "experimental-data-manager".
25 flag.StringVar(&logEventTypePrefix,
26 "log-event-type-prefix",
27 "experimental-data-manager",
28 "Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging")
29 flag.IntVar(&logFrequencySeconds,
30 "log-frequency-seconds",
// NOTE(review): the default-value argument for this flag is not visible in this excerpt.
32 "How frequently we'll write log entries in seconds.")
33 flag.IntVar(&minutesBetweenRuns,
34 "minutes-between-runs",
// NOTE(review): the default-value argument for this flag is not visible in this excerpt.
// NOTE(review): "betwen" in the help text below is a typo for "between";
// correcting it changes the -help output, so it is left unchanged here.
36 "How many minutes we wait betwen data manager runs. 0 means run once and exit.")
// A value of 0 selects single-run mode (per the -minutes-between-runs help
// text); the body of this branch is not visible in this excerpt.
41 if minutesBetweenRuns == 0 {
// Otherwise convert the configured minutes into a time.Duration used as the
// pause between runs.
44 waitTime := time.Minute * time.Duration(minutesBetweenRuns)
// NOTE(review): the enclosing loop, the run call itself and the sleep on
// waitTime are not visible in this excerpt; these log lines presumably
// bracket each run — confirm.
46 log.Println("Beginning Run")
48 log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
// Build an Arvados API client; failure to set it up is fatal.
55 arv, err := arvadosclient.MakeArvadosClient()
57 log.Fatalf("Error setting up arvados client %s", err.Error())
// Only admin users may run the data manager: both a failed user lookup and a
// non-admin user terminate the process via log.Fatalf.
// NOTE(review): idiomatic Go naming would be isAdmin rather than is_admin.
60 if is_admin, err := util.UserIsAdmin(arv); err != nil {
61 log.Fatalf("Error querying current arvados user %s", err.Error())
63 log.Fatalf("Current user is not an admin. Datamanager can only be run by admins.")
// The Arvados logger is optional: arvLogger stays nil when logEventTypePrefix
// is empty, which is how logging is turned off. When enabled, its write
// interval comes from -log-frequency-seconds.
66 var arvLogger *logger.Logger
67 if logEventTypePrefix != "" {
68 arvLogger = logger.NewLogger(logger.LoggerParams{
70 EventTypePrefix: logEventTypePrefix,
71 WriteInterval: time.Second * time.Duration(logFrequencySeconds)})
// Record run information; arvLogger may still be nil at this point.
// NOTE(review): presumably LogRunInfo tolerates a nil logger — confirm.
74 loggerutil.LogRunInfo(arvLogger)
// Register loggerutil.LogMemoryAlloc as a write hook on the logger.
// NOTE(review): any nil guard around this call is not visible in this
// excerpt; confirm it is not reached when arvLogger is nil.
76 arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
// Collection and Keep server data that the replication summary is computed from.
80 readCollections collection.ReadCollections
81 keepServerInfo keep.ReadServers
// summary.MaybeReadData presumably loads previously saved data; when it
// returns false, both data sets are fetched via
// collection.GetCollectionsAndSummarize and keep.GetKeepServersAndSummarize.
84 if !summary.MaybeReadData(arvLogger, &readCollections, &keepServerInfo) {
// The collection result is delivered over this unbuffered channel. It is
// presumably sent from a goroutine (its launch line is not visible here), so
// that the collection and Keep server fetches can overlap.
85 collectionChannel := make(chan collection.ReadCollections)
88 collectionChannel <- collection.GetCollectionsAndSummarize(
89 collection.GetCollectionsParams{
95 keepServerInfo = keep.GetKeepServersAndSummarize(
96 keep.GetKeepServersParams{
// Block until the collection fetch has delivered its result.
101 readCollections = <-collectionChannel
// NOTE(review): presumably persists the data so that a later MaybeReadData
// can reuse it — confirm.
104 summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
// Bucket blocks by replication and derive per-bucket counts plus an overall
// replication summary from the collection and Keep server data.
// NOTE(review): the semantics of these summary helpers are inferred from
// their names — confirm against the summary package.
106 buckets := summary.BucketReplication(readCollections, keepServerInfo)
107 bucketCounts := buckets.Counts()
109 replicationSummary := buckets.SummarizeBuckets(readCollections)
110 replicationCounts := replicationSummary.ComputeCounts()
// NOTE(review): this format puts ", " immediately before the "\n", leaving a
// trailing comma at the end of the first output line.
112 log.Printf("Blocks In Collections: %d, "+
113 "\nBlocks In Keep: %d.",
114 len(readCollections.BlockToReplication),
115 len(keepServerInfo.BlockToServers))
116 log.Println(replicationCounts.PrettyPrint())
// NOTE(review): there are no format verbs here; log.Println would be the
// idiomatic call.
118 log.Printf("Blocks Histogram:")
// One log line per entry of bucketCounts; the remaining Printf arguments are
// not visible in this excerpt.
119 for _, rlbss := range bucketCounts {
120 log.Printf("%+v: %10d",
125 // Log that we're finished. We force the recording, since go will
126 // not wait for the write timer before exiting.
127 if arvLogger != nil {
128 arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
// Attach the replication results under p["summary_info"].
129 summaryInfo := logger.GetOrCreateMap(p, "summary_info")
130 summaryInfo["block_replication_counts"] = bucketCounts
131 summaryInfo["replication_summary"] = replicationCounts
// NOTE(review): this reassignment may be redundant if GetOrCreateMap already
// stores the map in p — confirm.
132 p["summary_info"] = summaryInfo
// Stamp the completion time onto the existing run_info map.
// NOTE(review): this single-value type assertion panics if p["run_info"] is
// missing or is not a map[string]interface{}; presumably it was populated
// earlier (e.g. by loggerutil.LogRunInfo) — confirm.
134 p["run_info"].(map[string]interface{})["finished_at"] = time.Now()