Merge branch '6264-cwl-runner' closes #6264
[arvados.git] / services / datamanager / datamanager.go
1 /* Keep Datamanager. Responsible for checking on and reporting on Keep Storage */
2
3 package main
4
5 import (
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "git.curoverse.com/arvados.git/sdk/go/logger"
11         "git.curoverse.com/arvados.git/sdk/go/util"
12         "git.curoverse.com/arvados.git/services/datamanager/collection"
13         "git.curoverse.com/arvados.git/services/datamanager/keep"
14         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
15         "git.curoverse.com/arvados.git/services/datamanager/summary"
16         "log"
17         "time"
18 )
19
// Command-line settings. Each one is bound to a flag in init and holds its
// final value once flag.Parse has run.
var (
	// logEventTypePrefix is the prefix used in the event_type of our arvados
	// log entries. An empty value turns logging off.
	logEventTypePrefix string

	// logFrequencySeconds is how frequently, in seconds, log entries are
	// written.
	logFrequencySeconds int

	// minutesBetweenRuns is the wait between data manager runs, in minutes.
	// 0 means run once and exit.
	minutesBetweenRuns int
)
25
26 func init() {
27         flag.StringVar(&logEventTypePrefix,
28                 "log-event-type-prefix",
29                 "experimental-data-manager",
30                 "Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging")
31         flag.IntVar(&logFrequencySeconds,
32                 "log-frequency-seconds",
33                 20,
34                 "How frequently we'll write log entries in seconds.")
35         flag.IntVar(&minutesBetweenRuns,
36                 "minutes-between-runs",
37                 0,
38                 "How many minutes we wait betwen data manager runs. 0 means run once and exit.")
39 }
40
41 func main() {
42         flag.Parse()
43         if minutesBetweenRuns == 0 {
44                 singlerun()
45         } else {
46                 waitTime := time.Minute * time.Duration(minutesBetweenRuns)
47                 for {
48                         log.Println("Beginning Run")
49                         singlerun()
50                         log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
51                         time.Sleep(waitTime)
52                 }
53         }
54 }
55
56 func singlerun() {
57         arv, err := arvadosclient.MakeArvadosClient()
58         if err != nil {
59                 log.Fatalf("Error setting up arvados client %s", err.Error())
60         }
61
62         if is_admin, err := util.UserIsAdmin(arv); err != nil {
63                 log.Fatalf("Error querying current arvados user %s", err.Error())
64         } else if !is_admin {
65                 log.Fatalf("Current user is not an admin. Datamanager can only be run by admins.")
66         }
67
68         var arvLogger *logger.Logger
69         if logEventTypePrefix != "" {
70                 arvLogger = logger.NewLogger(logger.LoggerParams{
71                         Client:          arv,
72                         EventTypePrefix: logEventTypePrefix,
73                         WriteInterval:   time.Second * time.Duration(logFrequencySeconds)})
74         }
75
76         loggerutil.LogRunInfo(arvLogger)
77         if arvLogger != nil {
78                 arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
79         }
80
81         var (
82                 dataFetcher     summary.DataFetcher
83                 readCollections collection.ReadCollections
84                 keepServerInfo  keep.ReadServers
85         )
86
87         if summary.ShouldReadData() {
88                 dataFetcher = summary.ReadData
89         } else {
90                 dataFetcher = BuildDataFetcher(arv)
91         }
92
93         dataFetcher(arvLogger, &readCollections, &keepServerInfo)
94
95         summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
96
97         buckets := summary.BucketReplication(readCollections, keepServerInfo)
98         bucketCounts := buckets.Counts()
99
100         replicationSummary := buckets.SummarizeBuckets(readCollections)
101         replicationCounts := replicationSummary.ComputeCounts()
102
103         log.Printf("Blocks In Collections: %d, "+
104                 "\nBlocks In Keep: %d.",
105                 len(readCollections.BlockToDesiredReplication),
106                 len(keepServerInfo.BlockToServers))
107         log.Println(replicationCounts.PrettyPrint())
108
109         log.Printf("Blocks Histogram:")
110         for _, rlbss := range bucketCounts {
111                 log.Printf("%+v: %10d",
112                         rlbss.Levels,
113                         rlbss.Count)
114         }
115
116         kc, err := keepclient.MakeKeepClient(&arv)
117         if err != nil {
118                 loggerutil.FatalWithMessage(arvLogger,
119                         fmt.Sprintf("Error setting up keep client %s", err.Error()))
120         }
121
122         pullServers := summary.ComputePullServers(kc,
123                 &keepServerInfo,
124                 readCollections.BlockToDesiredReplication,
125                 replicationSummary.UnderReplicatedBlocks)
126
127         pullLists := summary.BuildPullLists(pullServers)
128
129         summary.WritePullLists(arvLogger, pullLists)
130
131         // Log that we're finished. We force the recording, since go will
132         // not wait for the write timer before exiting.
133         if arvLogger != nil {
134                 arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
135                         summaryInfo := logger.GetOrCreateMap(p, "summary_info")
136                         summaryInfo["block_replication_counts"] = bucketCounts
137                         summaryInfo["replication_summary"] = replicationCounts
138                         p["summary_info"] = summaryInfo
139
140                         p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
141                 })
142         }
143 }
144
145 // Returns a data fetcher that fetches data from remote servers.
146 func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
147         return func(arvLogger *logger.Logger,
148                 readCollections *collection.ReadCollections,
149                 keepServerInfo *keep.ReadServers) {
150                 collectionChannel := make(chan collection.ReadCollections)
151
152                 go func() {
153                         collectionChannel <- collection.GetCollectionsAndSummarize(
154                                 collection.GetCollectionsParams{
155                                         Client:    arv,
156                                         Logger:    arvLogger,
157                                         BatchSize: 50})
158                 }()
159
160                 *keepServerInfo = keep.GetKeepServersAndSummarize(
161                         keep.GetKeepServersParams{
162                                 Client: arv,
163                                 Logger: arvLogger,
164                                 Limit:  1000})
165
166                 *readCollections = <-collectionChannel
167         }
168 }