/* Keep Datamanager. Responsible for checking on and reporting on Keep Storage */

package main

import (
	"errors"
	"flag"
	"fmt"
	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
	"git.curoverse.com/arvados.git/sdk/go/keepclient"
	"git.curoverse.com/arvados.git/sdk/go/logger"
	"git.curoverse.com/arvados.git/sdk/go/util"
	"git.curoverse.com/arvados.git/services/datamanager/collection"
	"git.curoverse.com/arvados.git/services/datamanager/keep"
	"git.curoverse.com/arvados.git/services/datamanager/loggerutil"
	"git.curoverse.com/arvados.git/services/datamanager/summary"
	"log"
	"time"
)

var (
	logEventTypePrefix  string
	logFrequencySeconds int
	minutesBetweenRuns  int
	dryRun              bool
)

func init() {
	flag.StringVar(&logEventTypePrefix,
		"log-event-type-prefix",
		"experimental-data-manager",
		"Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging.")
	flag.IntVar(&logFrequencySeconds,
		"log-frequency-seconds",
		20,
		"How frequently to write log entries, in seconds.")
	flag.IntVar(&minutesBetweenRuns,
		"minutes-between-runs",
		0,
		"How many minutes to wait between data manager runs. 0 means run once and exit.")
	flag.BoolVar(&dryRun,
		"dry-run",
		false,
		"Perform a dry run. Log how many blocks would be deleted/moved, but do not issue any changes to keepstore.")
}

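// main runs the data manager once and exits when -minutes-between-runs
// is 0; otherwise it runs repeatedly, sleeping the configured number of
// minutes between runs.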
func main() {
	flag.Parse()

	if minutesBetweenRuns == 0 {
		arv, err := arvadosclient.MakeArvadosClient()
		if err != nil {
			loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err))
		}
		err = singlerun(arv)
		if err != nil {
			loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("singlerun: %v", err))
		}
	} else {
		waitTime := time.Minute * time.Duration(minutesBetweenRuns)
		for {
			log.Println("Beginning Run")
			arv, err := arvadosclient.MakeArvadosClient()
			if err != nil {
				loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err))
			}
			err = singlerun(arv)
			if err != nil {
				log.Printf("singlerun: %v", err)
			}
			log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
			time.Sleep(waitTime)
		}
	}
}

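// arvLogger writes entries to the Arvados log API; it stays nil when
// logging is disabled via an empty -log-event-type-prefix.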
var arvLogger *logger.Logger

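// singlerun performs a single data manager run: it verifies that the
// current token belongs to an admin, reads collection and keep server
// data, summarizes block replication, and sends pull and trash lists
// to the keep servers, honoring the -dry-run flag.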
func singlerun(arv arvadosclient.ArvadosClient) error {
	var err error
	if isAdmin, err := util.UserIsAdmin(arv); err != nil {
		return errors.New("Error verifying admin token: " + err.Error())
	} else if !isAdmin {
		return errors.New("Current user is not an admin. Datamanager requires a privileged token.")
	}

	if logEventTypePrefix != "" {
		arvLogger, err = logger.NewLogger(logger.LoggerParams{
			Client:          arv,
			EventTypePrefix: logEventTypePrefix,
			WriteInterval:   time.Second * time.Duration(logFrequencySeconds)})
		if err != nil {
			return err
		}
	}

	loggerutil.LogRunInfo(arvLogger)
	if arvLogger != nil {
		arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
	}

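	// Fetch collection and keep server data, either from a previously
	// saved summary (summary.ReadData) or live from the servers
	// (BuildDataFetcher).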
	var (
		dataFetcher     summary.DataFetcher
		readCollections collection.ReadCollections
		keepServerInfo  keep.ReadServers
	)

	if summary.ShouldReadData() {
		dataFetcher = summary.ReadData
	} else {
		dataFetcher = BuildDataFetcher(arv)
	}

	dataFetcher(arvLogger, &readCollections, &keepServerInfo)

	if readCollections.Err != nil {
		return readCollections.Err
	}

	err = summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
	if err != nil {
		return err
	}

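	// Bucket blocks by desired and actual replication level and compute
	// aggregate counts for reporting.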
	buckets := summary.BucketReplication(readCollections, keepServerInfo)
	bucketCounts := buckets.Counts()

	replicationSummary := buckets.SummarizeBuckets(readCollections)
	replicationCounts := replicationSummary.ComputeCounts()

	log.Printf("Blocks In Collections: %d, "+
		"\nBlocks In Keep: %d.",
		len(readCollections.BlockToDesiredReplication),
		len(keepServerInfo.BlockToServers))
	log.Println(replicationCounts.PrettyPrint())

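	// Print a histogram of block counts per replication-level bucket.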
	log.Printf("Blocks Histogram:")
	for _, rlbss := range bucketCounts {
		log.Printf("%+v: %10d",
			rlbss.Levels,
			rlbss.Count)
	}

	kc, err := keepclient.MakeKeepClient(&arv)
	if err != nil {
		return fmt.Errorf("Error setting up keep client: %v", err.Error())
	}

	// Log that we're finished. We force the recording, since go will
	// not wait for the write timer before exiting.
	if arvLogger != nil {
		defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
			summaryInfo := logger.GetOrCreateMap(p, "summary_info")
			summaryInfo["block_replication_counts"] = bucketCounts
			summaryInfo["replication_summary"] = replicationCounts
			p["summary_info"] = summaryInfo

			p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
		})
	}

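	// Build pull lists for under-replicated blocks and trash lists for
	// blocks that no collection references, then send them to the keep
	// servers (respecting -dry-run).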
	pullServers := summary.ComputePullServers(kc,
		&keepServerInfo,
		readCollections.BlockToDesiredReplication,
		replicationSummary.UnderReplicatedBlocks)

	pullLists := summary.BuildPullLists(pullServers)

	trashLists, trashErr := summary.BuildTrashLists(kc,
		&keepServerInfo,
		replicationSummary.KeepBlocksNotInCollections)

	err = summary.WritePullLists(arvLogger, pullLists, dryRun)
	if err != nil {
		return err
	}

	if trashErr != nil {
		return trashErr
	}
	keep.SendTrashLists(arvLogger, kc, trashLists, dryRun)

	return nil
}

// BuildDataFetcher returns a data fetcher that fetches data from remote servers.
func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
	return func(arvLogger *logger.Logger,
		readCollections *collection.ReadCollections,
		keepServerInfo *keep.ReadServers) {
		collectionChannel := make(chan collection.ReadCollections)

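		// Fetch collection data in a separate goroutine so it proceeds
		// concurrently with the keep server fetch below; the result is
		// delivered on collectionChannel.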
		go func() {
			collectionChannel <- collection.GetCollectionsAndSummarize(
				collection.GetCollectionsParams{
					Client:    arv,
					Logger:    arvLogger,
					BatchSize: 50})
		}()

		var err error
		*keepServerInfo, err = keep.GetKeepServersAndSummarize(
			keep.GetKeepServersParams{
				Client: arv,
				Logger: arvLogger,
				Limit:  1000})

		if err != nil {
			// Surface the keep server error to the caller instead of
			// silently returning with incomplete data.
			readCollections.Err = err
			return
		}

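		// Block until the collection fetch started above has finished.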
		*readCollections = <-collectionChannel
	}
}