1 /* Keep Datamanager. Responsible for checking on and reporting on Keep Storage */
8 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9 "git.curoverse.com/arvados.git/sdk/go/keepclient"
10 "git.curoverse.com/arvados.git/sdk/go/logger"
11 "git.curoverse.com/arvados.git/sdk/go/util"
12 "git.curoverse.com/arvados.git/services/datamanager/collection"
13 "git.curoverse.com/arvados.git/services/datamanager/keep"
14 "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
15 "git.curoverse.com/arvados.git/services/datamanager/summary"
21 logEventTypePrefix string
22 logFrequencySeconds int
23 minutesBetweenRuns int
27 flag.StringVar(&logEventTypePrefix,
28 "log-event-type-prefix",
29 "experimental-data-manager",
30 "Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging")
31 flag.IntVar(&logFrequencySeconds,
32 "log-frequency-seconds",
34 "How frequently we'll write log entries in seconds.")
35 flag.IntVar(&minutesBetweenRuns,
36 "minutes-between-runs",
38 "How many minutes we wait betwen data manager runs. 0 means run once and exit.")
43 if minutesBetweenRuns == 0 {
46 log.Fatalf("Got an error: %v", err)
49 waitTime := time.Minute * time.Duration(minutesBetweenRuns)
51 log.Println("Beginning Run")
54 log.Printf("Got an error: %v", err)
56 log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
62 func singlerun() error {
63 arv, err := arvadosclient.MakeArvadosClient()
65 log.Fatalf("Error setting up arvados client %s", err.Error())
68 if is_admin, err := util.UserIsAdmin(arv); err != nil {
69 log.Fatalf("Error querying current arvados user %s", err.Error())
71 log.Fatalf("Current user is not an admin. Datamanager can only be run by admins.")
74 var arvLogger *logger.Logger
75 if logEventTypePrefix != "" {
76 arvLogger = logger.NewLogger(logger.LoggerParams{
78 EventTypePrefix: logEventTypePrefix,
79 WriteInterval: time.Second * time.Duration(logFrequencySeconds)})
82 loggerutil.LogRunInfo(arvLogger)
84 arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
87 // Verify that datamanager token belongs to an admin user
88 dataManagerToken := keep.GetDataManagerToken(arvLogger)
89 origArvToken := arv.ApiToken
90 arv.ApiToken = dataManagerToken
91 if is_admin, err := util.UserIsAdmin(arv); err != nil {
92 log.Fatalf("Error querying arvados user for data manager token %s", err.Error())
94 log.Fatalf("Datamanager token does not belong to an admin user.")
96 arv.ApiToken = origArvToken
99 dataFetcher summary.DataFetcher
100 readCollections collection.ReadCollections
101 keepServerInfo keep.ReadServers
104 if summary.ShouldReadData() {
105 dataFetcher = summary.ReadData
107 dataFetcher = BuildDataFetcher(arv)
110 dataFetcher(arvLogger, &readCollections, &keepServerInfo)
112 summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
114 buckets := summary.BucketReplication(readCollections, keepServerInfo)
115 bucketCounts := buckets.Counts()
117 replicationSummary := buckets.SummarizeBuckets(readCollections)
118 replicationCounts := replicationSummary.ComputeCounts()
120 log.Printf("Blocks In Collections: %d, "+
121 "\nBlocks In Keep: %d.",
122 len(readCollections.BlockToDesiredReplication),
123 len(keepServerInfo.BlockToServers))
124 log.Println(replicationCounts.PrettyPrint())
126 log.Printf("Blocks Histogram:")
127 for _, rlbss := range bucketCounts {
128 log.Printf("%+v: %10d",
133 kc, err := keepclient.MakeKeepClient(&arv)
135 loggerutil.FatalWithMessage(arvLogger,
136 fmt.Sprintf("Error setting up keep client %s", err.Error()))
139 // Log that we're finished. We force the recording, since go will
140 // not wait for the write timer before exiting.
141 if arvLogger != nil {
142 defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
143 summaryInfo := logger.GetOrCreateMap(p, "summary_info")
144 summaryInfo["block_replication_counts"] = bucketCounts
145 summaryInfo["replication_summary"] = replicationCounts
146 p["summary_info"] = summaryInfo
148 p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
152 pullServers := summary.ComputePullServers(kc,
154 readCollections.BlockToDesiredReplication,
155 replicationSummary.UnderReplicatedBlocks)
157 pullLists := summary.BuildPullLists(pullServers)
159 trashLists, trashErr := summary.BuildTrashLists(kc,
161 replicationSummary.KeepBlocksNotInCollections)
163 summary.WritePullLists(arvLogger, pullLists)
168 keep.SendTrashLists(dataManagerToken, kc, trashLists)
174 // Returns a data fetcher that fetches data from remote servers.
175 func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
176 return func(arvLogger *logger.Logger,
177 readCollections *collection.ReadCollections,
178 keepServerInfo *keep.ReadServers) {
179 collectionChannel := make(chan collection.ReadCollections)
182 collectionChannel <- collection.GetCollectionsAndSummarize(
183 collection.GetCollectionsParams{
189 *keepServerInfo = keep.GetKeepServersAndSummarize(
190 keep.GetKeepServersParams{
195 *readCollections = <-collectionChannel