Merge branch 'master' into 6663-git-server
[arvados.git] / services / datamanager / datamanager.go
1 /* Keep Datamanager. Responsible for checking on and reporting on Keep Storage */
2
3 package main
4
5 import (
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "git.curoverse.com/arvados.git/sdk/go/logger"
11         "git.curoverse.com/arvados.git/sdk/go/util"
12         "git.curoverse.com/arvados.git/services/datamanager/collection"
13         "git.curoverse.com/arvados.git/services/datamanager/keep"
14         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
15         "git.curoverse.com/arvados.git/services/datamanager/summary"
16         "log"
17         "time"
18 )
19
// Command-line settings. The flags that populate them, and their defaults,
// are registered in init().
var (
	// logEventTypePrefix is the prefix used in the event_type of the arvados
	// log entries this program writes. An empty value turns logging off.
	logEventTypePrefix string

	// logFrequencySeconds is how often, in seconds, log entries are written.
	logFrequencySeconds int

	// minutesBetweenRuns is the pause, in minutes, between data manager
	// runs. Zero means run once and exit.
	minutesBetweenRuns int
)
25
26 func init() {
27         flag.StringVar(&logEventTypePrefix,
28                 "log-event-type-prefix",
29                 "experimental-data-manager",
30                 "Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging")
31         flag.IntVar(&logFrequencySeconds,
32                 "log-frequency-seconds",
33                 20,
34                 "How frequently we'll write log entries in seconds.")
35         flag.IntVar(&minutesBetweenRuns,
36                 "minutes-between-runs",
37                 0,
38                 "How many minutes we wait betwen data manager runs. 0 means run once and exit.")
39 }
40
41 func main() {
42         flag.Parse()
43         if minutesBetweenRuns == 0 {
44                 err := singlerun()
45                 if err != nil {
46                         log.Fatalf("Got an error: %v", err)
47                 }
48         } else {
49                 waitTime := time.Minute * time.Duration(minutesBetweenRuns)
50                 for {
51                         log.Println("Beginning Run")
52                         err := singlerun()
53                         if err != nil {
54                                 log.Printf("Got an error: %v", err)
55                         }
56                         log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
57                         time.Sleep(waitTime)
58                 }
59         }
60 }
61
62 func singlerun() error {
63         arv, err := arvadosclient.MakeArvadosClient()
64         if err != nil {
65                 log.Fatalf("Error setting up arvados client %s", err.Error())
66         }
67
68         if is_admin, err := util.UserIsAdmin(arv); err != nil {
69                 log.Fatalf("Error querying current arvados user %s", err.Error())
70         } else if !is_admin {
71                 log.Fatalf("Current user is not an admin. Datamanager can only be run by admins.")
72         }
73
74         var arvLogger *logger.Logger
75         if logEventTypePrefix != "" {
76                 arvLogger = logger.NewLogger(logger.LoggerParams{
77                         Client:          arv,
78                         EventTypePrefix: logEventTypePrefix,
79                         WriteInterval:   time.Second * time.Duration(logFrequencySeconds)})
80         }
81
82         loggerutil.LogRunInfo(arvLogger)
83         if arvLogger != nil {
84                 arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
85         }
86
87         var (
88                 dataFetcher     summary.DataFetcher
89                 readCollections collection.ReadCollections
90                 keepServerInfo  keep.ReadServers
91         )
92
93         if summary.ShouldReadData() {
94                 dataFetcher = summary.ReadData
95         } else {
96                 dataFetcher = BuildDataFetcher(arv)
97         }
98
99         dataFetcher(arvLogger, &readCollections, &keepServerInfo)
100
101         summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
102
103         buckets := summary.BucketReplication(readCollections, keepServerInfo)
104         bucketCounts := buckets.Counts()
105
106         replicationSummary := buckets.SummarizeBuckets(readCollections)
107         replicationCounts := replicationSummary.ComputeCounts()
108
109         log.Printf("Blocks In Collections: %d, "+
110                 "\nBlocks In Keep: %d.",
111                 len(readCollections.BlockToDesiredReplication),
112                 len(keepServerInfo.BlockToServers))
113         log.Println(replicationCounts.PrettyPrint())
114
115         log.Printf("Blocks Histogram:")
116         for _, rlbss := range bucketCounts {
117                 log.Printf("%+v: %10d",
118                         rlbss.Levels,
119                         rlbss.Count)
120         }
121
122         kc, err := keepclient.MakeKeepClient(&arv)
123         if err != nil {
124                 loggerutil.FatalWithMessage(arvLogger,
125                         fmt.Sprintf("Error setting up keep client %s", err.Error()))
126         }
127
128         // Log that we're finished. We force the recording, since go will
129         // not wait for the write timer before exiting.
130         if arvLogger != nil {
131                 defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
132                         summaryInfo := logger.GetOrCreateMap(p, "summary_info")
133                         summaryInfo["block_replication_counts"] = bucketCounts
134                         summaryInfo["replication_summary"] = replicationCounts
135                         p["summary_info"] = summaryInfo
136
137                         p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
138                 })
139         }
140
141         pullServers := summary.ComputePullServers(kc,
142                 &keepServerInfo,
143                 readCollections.BlockToDesiredReplication,
144                 replicationSummary.UnderReplicatedBlocks)
145
146         pullLists := summary.BuildPullLists(pullServers)
147
148         trashLists, trashErr := summary.BuildTrashLists(kc,
149                 &keepServerInfo,
150                 replicationSummary.KeepBlocksNotInCollections)
151
152         summary.WritePullLists(arvLogger, pullLists)
153
154         if trashErr != nil {
155                 return err
156         } else {
157                 keep.SendTrashLists(keep.GetDataManagerToken(arvLogger), kc, trashLists)
158         }
159
160         return nil
161 }
162
163 // Returns a data fetcher that fetches data from remote servers.
164 func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
165         return func(arvLogger *logger.Logger,
166                 readCollections *collection.ReadCollections,
167                 keepServerInfo *keep.ReadServers) {
168                 collectionChannel := make(chan collection.ReadCollections)
169
170                 go func() {
171                         collectionChannel <- collection.GetCollectionsAndSummarize(
172                                 collection.GetCollectionsParams{
173                                         Client:    arv,
174                                         Logger:    arvLogger,
175                                         BatchSize: 50})
176                 }()
177
178                 *keepServerInfo = keep.GetKeepServersAndSummarize(
179                         keep.GetKeepServersParams{
180                                 Client: arv,
181                                 Logger: arvLogger,
182                                 Limit:  1000})
183
184                 *readCollections = <-collectionChannel
185         }
186 }