7490: The makeArvadosClient func, which is invoked by singlerun, should return error...
[arvados.git] / services / datamanager / datamanager.go
1 /* Keep Datamanager. Responsible for checking on and reporting on Keep Storage */
2
3 package main
4
5 import (
6         "errors"
7         "flag"
8         "fmt"
9         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
10         "git.curoverse.com/arvados.git/sdk/go/keepclient"
11         "git.curoverse.com/arvados.git/sdk/go/logger"
12         "git.curoverse.com/arvados.git/sdk/go/util"
13         "git.curoverse.com/arvados.git/services/datamanager/collection"
14         "git.curoverse.com/arvados.git/services/datamanager/keep"
15         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
16         "git.curoverse.com/arvados.git/services/datamanager/summary"
17         "log"
18         "time"
19 )
20
// Command-line configuration. The values are populated by the flag
// package once flag.Parse has been called.
var (
	// logFrequencySeconds is the interval, in seconds, between log writes.
	// minutesBetweenRuns is the pause between runs; 0 means run once and exit.
	logFrequencySeconds, minutesBetweenRuns int

	// logEventTypePrefix is the prefix used in the event_type of our
	// arvados log entries. An empty value turns logging off.
	logEventTypePrefix string
)
26
27 func init() {
28         flag.StringVar(&logEventTypePrefix,
29                 "log-event-type-prefix",
30                 "experimental-data-manager",
31                 "Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging")
32         flag.IntVar(&logFrequencySeconds,
33                 "log-frequency-seconds",
34                 20,
35                 "How frequently we'll write log entries in seconds.")
36         flag.IntVar(&minutesBetweenRuns,
37                 "minutes-between-runs",
38                 0,
39                 "How many minutes we wait betwen data manager runs. 0 means run once and exit.")
40 }
41
42 func main() {
43         flag.Parse()
44         if minutesBetweenRuns == 0 {
45                 arv, err := makeArvadosClient()
46                 if err != nil {
47                         log.Fatalf("makeArvadosClient: %v", err)
48                 }
49                 err = singlerun(arv)
50                 if err != nil {
51                         log.Fatalf("singlerun: %v", err)
52                 }
53         } else {
54                 waitTime := time.Minute * time.Duration(minutesBetweenRuns)
55                 for {
56                         log.Println("Beginning Run")
57                         arv, err := makeArvadosClient()
58                         if err != nil {
59                                 log.Fatalf("makeArvadosClient: %v", err)
60                         }
61                         err = singlerun(arv)
62                         if err != nil {
63                                 log.Printf("singlerun: %v", err)
64                         }
65                         log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
66                         time.Sleep(waitTime)
67                 }
68         }
69 }
70
71 func makeArvadosClient() (arvadosclient.ArvadosClient, error) {
72         return arvadosclient.MakeArvadosClient()
73 }
74
75 func singlerun(arv arvadosclient.ArvadosClient) error {
76         var err error
77         if isAdmin, err := util.UserIsAdmin(arv); err != nil {
78                 return errors.New("Error verifying admin token: " + err.Error())
79         } else if !isAdmin {
80                 return errors.New("Current user is not an admin. Datamanager requires a privileged token.")
81         }
82
83         var arvLogger *logger.Logger
84         if logEventTypePrefix != "" {
85                 arvLogger = logger.NewLogger(logger.LoggerParams{
86                         Client:          arv,
87                         EventTypePrefix: logEventTypePrefix,
88                         WriteInterval:   time.Second * time.Duration(logFrequencySeconds)})
89         }
90
91         loggerutil.LogRunInfo(arvLogger)
92         if arvLogger != nil {
93                 arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
94         }
95
96         var (
97                 dataFetcher     summary.DataFetcher
98                 readCollections collection.ReadCollections
99                 keepServerInfo  keep.ReadServers
100         )
101
102         if summary.ShouldReadData() {
103                 dataFetcher = summary.ReadData
104         } else {
105                 dataFetcher = BuildDataFetcher(arv)
106         }
107
108         dataFetcher(arvLogger, &readCollections, &keepServerInfo)
109
110         summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
111
112         buckets := summary.BucketReplication(readCollections, keepServerInfo)
113         bucketCounts := buckets.Counts()
114
115         replicationSummary := buckets.SummarizeBuckets(readCollections)
116         replicationCounts := replicationSummary.ComputeCounts()
117
118         log.Printf("Blocks In Collections: %d, "+
119                 "\nBlocks In Keep: %d.",
120                 len(readCollections.BlockToDesiredReplication),
121                 len(keepServerInfo.BlockToServers))
122         log.Println(replicationCounts.PrettyPrint())
123
124         log.Printf("Blocks Histogram:")
125         for _, rlbss := range bucketCounts {
126                 log.Printf("%+v: %10d",
127                         rlbss.Levels,
128                         rlbss.Count)
129         }
130
131         kc, err := keepclient.MakeKeepClient(&arv)
132         if err != nil {
133                 loggerutil.FatalWithMessage(arvLogger,
134                         fmt.Sprintf("Error setting up keep client %s", err.Error()))
135         }
136
137         // Log that we're finished. We force the recording, since go will
138         // not wait for the write timer before exiting.
139         if arvLogger != nil {
140                 defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
141                         summaryInfo := logger.GetOrCreateMap(p, "summary_info")
142                         summaryInfo["block_replication_counts"] = bucketCounts
143                         summaryInfo["replication_summary"] = replicationCounts
144                         p["summary_info"] = summaryInfo
145
146                         p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
147                 })
148         }
149
150         pullServers := summary.ComputePullServers(kc,
151                 &keepServerInfo,
152                 readCollections.BlockToDesiredReplication,
153                 replicationSummary.UnderReplicatedBlocks)
154
155         pullLists := summary.BuildPullLists(pullServers)
156
157         trashLists, trashErr := summary.BuildTrashLists(kc,
158                 &keepServerInfo,
159                 replicationSummary.KeepBlocksNotInCollections)
160
161         summary.WritePullLists(arvLogger, pullLists)
162
163         if trashErr != nil {
164                 return err
165         }
166         keep.SendTrashLists(kc, trashLists)
167
168         return nil
169 }
170
171 // BuildDataFetcher returns a data fetcher that fetches data from remote servers.
172 func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
173         return func(arvLogger *logger.Logger,
174                 readCollections *collection.ReadCollections,
175                 keepServerInfo *keep.ReadServers) {
176                 collectionChannel := make(chan collection.ReadCollections)
177
178                 go func() {
179                         collectionChannel <- collection.GetCollectionsAndSummarize(
180                                 collection.GetCollectionsParams{
181                                         Client:    arv,
182                                         Logger:    arvLogger,
183                                         BatchSize: 50})
184                 }()
185
186                 *keepServerInfo = keep.GetKeepServersAndSummarize(
187                         keep.GetKeepServersParams{
188                                 Client: arv,
189                                 Logger: arvLogger,
190                                 Limit:  1000})
191
192                 *readCollections = <-collectionChannel
193         }
194 }