Merge branch 'master' into 7255-manifests-in-datamanager
[arvados.git] / services / datamanager / datamanager.go
1 /* Keep Datamanager. Responsible for checking on and reporting on Keep Storage */
2
3 package main
4
5 import (
6         "errors"
7         "flag"
8         "fmt"
9         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
10         "git.curoverse.com/arvados.git/sdk/go/keepclient"
11         "git.curoverse.com/arvados.git/sdk/go/logger"
12         "git.curoverse.com/arvados.git/sdk/go/util"
13         "git.curoverse.com/arvados.git/services/datamanager/collection"
14         "git.curoverse.com/arvados.git/services/datamanager/keep"
15         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
16         "git.curoverse.com/arvados.git/services/datamanager/summary"
17         "log"
18         "time"
19 )
20
// Command-line configuration, populated by flag.Parse in main.
var (
	// logEventTypePrefix is the prefix used in the event_type of the
	// arvados log entries. An empty value turns logging off.
	logEventTypePrefix string
	// logFrequencySeconds is how often, in seconds, log entries are written.
	logFrequencySeconds int
	// minutesBetweenRuns is the pause between data manager runs, in
	// minutes. Zero means run once and exit.
	minutesBetweenRuns int
)

// init registers the command-line flags together with their defaults.
func init() {
	flag.StringVar(&logEventTypePrefix,
		"log-event-type-prefix",
		"experimental-data-manager",
		"Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging")
	flag.IntVar(&logFrequencySeconds,
		"log-frequency-seconds",
		20,
		"How frequently we'll write log entries in seconds.")
	flag.IntVar(&minutesBetweenRuns,
		"minutes-between-runs",
		0,
		"How many minutes we wait between data manager runs. 0 means run once and exit.")
}
41
42 func main() {
43         flag.Parse()
44         if minutesBetweenRuns == 0 {
45                 arv, err := arvadosclient.MakeArvadosClient()
46                 if err != nil {
47                         loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err))
48                 }
49                 err = singlerun(arv)
50                 if err != nil {
51                         loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("singlerun: %v", err))
52                 }
53         } else {
54                 waitTime := time.Minute * time.Duration(minutesBetweenRuns)
55                 for {
56                         log.Println("Beginning Run")
57                         arv, err := arvadosclient.MakeArvadosClient()
58                         if err != nil {
59                                 loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err))
60                         }
61                         err = singlerun(arv)
62                         if err != nil {
63                                 log.Printf("singlerun: %v", err)
64                         }
65                         log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
66                         time.Sleep(waitTime)
67                 }
68         }
69 }
70
71 var arvLogger *logger.Logger
72
73 func singlerun(arv arvadosclient.ArvadosClient) error {
74         var err error
75         if isAdmin, err := util.UserIsAdmin(arv); err != nil {
76                 return errors.New("Error verifying admin token: " + err.Error())
77         } else if !isAdmin {
78                 return errors.New("Current user is not an admin. Datamanager requires a privileged token.")
79         }
80
81         if logEventTypePrefix != "" {
82                 arvLogger = logger.NewLogger(logger.LoggerParams{
83                         Client:          arv,
84                         EventTypePrefix: logEventTypePrefix,
85                         WriteInterval:   time.Second * time.Duration(logFrequencySeconds)})
86         }
87
88         loggerutil.LogRunInfo(arvLogger)
89         if arvLogger != nil {
90                 arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
91         }
92
93         var (
94                 dataFetcher     summary.DataFetcher
95                 readCollections collection.ReadCollections
96                 keepServerInfo  keep.ReadServers
97         )
98
99         if summary.ShouldReadData() {
100                 dataFetcher = summary.ReadData
101         } else {
102                 dataFetcher = BuildDataFetcher(arv)
103         }
104
105         dataFetcher(arvLogger, &readCollections, &keepServerInfo)
106
107         if readCollections.Err != nil {
108                 return readCollections.Err
109         }
110
111         err = summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
112         if err != nil {
113                 return err
114         }
115
116         buckets := summary.BucketReplication(readCollections, keepServerInfo)
117         bucketCounts := buckets.Counts()
118
119         replicationSummary := buckets.SummarizeBuckets(readCollections)
120         replicationCounts := replicationSummary.ComputeCounts()
121
122         log.Printf("Blocks In Collections: %d, "+
123                 "\nBlocks In Keep: %d.",
124                 len(readCollections.BlockToDesiredReplication),
125                 len(keepServerInfo.BlockToServers))
126         log.Println(replicationCounts.PrettyPrint())
127
128         log.Printf("Blocks Histogram:")
129         for _, rlbss := range bucketCounts {
130                 log.Printf("%+v: %10d",
131                         rlbss.Levels,
132                         rlbss.Count)
133         }
134
135         kc, err := keepclient.MakeKeepClient(&arv)
136         if err != nil {
137                 return fmt.Errorf("Error setting up keep client %v", err.Error())
138         }
139
140         // Log that we're finished. We force the recording, since go will
141         // not wait for the write timer before exiting.
142         if arvLogger != nil {
143                 defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
144                         summaryInfo := logger.GetOrCreateMap(p, "summary_info")
145                         summaryInfo["block_replication_counts"] = bucketCounts
146                         summaryInfo["replication_summary"] = replicationCounts
147                         p["summary_info"] = summaryInfo
148
149                         p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
150                 })
151         }
152
153         pullServers := summary.ComputePullServers(kc,
154                 &keepServerInfo,
155                 readCollections.BlockToDesiredReplication,
156                 replicationSummary.UnderReplicatedBlocks)
157
158         pullLists := summary.BuildPullLists(pullServers)
159
160         trashLists, trashErr := summary.BuildTrashLists(kc,
161                 &keepServerInfo,
162                 replicationSummary.KeepBlocksNotInCollections)
163
164         err = summary.WritePullLists(arvLogger, pullLists)
165         if err != nil {
166                 return err
167         }
168
169         if trashErr != nil {
170                 return err
171         }
172         keep.SendTrashLists(kc, trashLists)
173
174         return nil
175 }
176
177 // BuildDataFetcher returns a data fetcher that fetches data from remote servers.
178 func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
179         return func(arvLogger *logger.Logger,
180                 readCollections *collection.ReadCollections,
181                 keepServerInfo *keep.ReadServers) {
182                 collectionChannel := make(chan collection.ReadCollections)
183
184                 go func() {
185                         collectionChannel <- collection.GetCollectionsAndSummarize(
186                                 collection.GetCollectionsParams{
187                                         Client:    arv,
188                                         Logger:    arvLogger,
189                                         BatchSize: 50})
190                 }()
191
192                 var err error
193                 *keepServerInfo, err = keep.GetKeepServersAndSummarize(
194                         keep.GetKeepServersParams{
195                                 Client: arv,
196                                 Logger: arvLogger,
197                                 Limit:  1000})
198
199                 if err != nil {
200                         return
201                 }
202
203                 *readCollections = <-collectionChannel
204         }
205 }