Merge branch '7937-ignored-error' refs #7937
[arvados.git] / services / datamanager / datamanager.go
1 /* Keep Datamanager. Responsible for checking on and reporting on Keep Storage */
2
3 package main
4
5 import (
6         "errors"
7         "flag"
8         "fmt"
9         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
10         "git.curoverse.com/arvados.git/sdk/go/keepclient"
11         "git.curoverse.com/arvados.git/sdk/go/logger"
12         "git.curoverse.com/arvados.git/sdk/go/util"
13         "git.curoverse.com/arvados.git/services/datamanager/collection"
14         "git.curoverse.com/arvados.git/services/datamanager/keep"
15         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
16         "git.curoverse.com/arvados.git/services/datamanager/summary"
17         "log"
18         "time"
19 )
20
var (
	// logEventTypePrefix is the prefix used in the event_type of the
	// arvados log entries we write; empty disables remote logging.
	logEventTypePrefix string
	// logFrequencySeconds is how often (in seconds) buffered log
	// entries are flushed to the API server.
	logFrequencySeconds int
	// minutesBetweenRuns is the delay between data manager passes;
	// 0 means run a single pass and exit.
	minutesBetweenRuns int
	// dryRun, when true, reports what would be deleted/moved without
	// sending any changes to keepstore.
	dryRun bool
)
27
// init registers the command-line flags that configure logging and the
// run loop. Defaults preserve the historical behavior: remote logging
// enabled, 20-second log flushes, run-once, and real (non-dry) runs.
func init() {
	flag.StringVar(&logEventTypePrefix,
		"log-event-type-prefix",
		"experimental-data-manager",
		"Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging")
	flag.IntVar(&logFrequencySeconds,
		"log-frequency-seconds",
		20,
		"How frequently we'll write log entries in seconds.")
	flag.IntVar(&minutesBetweenRuns,
		"minutes-between-runs",
		0,
		"How many minutes we wait between data manager runs. 0 means run once and exit.")
	flag.BoolVar(&dryRun,
		"dry-run",
		false,
		"Perform a dry run. Log how many blocks would be deleted/moved, but do not issue any changes to keepstore.")
}
46
47 func main() {
48         flag.Parse()
49
50         if minutesBetweenRuns == 0 {
51                 arv, err := arvadosclient.MakeArvadosClient()
52                 if err != nil {
53                         loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err))
54                 }
55                 err = singlerun(arv)
56                 if err != nil {
57                         loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("singlerun: %v", err))
58                 }
59         } else {
60                 waitTime := time.Minute * time.Duration(minutesBetweenRuns)
61                 for {
62                         log.Println("Beginning Run")
63                         arv, err := arvadosclient.MakeArvadosClient()
64                         if err != nil {
65                                 loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err))
66                         }
67                         err = singlerun(arv)
68                         if err != nil {
69                                 log.Printf("singlerun: %v", err)
70                         }
71                         log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
72                         time.Sleep(waitTime)
73                 }
74         }
75 }
76
77 var arvLogger *logger.Logger
78
79 func singlerun(arv arvadosclient.ArvadosClient) error {
80         var err error
81         if isAdmin, err := util.UserIsAdmin(arv); err != nil {
82                 return errors.New("Error verifying admin token: " + err.Error())
83         } else if !isAdmin {
84                 return errors.New("Current user is not an admin. Datamanager requires a privileged token.")
85         }
86
87         if logEventTypePrefix != "" {
88                 arvLogger, err = logger.NewLogger(logger.LoggerParams{
89                         Client:          arv,
90                         EventTypePrefix: logEventTypePrefix,
91                         WriteInterval:   time.Second * time.Duration(logFrequencySeconds)})
92         }
93
94         loggerutil.LogRunInfo(arvLogger)
95         if arvLogger != nil {
96                 arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
97         }
98
99         var (
100                 dataFetcher     summary.DataFetcher
101                 readCollections collection.ReadCollections
102                 keepServerInfo  keep.ReadServers
103         )
104
105         if summary.ShouldReadData() {
106                 dataFetcher = summary.ReadData
107         } else {
108                 dataFetcher = BuildDataFetcher(arv)
109         }
110
111         err = dataFetcher(arvLogger, &readCollections, &keepServerInfo)
112         if err != nil {
113                 return err
114         }
115
116         err = summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
117         if err != nil {
118                 return err
119         }
120
121         buckets := summary.BucketReplication(readCollections, keepServerInfo)
122         bucketCounts := buckets.Counts()
123
124         replicationSummary := buckets.SummarizeBuckets(readCollections)
125         replicationCounts := replicationSummary.ComputeCounts()
126
127         log.Printf("Blocks In Collections: %d, "+
128                 "\nBlocks In Keep: %d.",
129                 len(readCollections.BlockToDesiredReplication),
130                 len(keepServerInfo.BlockToServers))
131         log.Println(replicationCounts.PrettyPrint())
132
133         log.Printf("Blocks Histogram:")
134         for _, rlbss := range bucketCounts {
135                 log.Printf("%+v: %10d",
136                         rlbss.Levels,
137                         rlbss.Count)
138         }
139
140         kc, err := keepclient.MakeKeepClient(&arv)
141         if err != nil {
142                 return fmt.Errorf("Error setting up keep client %v", err.Error())
143         }
144
145         // Log that we're finished. We force the recording, since go will
146         // not wait for the write timer before exiting.
147         if arvLogger != nil {
148                 defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
149                         summaryInfo := logger.GetOrCreateMap(p, "summary_info")
150                         summaryInfo["block_replication_counts"] = bucketCounts
151                         summaryInfo["replication_summary"] = replicationCounts
152                         p["summary_info"] = summaryInfo
153
154                         p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
155                 })
156         }
157
158         pullServers := summary.ComputePullServers(kc,
159                 &keepServerInfo,
160                 readCollections.BlockToDesiredReplication,
161                 replicationSummary.UnderReplicatedBlocks)
162
163         pullLists := summary.BuildPullLists(pullServers)
164
165         trashLists, trashErr := summary.BuildTrashLists(kc,
166                 &keepServerInfo,
167                 replicationSummary.KeepBlocksNotInCollections)
168
169         err = summary.WritePullLists(arvLogger, pullLists, dryRun)
170         if err != nil {
171                 return err
172         }
173
174         if trashErr != nil {
175                 return err
176         }
177         keep.SendTrashLists(arvLogger, kc, trashLists, dryRun)
178
179         return nil
180 }
181
182 // BuildDataFetcher returns a data fetcher that fetches data from remote servers.
183 func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
184         return func(
185                 arvLogger *logger.Logger,
186                 readCollections *collection.ReadCollections,
187                 keepServerInfo *keep.ReadServers,
188         ) error {
189                 collDone := make(chan struct{})
190                 var collErr error
191                 go func() {
192                         *readCollections, collErr = collection.GetCollectionsAndSummarize(
193                                 collection.GetCollectionsParams{
194                                         Client:    arv,
195                                         Logger:    arvLogger,
196                                         BatchSize: 50})
197                         collDone <- struct{}{}
198                 }()
199
200                 var keepErr error
201                 *keepServerInfo, keepErr = keep.GetKeepServersAndSummarize(
202                         keep.GetKeepServersParams{
203                                 Client: arv,
204                                 Logger: arvLogger,
205                                 Limit:  1000})
206
207                 <- collDone
208
209                 // Return a nil error only if both parts succeeded.
210                 if collErr != nil {
211                         return collErr
212                 }
213                 return keepErr
214         }
215 }