7490: added a couple more datamanager tests with errors injected during GetCollections
[arvados.git] / services / datamanager / datamanager.go
/* Keep Datamanager. Responsible for checking on and reporting on Keep Storage */

package main

import (
        "errors"
        "flag"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/logger"
        "git.curoverse.com/arvados.git/sdk/go/util"
        "git.curoverse.com/arvados.git/services/datamanager/collection"
        "git.curoverse.com/arvados.git/services/datamanager/keep"
        "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
        "git.curoverse.com/arvados.git/services/datamanager/summary"
        "log"
        "time"
)

var (
        logEventTypePrefix  string
        logFrequencySeconds int
        minutesBetweenRuns  int
)

func init() {
        flag.StringVar(&logEventTypePrefix,
                "log-event-type-prefix",
                "experimental-data-manager",
                "Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging")
        flag.IntVar(&logFrequencySeconds,
                "log-frequency-seconds",
                20,
                "How frequently we'll write log entries in seconds.")
        flag.IntVar(&minutesBetweenRuns,
                "minutes-between-runs",
                0,
                "How many minutes we wait between data manager runs. 0 means run once and exit.")
}
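
// Example invocation (a sketch: the binary name "datamanager" is assumed from
// the package path, and the flag values shown are illustrative only):
//
//   datamanager -log-event-type-prefix=experimental-data-manager \
//       -log-frequency-seconds=20 -minutes-between-runs=60
//
// With -minutes-between-runs=0 (the default), the manager performs a single
// run and exits.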

func main() {
        flag.Parse()
        if minutesBetweenRuns == 0 {
                arv, err := makeArvadosClient()
                if err != nil {
                        log.Fatalf("makeArvadosClient: %v", err)
                }
                err = singlerun(arv)
                if err != nil {
                        log.Fatalf("singlerun: %v", err)
                }
        } else {
                waitTime := time.Minute * time.Duration(minutesBetweenRuns)
                for {
                        log.Println("Beginning Run")
                        arv, err := makeArvadosClient()
                        if err != nil {
                                log.Fatalf("makeArvadosClient: %v", err)
                        }
                        err = singlerun(arv)
                        if err != nil {
                                log.Printf("singlerun: %v", err)
                        }
                        log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
                        time.Sleep(waitTime)
                }
        }
}

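// makeArvadosClient is a thin wrapper around arvadosclient.MakeArvadosClient,
// giving this package a single place to construct its API client.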
func makeArvadosClient() (arvadosclient.ArvadosClient, error) {
        return arvadosclient.MakeArvadosClient()
}

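// singlerun performs one complete data manager pass: it verifies that the
// current token belongs to an admin user, reads collection and keep server
// data, summarizes block replication, writes pull lists, and sends trash
// lists to the keep servers. main calls it once, or repeatedly when
// -minutes-between-runs is nonzero.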
func singlerun(arv arvadosclient.ArvadosClient) error {
        var err error
        if isAdmin, err := util.UserIsAdmin(arv); err != nil {
                return errors.New("Error verifying admin token: " + err.Error())
        } else if !isAdmin {
                return errors.New("Current user is not an admin. Datamanager requires a privileged token.")
        }

        var arvLogger *logger.Logger
        if logEventTypePrefix != "" {
                arvLogger = logger.NewLogger(logger.LoggerParams{
                        Client:          arv,
                        EventTypePrefix: logEventTypePrefix,
                        WriteInterval:   time.Second * time.Duration(logFrequencySeconds)})
        }

        loggerutil.LogRunInfo(arvLogger)
        if arvLogger != nil {
                arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
        }

        var (
                dataFetcher     summary.DataFetcher
                readCollections collection.ReadCollections
                keepServerInfo  keep.ReadServers
        )

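        // Pick the data source: a previously saved summary when
        // summary.ShouldReadData reports one was requested, otherwise live
        // reads from the API server and the keep servers.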
        if summary.ShouldReadData() {
                dataFetcher = summary.ReadData
        } else {
                dataFetcher = BuildDataFetcher(arv)
        }

        dataFetcher(arvLogger, &readCollections, &keepServerInfo)

        if len(readCollections.UUIDToCollection) == 0 {
                return nil // No collections were read, so there is no further work to do.
        }

        _, err = summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
        if err != nil {
                return err
        }

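        // Bucket blocks by replication level and summarize how the observed
        // replication compares with what the collections request.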
        buckets := summary.BucketReplication(readCollections, keepServerInfo)
        bucketCounts := buckets.Counts()

        replicationSummary := buckets.SummarizeBuckets(readCollections)
        replicationCounts := replicationSummary.ComputeCounts()

        log.Printf("Blocks In Collections: %d, "+
                "\nBlocks In Keep: %d.",
                len(readCollections.BlockToDesiredReplication),
                len(keepServerInfo.BlockToServers))
        log.Println(replicationCounts.PrettyPrint())

        log.Printf("Blocks Histogram:")
        for _, rlbss := range bucketCounts {
                log.Printf("%+v: %10d",
                        rlbss.Levels,
                        rlbss.Count)
        }

        kc, err := keepclient.MakeKeepClient(&arv)
        if err != nil {
                return fmt.Errorf("Error setting up keep client: %v", err.Error())
        }

        // Log that we're finished. We force the recording, since Go will
        // not wait for the write timer before exiting.
        if arvLogger != nil {
                defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
                        summaryInfo := logger.GetOrCreateMap(p, "summary_info")
                        summaryInfo["block_replication_counts"] = bucketCounts
                        summaryInfo["replication_summary"] = replicationCounts
                        p["summary_info"] = summaryInfo

                        p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
                })
        }

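        // Compute which servers should pull under-replicated blocks and
        // which blocks not referenced by any collection can be trashed.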
        pullServers := summary.ComputePullServers(kc,
                &keepServerInfo,
                readCollections.BlockToDesiredReplication,
                replicationSummary.UnderReplicatedBlocks)

        pullLists := summary.BuildPullLists(pullServers)

        trashLists, trashErr := summary.BuildTrashLists(kc,
                &keepServerInfo,
                replicationSummary.KeepBlocksNotInCollections)

        err = summary.WritePullLists(arvLogger, pullLists)
        if err != nil {
                return err
        }

        if trashErr != nil {
                return trashErr
        }
        keep.SendTrashLists(kc, trashLists)

        return nil
}

// BuildDataFetcher returns a data fetcher that fetches data from remote servers.
func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
        return func(arvLogger *logger.Logger,
                readCollections *collection.ReadCollections,
                keepServerInfo *keep.ReadServers) {
                collectionChannel := make(chan collection.ReadCollections)

                go func() {
                        collectionChannel <- collection.GetCollectionsAndSummarize(
                                arvLogger,
                                collection.GetCollectionsParams{
                                        Client:    arv,
                                        Logger:    arvLogger,
                                        BatchSize: 50})
                }()

                var err error
                *keepServerInfo, err = keep.GetKeepServersAndSummarize(
                        keep.GetKeepServersParams{
                                Client: arv,
                                Logger: arvLogger,
                                Limit:  1000})

                if err != nil {
                        return
                }

                *readCollections = <-collectionChannel
        }
}
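
// The commit described at the top of this page adds tests that inject errors
// during GetCollections. The function below is a rough, hypothetical sketch
// (not part of the original file) illustrating the summary.DataFetcher
// contract used in singlerun: a fetcher that leaves readCollections
// unpopulated makes its caller stop at the
// len(readCollections.UUIDToCollection) == 0 check above.
func emptyDataFetcher(arvLogger *logger.Logger,
        readCollections *collection.ReadCollections,
        keepServerInfo *keep.ReadServers) {
        // Intentionally leave both output structures at their zero values.
}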