Made HostPort() an explicit method to print the HostPort, so that String() can change...
[arvados.git] / services / datamanager / datamanager.go
1 /* Keep Datamanager. Responsible for checking and reporting on the state of Keep storage. */
2
3 package main
4
5 import (
6         "flag"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/logger"
9         "git.curoverse.com/arvados.git/sdk/go/util"
10         "git.curoverse.com/arvados.git/services/datamanager/collection"
11         "git.curoverse.com/arvados.git/services/datamanager/keep"
12         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
13         "git.curoverse.com/arvados.git/services/datamanager/summary"
14         "log"
15         "time"
16 )
17
var (
	// logEventTypePrefix is prepended to the event_type of every log entry
	// this program writes to Arvados. An empty value disables Arvados logging
	// entirely (no logger is created in singlerun).
	logEventTypePrefix string

	// logFrequencySeconds is the interval, in seconds, between periodic
	// writes of the Arvados log entry.
	logFrequencySeconds int

	// minutesBetweenRuns is how long main waits between data manager runs.
	// 0 means run once and exit.
	minutesBetweenRuns int
)

// init registers the command-line flags that populate the package-level
// configuration variables above. Values take effect after flag.Parse in main.
func init() {
	flag.StringVar(&logEventTypePrefix,
		"log-event-type-prefix",
		"experimental-data-manager",
		"Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging")
	flag.IntVar(&logFrequencySeconds,
		"log-frequency-seconds",
		20,
		"How frequently we'll write log entries in seconds.")
	flag.IntVar(&minutesBetweenRuns,
		"minutes-between-runs",
		0,
		"How many minutes we wait between data manager runs. 0 means run once and exit.")
}
38
39 func main() {
40         flag.Parse()
41         if minutesBetweenRuns == 0 {
42                 singlerun()
43         } else {
44                 waitTime := time.Minute * time.Duration(minutesBetweenRuns)
45                 for {
46                         log.Println("Beginning Run")
47                         singlerun()
48                         log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
49                         time.Sleep(waitTime)
50                 }
51         }
52 }
53
54 func singlerun() {
55         arv, err := arvadosclient.MakeArvadosClient()
56         if err != nil {
57                 log.Fatalf("Error setting up arvados client %s", err.Error())
58         }
59
60         if is_admin, err := util.UserIsAdmin(arv); err != nil {
61                 log.Fatalf("Error querying current arvados user %s", err.Error())
62         } else if !is_admin {
63                 log.Fatalf("Current user is not an admin. Datamanager can only be run by admins.")
64         }
65
66         var arvLogger *logger.Logger
67         if logEventTypePrefix != "" {
68                 arvLogger = logger.NewLogger(logger.LoggerParams{
69                         Client:          arv,
70                         EventTypePrefix: logEventTypePrefix,
71                         WriteInterval:   time.Second * time.Duration(logFrequencySeconds)})
72         }
73
74         loggerutil.LogRunInfo(arvLogger)
75         if arvLogger != nil {
76                 arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
77         }
78
79         var (
80                 readCollections collection.ReadCollections
81                 keepServerInfo  keep.ReadServers
82         )
83
84         if !summary.MaybeReadData(arvLogger, &readCollections, &keepServerInfo) {
85                 collectionChannel := make(chan collection.ReadCollections)
86
87                 go func() {
88                         collectionChannel <- collection.GetCollectionsAndSummarize(
89                                 collection.GetCollectionsParams{
90                                         Client:    arv,
91                                         Logger:    arvLogger,
92                                         BatchSize: 50})
93                 }()
94
95                 keepServerInfo = keep.GetKeepServersAndSummarize(
96                         keep.GetKeepServersParams{
97                                 Client: arv,
98                                 Logger: arvLogger,
99                                 Limit:  1000})
100
101                 readCollections = <-collectionChannel
102         }
103
104         summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
105
106         buckets := summary.BucketReplication(readCollections, keepServerInfo)
107         bucketCounts := buckets.Counts()
108
109         replicationSummary := buckets.SummarizeBuckets(readCollections)
110         replicationCounts := replicationSummary.ComputeCounts()
111
112         log.Printf("Blocks In Collections: %d, "+
113                 "\nBlocks In Keep: %d.",
114                 len(readCollections.BlockToReplication),
115                 len(keepServerInfo.BlockToServers))
116         log.Println(replicationCounts.PrettyPrint())
117
118         log.Printf("Blocks Histogram:")
119         for _, rlbss := range bucketCounts {
120                 log.Printf("%+v: %10d",
121                         rlbss.Levels,
122                         rlbss.Count)
123         }
124
125         // Log that we're finished. We force the recording, since go will
126         // not wait for the write timer before exiting.
127         if arvLogger != nil {
128                 arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
129                         summaryInfo := logger.GetOrCreateMap(p, "summary_info")
130                         summaryInfo["block_replication_counts"] = bucketCounts
131                         summaryInfo["replication_summary"] = replicationCounts
132                         p["summary_info"] = summaryInfo
133
134                         p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
135                 })
136         }
137 }