closes #8508
/* Keep Datamanager. Responsible for checking on and reporting on Keep Storage */

package main

import (
        "errors"
        "flag"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/logger"
        "git.curoverse.com/arvados.git/sdk/go/util"
        "git.curoverse.com/arvados.git/services/datamanager/collection"
        "git.curoverse.com/arvados.git/services/datamanager/keep"
        "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
        "git.curoverse.com/arvados.git/services/datamanager/summary"
        "log"
        "time"
)

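// Command-line flags, bound in init() below.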
var (
        logEventTypePrefix  string
        logFrequencySeconds int
        minutesBetweenRuns  int
        collectionBatchSize int
        dryRun              bool
)

func init() {
        flag.StringVar(&logEventTypePrefix,
                "log-event-type-prefix",
                "experimental-data-manager",
                "Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging")
        flag.IntVar(&logFrequencySeconds,
                "log-frequency-seconds",
                20,
                "How frequently we'll write log entries in seconds.")
        flag.IntVar(&minutesBetweenRuns,
                "minutes-between-runs",
                0,
                "How many minutes we wait between data manager runs. 0 means run once and exit.")
        flag.IntVar(&collectionBatchSize,
                "collection-batch-size",
                1000,
                "How many collections to request in each batch.")
        flag.BoolVar(&dryRun,
                "dry-run",
                false,
                "Perform a dry run. Log how many blocks would be deleted/moved, but do not issue any changes to keepstore.")
}

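// main runs the data manager either once (the default) or in a loop,
// sleeping minutes-between-runs minutes between runs.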
func main() {
        flag.Parse()

        if minutesBetweenRuns == 0 {
                arv, err := arvadosclient.MakeArvadosClient()
                if err != nil {
                        loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err))
                }
                err = singlerun(arv)
                if err != nil {
                        loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("singlerun: %v", err))
                }
        } else {
                waitTime := time.Minute * time.Duration(minutesBetweenRuns)
                for {
                        log.Println("Beginning Run")
                        arv, err := arvadosclient.MakeArvadosClient()
                        if err != nil {
                                loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err))
                        }
                        err = singlerun(arv)
                        if err != nil {
                                log.Printf("singlerun: %v", err)
                        }
                        log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
                        time.Sleep(waitTime)
                }
        }
}

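// arvLogger writes log entries to the Arvados API server; it stays nil
// when logging is disabled via an empty -log-event-type-prefix.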
var arvLogger *logger.Logger

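// singlerun performs one data manager run: it verifies that the current
// token belongs to an admin, gathers collection and keep server data,
// summarizes block replication, and hands pull and trash lists to the
// keep servers (unless -dry-run is set).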
func singlerun(arv arvadosclient.ArvadosClient) error {
        var err error
        if isAdmin, err := util.UserIsAdmin(arv); err != nil {
                return errors.New("Error verifying admin token: " + err.Error())
        } else if !isAdmin {
                return errors.New("Current user is not an admin. Datamanager requires a privileged token.")
        }

        if logEventTypePrefix != "" {
                arvLogger, err = logger.NewLogger(logger.LoggerParams{
                        Client:          arv,
                        EventTypePrefix: logEventTypePrefix,
                        WriteInterval:   time.Second * time.Duration(logFrequencySeconds)})
                if err != nil {
                        return err
                }
        }

        loggerutil.LogRunInfo(arvLogger)
        if arvLogger != nil {
                arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
        }

        var (
                dataFetcher     summary.DataFetcher
                readCollections collection.ReadCollections
                keepServerInfo  keep.ReadServers
        )

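        // Read a previously saved copy of the data if one was requested;
        // otherwise fetch fresh data from the API server and keep servers.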
        if summary.ShouldReadData() {
                dataFetcher = summary.ReadData
        } else {
                dataFetcher = BuildDataFetcher(arv)
        }

        err = dataFetcher(arvLogger, &readCollections, &keepServerInfo)
        if err != nil {
                return err
        }

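        // Optionally write out the fetched data for reuse by a later run.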
        err = summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
        if err != nil {
                return err
        }

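        // Bucket blocks by replication level and summarize how well each
        // block's desired replication is satisfied.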
        buckets := summary.BucketReplication(readCollections, keepServerInfo)
        bucketCounts := buckets.Counts()

        replicationSummary := buckets.SummarizeBuckets(readCollections)
        replicationCounts := replicationSummary.ComputeCounts()

        log.Printf("Blocks In Collections: %d, "+
                "\nBlocks In Keep: %d.",
                len(readCollections.BlockToDesiredReplication),
                len(keepServerInfo.BlockToServers))
        log.Println(replicationCounts.PrettyPrint())

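        // Log a histogram of block counts per replication-level bucket.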
        log.Printf("Blocks Histogram:")
        for _, rlbss := range bucketCounts {
                log.Printf("%+v: %10d",
                        rlbss.Levels,
                        rlbss.Count)
        }

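        // Set up a keep client; it is used below to compute pull servers
        // and to build and send trash lists.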
        kc, err := keepclient.MakeKeepClient(&arv)
        if err != nil {
                return fmt.Errorf("Error setting up keep client: %v", err)
        }

        // Log that we're finished. We force the recording, since Go will
        // not wait for the write timer before exiting.
        if arvLogger != nil {
                defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
                        summaryInfo := logger.GetOrCreateMap(p, "summary_info")
                        summaryInfo["block_replication_counts"] = bucketCounts
                        summaryInfo["replication_summary"] = replicationCounts
                        p["summary_info"] = summaryInfo

                        p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
                })
        }

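        // Decide which servers should pull which under-replicated blocks,
        // and group the results into per-server pull lists.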
        pullServers := summary.ComputePullServers(kc,
                &keepServerInfo,
                readCollections.BlockToDesiredReplication,
                replicationSummary.UnderReplicatedBlocks)

        pullLists := summary.BuildPullLists(pullServers)

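        // Build per-server trash lists for blocks that are not referenced
        // by any collection.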
        trashLists, trashErr := summary.BuildTrashLists(kc,
                &keepServerInfo,
                replicationSummary.KeepBlocksNotInCollections)

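        // Write out the pull lists and send the trash lists. When -dry-run
        // is set, no changes are issued to keepstore.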
        err = summary.WritePullLists(arvLogger, pullLists, dryRun)
        if err != nil {
                return err
        }

        if trashErr != nil {
                return trashErr
        }
        keep.SendTrashLists(arvLogger, kc, trashLists, dryRun)

        return nil
}

// BuildDataFetcher returns a data fetcher that fetches data from remote servers.
func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
        return func(
                arvLogger *logger.Logger,
                readCollections *collection.ReadCollections,
                keepServerInfo *keep.ReadServers,
        ) error {
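                // Fetch collection data in a goroutine while reading keep
                // server data in the current goroutine.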
                collDone := make(chan struct{})
                var collErr error
                go func() {
                        *readCollections, collErr = collection.GetCollectionsAndSummarize(
                                collection.GetCollectionsParams{
                                        Client:    arv,
                                        Logger:    arvLogger,
                                        BatchSize: collectionBatchSize})
                        collDone <- struct{}{}
                }()

                var keepErr error
                *keepServerInfo, keepErr = keep.GetKeepServersAndSummarize(
                        keep.GetKeepServersParams{
                                Client: arv,
                                Logger: arvLogger,
                                Limit:  1000})

                <-collDone

                // Return a nil error only if both parts succeeded.
                if collErr != nil {
                        return collErr
                }
                return keepErr
        }
}