/* Keep Datamanager. Responsible for checking on and reporting on Keep Storage */

package main

import (
        "flag"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/logger"
        "git.curoverse.com/arvados.git/sdk/go/util"
        "git.curoverse.com/arvados.git/services/datamanager/collection"
        "git.curoverse.com/arvados.git/services/datamanager/keep"
        "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
        "git.curoverse.com/arvados.git/services/datamanager/summary"
        "log"
        "time"
)

var (
        logEventTypePrefix  string
        logFrequencySeconds int
        minutesBetweenRuns  int
)

func init() {
        flag.StringVar(&logEventTypePrefix,
                "log-event-type-prefix",
                "experimental-data-manager",
                "Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging")
        flag.IntVar(&logFrequencySeconds,
                "log-frequency-seconds",
                20,
                "How frequently to write log entries, in seconds.")
        flag.IntVar(&minutesBetweenRuns,
                "minutes-between-runs",
                0,
                "How many minutes to wait between data manager runs. 0 means run once and exit.")
}

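// Example invocation (the flag values here are illustrative, not
// defaults):
//
//   datamanager -log-event-type-prefix=experimental-data-manager \
//           -log-frequency-seconds=20 -minutes-between-runs=60
//
// With -minutes-between-runs=0 (the default), main performs a single
// run and exits; otherwise it loops forever, sleeping between runs.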
func main() {
        flag.Parse()
        if minutesBetweenRuns == 0 {
                err := singlerun()
                if err != nil {
                        log.Fatalf("Got an error: %v", err)
                }
        } else {
                waitTime := time.Minute * time.Duration(minutesBetweenRuns)
                for {
                        log.Println("Beginning Run")
                        err := singlerun()
                        if err != nil {
                                log.Printf("Got an error: %v", err)
                        }
                        log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
                        time.Sleep(waitTime)
                }
        }
}

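// singlerun performs one complete data manager pass: it checks that
// both the current user and the data manager token have admin
// privileges, gathers collection and keep server data, summarizes
// block replication, and then sends pull and trash lists to the keep
// servers.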
func singlerun() error {
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
                log.Fatalf("Error setting up arvados client: %s", err.Error())
        }

        if isAdmin, err := util.UserIsAdmin(arv); err != nil {
                log.Fatalf("Error querying current arvados user: %s", err.Error())
        } else if !isAdmin {
                log.Fatalf("Current user is not an admin. Datamanager can only be run by admins.")
        }

        var arvLogger *logger.Logger
        if logEventTypePrefix != "" {
                arvLogger = logger.NewLogger(logger.LoggerParams{
                        Client:          arv,
                        EventTypePrefix: logEventTypePrefix,
                        WriteInterval:   time.Second * time.Duration(logFrequencySeconds)})
        }

        loggerutil.LogRunInfo(arvLogger)
        if arvLogger != nil {
                arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
        }

        // Verify that the datamanager token belongs to an admin user
        dataManagerToken := keep.GetDataManagerToken(arvLogger)
        origArvToken := arv.ApiToken
        arv.ApiToken = dataManagerToken
        if isAdmin, err := util.UserIsAdmin(arv); err != nil {
                log.Fatalf("Error querying arvados user for data manager token: %s", err.Error())
        } else if !isAdmin {
                log.Fatalf("Datamanager token does not belong to an admin user.")
        }
        arv.ApiToken = origArvToken

        var (
                dataFetcher     summary.DataFetcher
                readCollections collection.ReadCollections
                keepServerInfo  keep.ReadServers
        )

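        // Use a previously saved data snapshot if one was requested
        // (summary.ShouldReadData); otherwise fetch fresh data from the
        // API server and keep servers.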
        if summary.ShouldReadData() {
                dataFetcher = summary.ReadData
        } else {
                dataFetcher = BuildDataFetcher(arv)
        }

        dataFetcher(arvLogger, &readCollections, &keepServerInfo)

        summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)

        buckets := summary.BucketReplication(readCollections, keepServerInfo)
        bucketCounts := buckets.Counts()

        replicationSummary := buckets.SummarizeBuckets(readCollections)
        replicationCounts := replicationSummary.ComputeCounts()

        log.Printf("Blocks In Collections: %d,\nBlocks In Keep: %d.",
                len(readCollections.BlockToDesiredReplication),
                len(keepServerInfo.BlockToServers))
        log.Println(replicationCounts.PrettyPrint())

        log.Printf("Blocks Histogram:")
        for _, rlbss := range bucketCounts {
                log.Printf("%+v: %10d",
                        rlbss.Levels,
                        rlbss.Count)
        }

        kc, err := keepclient.MakeKeepClient(&arv)
        if err != nil {
                loggerutil.FatalWithMessage(arvLogger,
                        fmt.Sprintf("Error setting up keep client: %s", err.Error()))
        }

        // Log that we're finished. We force the recording, since Go will
        // not wait for the write timer before exiting.
        if arvLogger != nil {
                defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
                        summaryInfo := logger.GetOrCreateMap(p, "summary_info")
                        summaryInfo["block_replication_counts"] = bucketCounts
                        summaryInfo["replication_summary"] = replicationCounts
                        p["summary_info"] = summaryInfo

                        p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
                })
        }

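        // Compute the changes to request from the keep servers: pull
        // lists for under-replicated blocks, and trash lists for blocks
        // that no longer appear in any collection.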
        pullServers := summary.ComputePullServers(kc,
                &keepServerInfo,
                readCollections.BlockToDesiredReplication,
                replicationSummary.UnderReplicatedBlocks)

        pullLists := summary.BuildPullLists(pullServers)

        trashLists, trashErr := summary.BuildTrashLists(kc,
                &keepServerInfo,
                replicationSummary.KeepBlocksNotInCollections)

        summary.WritePullLists(arvLogger, pullLists)

        if trashErr != nil {
                return trashErr
        }
        keep.SendTrashLists(dataManagerToken, kc, trashLists)

        return nil
}

// BuildDataFetcher returns a data fetcher that fetches data from remote servers.
func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
        return func(arvLogger *logger.Logger,
                readCollections *collection.ReadCollections,
                keepServerInfo *keep.ReadServers) {
                collectionChannel := make(chan collection.ReadCollections)

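                // Fetch collections in a goroutine so that the keep
                // server data below can be gathered concurrently.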
                go func() {
                        collectionChannel <- collection.GetCollectionsAndSummarize(
                                collection.GetCollectionsParams{
                                        Client:    arv,
                                        Logger:    arvLogger,
                                        BatchSize: 50})
                }()

                *keepServerInfo = keep.GetKeepServersAndSummarize(
                        keep.GetKeepServersParams{
                                Client: arv,
                                Logger: arvLogger,
                                Limit:  1000})

                *readCollections = <-collectionChannel
        }
}