1 /* Keep Datamanager. Responsible for checking on and reporting on Keep Storage */
9 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
10 "git.curoverse.com/arvados.git/sdk/go/keepclient"
11 "git.curoverse.com/arvados.git/sdk/go/logger"
12 "git.curoverse.com/arvados.git/sdk/go/util"
13 "git.curoverse.com/arvados.git/services/datamanager/collection"
14 "git.curoverse.com/arvados.git/services/datamanager/keep"
15 "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
16 "git.curoverse.com/arvados.git/services/datamanager/summary"
22 logEventTypePrefix string
23 logFrequencySeconds int
24 minutesBetweenRuns int
29 flag.StringVar(&logEventTypePrefix,
30 "log-event-type-prefix",
31 "experimental-data-manager",
32 "Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging")
33 flag.IntVar(&logFrequencySeconds,
34 "log-frequency-seconds",
36 "How frequently we'll write log entries in seconds.")
37 flag.IntVar(&minutesBetweenRuns,
38 "minutes-between-runs",
40 "How many minutes we wait between data manager runs. 0 means run once and exit.")
44 "Perform a dry run. Log how many blocks would be deleted/moved, but do not issue any changes to keepstore.")
50 if minutesBetweenRuns == 0 {
51 arv, err := arvadosclient.MakeArvadosClient()
53 loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err))
57 loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("singlerun: %v", err))
60 waitTime := time.Minute * time.Duration(minutesBetweenRuns)
62 log.Println("Beginning Run")
63 arv, err := arvadosclient.MakeArvadosClient()
65 loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err))
69 log.Printf("singlerun: %v", err)
71 log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
77 var arvLogger *logger.Logger
// singlerun performs a single data manager pass with the given Arvados
// client: it verifies the token is privileged, gathers collection and keep
// server data, summarizes block replication, and then builds pull lists and
// trash lists from that summary. It returns a non-nil error when one of the
// steps fails.
79 func singlerun(arv arvadosclient.ArvadosClient) error {
// Datamanager requires a privileged token, so stop early when the admin
// check itself fails or the current user is not an admin.
81 if isAdmin, err := util.UserIsAdmin(arv); err != nil {
82 return errors.New("Error verifying admin token: " + err.Error())
84 return errors.New("Current user is not an admin. Datamanager requires a privileged token.")
// An empty logEventTypePrefix turns Arvados logging off (see the flag help
// text); in that case no logger is created here. The write interval comes
// from logFrequencySeconds.
87 if logEventTypePrefix != "" {
88 arvLogger, err = logger.NewLogger(logger.LoggerParams{
90 EventTypePrefix: logEventTypePrefix,
91 WriteInterval: time.Second * time.Duration(logFrequencySeconds)})
94 loggerutil.LogRunInfo(arvLogger)
96 arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
// readCollections and keepServerInfo are filled in by the data fetcher,
// which receives pointers to both.
100 dataFetcher summary.DataFetcher
101 readCollections collection.ReadCollections
102 keepServerInfo keep.ReadServers
// The fetcher is either summary.ReadData, chosen when
// summary.ShouldReadData() reports true, or one built by BuildDataFetcher
// around arv.
105 if summary.ShouldReadData() {
106 dataFetcher = summary.ReadData
108 dataFetcher = BuildDataFetcher(arv)
111 err = dataFetcher(arvLogger, &readCollections, &keepServerInfo)
// NOTE(review): presumably saves the fetched data so a later run can replay
// it through summary.ReadData -- confirm against summary.MaybeWriteData.
116 err = summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
// Derive the replication buckets and their counts from the fetched data.
// Both count values are logged below and also recorded in the final log
// update.
121 buckets := summary.BucketReplication(readCollections, keepServerInfo)
122 bucketCounts := buckets.Counts()
124 replicationSummary := buckets.SummarizeBuckets(readCollections)
125 replicationCounts := replicationSummary.ComputeCounts()
// Report the number of distinct blocks known from collections and from
// keep, followed by the replication summary and the bucket histogram.
127 log.Printf("Blocks In Collections: %d, "+
128 "\nBlocks In Keep: %d.",
129 len(readCollections.BlockToDesiredReplication),
130 len(keepServerInfo.BlockToServers))
131 log.Println(replicationCounts.PrettyPrint())
133 log.Printf("Blocks Histogram:")
134 for _, rlbss := range bucketCounts {
135 log.Printf("%+v: %10d",
// The keep client is used below when computing pull servers and building
// trash lists.
140 kc, err := keepclient.MakeKeepClient(&arv)
142 return fmt.Errorf("Error setting up keep client %v", err.Error())
145 // Log that we're finished. We force the recording, since go will
146 // not wait for the write timer before exiting.
147 if arvLogger != nil {
// Deferred, so the final update runs when singlerun returns.
148 defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
149 summaryInfo := logger.GetOrCreateMap(p, "summary_info")
150 summaryInfo["block_replication_counts"] = bucketCounts
151 summaryInfo["replication_summary"] = replicationCounts
152 p["summary_info"] = summaryInfo
// NOTE(review): unchecked type assertion; this panics if run_info is absent
// or is not a map. Presumably loggerutil.LogRunInfo populates it -- confirm.
154 p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
// Pull lists are computed for the under-replicated blocks.
158 pullServers := summary.ComputePullServers(kc,
160 readCollections.BlockToDesiredReplication,
161 replicationSummary.UnderReplicatedBlocks)
163 pullLists := summary.BuildPullLists(pullServers)
// Trash lists are built from the blocks present in keep that no collection
// references.
165 trashLists, trashErr := summary.BuildTrashLists(kc,
167 replicationSummary.KeepBlocksNotInCollections)
// dryRun is forwarded to both calls below; per the dry-run flag help text, a
// dry run logs what would be deleted or moved without changing keepstore.
169 err = summary.WritePullLists(arvLogger, pullLists, dryRun)
177 keep.SendTrashLists(arvLogger, kc, trashLists, dryRun)
182 // BuildDataFetcher returns a data fetcher that fetches data from remote servers.
183 func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
185 arvLogger *logger.Logger,
186 readCollections *collection.ReadCollections,
187 keepServerInfo *keep.ReadServers,
189 collDone := make(chan struct{})
192 *readCollections, collErr = collection.GetCollectionsAndSummarize(
193 collection.GetCollectionsParams{
197 collDone <- struct{}{}
201 *keepServerInfo, keepErr = keep.GetKeepServersAndSummarize(
202 keep.GetKeepServersParams{
209 // Return a nil error only if both parts succeeded.