1 /* Keep Datamanager. Responsible for checking on and reporting on Keep Storage */
9 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
10 "git.curoverse.com/arvados.git/sdk/go/keepclient"
11 "git.curoverse.com/arvados.git/sdk/go/logger"
12 "git.curoverse.com/arvados.git/sdk/go/util"
13 "git.curoverse.com/arvados.git/services/datamanager/collection"
14 "git.curoverse.com/arvados.git/services/datamanager/keep"
15 "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
16 "git.curoverse.com/arvados.git/services/datamanager/summary"
22 logEventTypePrefix string
23 logFrequencySeconds int
24 minutesBetweenRuns int
28 flag.StringVar(&logEventTypePrefix,
29 "log-event-type-prefix",
30 "experimental-data-manager",
31 "Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging")
32 flag.IntVar(&logFrequencySeconds,
33 "log-frequency-seconds",
35 "How frequently we'll write log entries in seconds.")
36 flag.IntVar(&minutesBetweenRuns,
37 "minutes-between-runs",
39 "How many minutes we wait betwen data manager runs. 0 means run once and exit.")
44 if minutesBetweenRuns == 0 {
45 arv, err := makeArvadosClient()
47 log.Fatalf("makeArvadosClient: %v", err)
51 log.Fatalf("singlerun: %v", err)
54 waitTime := time.Minute * time.Duration(minutesBetweenRuns)
56 log.Println("Beginning Run")
57 arv, err := makeArvadosClient()
59 log.Fatalf("makeArvadosClient: %v", err)
63 log.Printf("singlerun: %v", err)
65 log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
71 func makeArvadosClient() (arvadosclient.ArvadosClient, error) {
72 return arvadosclient.MakeArvadosClient()
75 func singlerun(arv arvadosclient.ArvadosClient) error {
77 if isAdmin, err := util.UserIsAdmin(arv); err != nil {
78 return errors.New("Error verifying admin token: " + err.Error())
80 return errors.New("Current user is not an admin. Datamanager requires a privileged token.")
83 var arvLogger *logger.Logger
84 if logEventTypePrefix != "" {
85 arvLogger = logger.NewLogger(logger.LoggerParams{
87 EventTypePrefix: logEventTypePrefix,
88 WriteInterval: time.Second * time.Duration(logFrequencySeconds)})
91 loggerutil.LogRunInfo(arvLogger)
93 arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
97 dataFetcher summary.DataFetcher
98 readCollections collection.ReadCollections
99 keepServerInfo keep.ReadServers
102 if summary.ShouldReadData() {
103 dataFetcher = summary.ReadData
105 dataFetcher = BuildDataFetcher(arv)
108 dataFetcher(arvLogger, &readCollections, &keepServerInfo)
110 if len(readCollections.UUIDToCollection) == 0 {
111 return nil // no collections read so no more work to do?
114 _, err = summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
119 buckets := summary.BucketReplication(readCollections, keepServerInfo)
120 bucketCounts := buckets.Counts()
122 replicationSummary := buckets.SummarizeBuckets(readCollections)
123 replicationCounts := replicationSummary.ComputeCounts()
125 log.Printf("Blocks In Collections: %d, "+
126 "\nBlocks In Keep: %d.",
127 len(readCollections.BlockToDesiredReplication),
128 len(keepServerInfo.BlockToServers))
129 log.Println(replicationCounts.PrettyPrint())
131 log.Printf("Blocks Histogram:")
132 for _, rlbss := range bucketCounts {
133 log.Printf("%+v: %10d",
138 kc, err := keepclient.MakeKeepClient(&arv)
140 return fmt.Errorf("Error setting up keep client %v", err.Error())
143 // Log that we're finished. We force the recording, since go will
144 // not wait for the write timer before exiting.
145 if arvLogger != nil {
146 defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
147 summaryInfo := logger.GetOrCreateMap(p, "summary_info")
148 summaryInfo["block_replication_counts"] = bucketCounts
149 summaryInfo["replication_summary"] = replicationCounts
150 p["summary_info"] = summaryInfo
152 p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
156 pullServers := summary.ComputePullServers(kc,
158 readCollections.BlockToDesiredReplication,
159 replicationSummary.UnderReplicatedBlocks)
161 pullLists := summary.BuildPullLists(pullServers)
163 trashLists, trashErr := summary.BuildTrashLists(kc,
165 replicationSummary.KeepBlocksNotInCollections)
167 err = summary.WritePullLists(arvLogger, pullLists)
175 keep.SendTrashLists(kc, trashLists)
180 // BuildDataFetcher returns a data fetcher that fetches data from remote servers.
181 func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
182 return func(arvLogger *logger.Logger,
183 readCollections *collection.ReadCollections,
184 keepServerInfo *keep.ReadServers) {
185 collectionChannel := make(chan collection.ReadCollections)
188 collectionChannel <- collection.GetCollectionsAndSummarize(
190 collection.GetCollectionsParams{
197 *keepServerInfo, err = keep.GetKeepServersAndSummarize(
198 keep.GetKeepServersParams{
207 *readCollections = <-collectionChannel