Merge branch 'master' into 7492-keepproxy-upstream-errors
[arvados.git] / services / datamanager / datamanager.go
1 /* Keep Datamanager. Responsible for checking on and reporting on Keep Storage */
2
3 package main
4
5 import (
6         "errors"
7         "flag"
8         "fmt"
9         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
10         "git.curoverse.com/arvados.git/sdk/go/keepclient"
11         "git.curoverse.com/arvados.git/sdk/go/logger"
12         "git.curoverse.com/arvados.git/sdk/go/util"
13         "git.curoverse.com/arvados.git/services/datamanager/collection"
14         "git.curoverse.com/arvados.git/services/datamanager/keep"
15         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
16         "git.curoverse.com/arvados.git/services/datamanager/summary"
17         "log"
18         "time"
19 )
20
// Command-line configuration, populated by flag.Parse in main.
var (
	// logEventTypePrefix is the prefix used in the event_type of arvados
	// log entries. An empty value turns logging off.
	logEventTypePrefix string
	// logFrequencySeconds is how often, in seconds, log entries are written.
	logFrequencySeconds int
	// minutesBetweenRuns is the pause between data manager runs, in
	// minutes. Zero means run once and exit.
	minutesBetweenRuns int
)

// init registers the command-line flags and their default values.
func init() {
	flag.StringVar(&logEventTypePrefix,
		"log-event-type-prefix",
		"experimental-data-manager",
		"Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging")
	flag.IntVar(&logFrequencySeconds,
		"log-frequency-seconds",
		20,
		"How frequently we'll write log entries in seconds.")
	flag.IntVar(&minutesBetweenRuns,
		"minutes-between-runs",
		0,
		"How many minutes we wait between data manager runs. 0 means run once and exit.")
}
41
42 func main() {
43         flag.Parse()
44         if minutesBetweenRuns == 0 {
45                 err := singlerun(makeArvadosClient())
46                 if err != nil {
47                         log.Fatalf("singlerun: %v", err)
48                 }
49         } else {
50                 waitTime := time.Minute * time.Duration(minutesBetweenRuns)
51                 for {
52                         log.Println("Beginning Run")
53                         err := singlerun(makeArvadosClient())
54                         if err != nil {
55                                 log.Printf("singlerun: %v", err)
56                         }
57                         log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
58                         time.Sleep(waitTime)
59                 }
60         }
61 }
62
63 func makeArvadosClient() arvadosclient.ArvadosClient {
64         arv, err := arvadosclient.MakeArvadosClient()
65         if err != nil {
66                 log.Fatalf("Error setting up arvados client: %s", err)
67         }
68         return arv
69 }
70
71 func singlerun(arv arvadosclient.ArvadosClient) error {
72         var err error
73         if isAdmin, err := util.UserIsAdmin(arv); err != nil {
74                 return errors.New("Error verifying admin token: " + err.Error())
75         } else if !isAdmin {
76                 return errors.New("Current user is not an admin. Datamanager requires a privileged token.")
77         }
78
79         var arvLogger *logger.Logger
80         if logEventTypePrefix != "" {
81                 arvLogger = logger.NewLogger(logger.LoggerParams{
82                         Client:          arv,
83                         EventTypePrefix: logEventTypePrefix,
84                         WriteInterval:   time.Second * time.Duration(logFrequencySeconds)})
85         }
86
87         loggerutil.LogRunInfo(arvLogger)
88         if arvLogger != nil {
89                 arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
90         }
91
92         var (
93                 dataFetcher     summary.DataFetcher
94                 readCollections collection.ReadCollections
95                 keepServerInfo  keep.ReadServers
96         )
97
98         if summary.ShouldReadData() {
99                 dataFetcher = summary.ReadData
100         } else {
101                 dataFetcher = BuildDataFetcher(arv)
102         }
103
104         dataFetcher(arvLogger, &readCollections, &keepServerInfo)
105
106         summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
107
108         buckets := summary.BucketReplication(readCollections, keepServerInfo)
109         bucketCounts := buckets.Counts()
110
111         replicationSummary := buckets.SummarizeBuckets(readCollections)
112         replicationCounts := replicationSummary.ComputeCounts()
113
114         log.Printf("Blocks In Collections: %d, "+
115                 "\nBlocks In Keep: %d.",
116                 len(readCollections.BlockToDesiredReplication),
117                 len(keepServerInfo.BlockToServers))
118         log.Println(replicationCounts.PrettyPrint())
119
120         log.Printf("Blocks Histogram:")
121         for _, rlbss := range bucketCounts {
122                 log.Printf("%+v: %10d",
123                         rlbss.Levels,
124                         rlbss.Count)
125         }
126
127         kc, err := keepclient.MakeKeepClient(&arv)
128         if err != nil {
129                 loggerutil.FatalWithMessage(arvLogger,
130                         fmt.Sprintf("Error setting up keep client %s", err.Error()))
131         }
132
133         // Log that we're finished. We force the recording, since go will
134         // not wait for the write timer before exiting.
135         if arvLogger != nil {
136                 defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
137                         summaryInfo := logger.GetOrCreateMap(p, "summary_info")
138                         summaryInfo["block_replication_counts"] = bucketCounts
139                         summaryInfo["replication_summary"] = replicationCounts
140                         p["summary_info"] = summaryInfo
141
142                         p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
143                 })
144         }
145
146         pullServers := summary.ComputePullServers(kc,
147                 &keepServerInfo,
148                 readCollections.BlockToDesiredReplication,
149                 replicationSummary.UnderReplicatedBlocks)
150
151         pullLists := summary.BuildPullLists(pullServers)
152
153         trashLists, trashErr := summary.BuildTrashLists(kc,
154                 &keepServerInfo,
155                 replicationSummary.KeepBlocksNotInCollections)
156
157         summary.WritePullLists(arvLogger, pullLists)
158
159         if trashErr != nil {
160                 return err
161         }
162         keep.SendTrashLists(kc, trashLists)
163
164         return nil
165 }
166
167 // BuildDataFetcher returns a data fetcher that fetches data from remote servers.
168 func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
169         return func(arvLogger *logger.Logger,
170                 readCollections *collection.ReadCollections,
171                 keepServerInfo *keep.ReadServers) {
172                 collectionChannel := make(chan collection.ReadCollections)
173
174                 go func() {
175                         collectionChannel <- collection.GetCollectionsAndSummarize(
176                                 collection.GetCollectionsParams{
177                                         Client:    arv,
178                                         Logger:    arvLogger,
179                                         BatchSize: 50})
180                 }()
181
182                 *keepServerInfo = keep.GetKeepServersAndSummarize(
183                         keep.GetKeepServersParams{
184                                 Client: arv,
185                                 Logger: arvLogger,
186                                 Limit:  1000})
187
188                 *readCollections = <-collectionChannel
189         }
190 }