Merge branch 'master' of git.curoverse.com:arvados into 3408-production-datamanager
[arvados.git] / services / datamanager / datamanager.go
1 /* Keep Datamanager. Responsible for checking on and reporting on Keep Storage */
2
3 package main
4
5 import (
6         "flag"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/logger"
9         "git.curoverse.com/arvados.git/sdk/go/util"
10         "git.curoverse.com/arvados.git/services/datamanager/collection"
11         "git.curoverse.com/arvados.git/services/datamanager/keep"
12         "log"
13         "os"
14         "runtime"
15         "time"
16 )
17
// Command-line configuration controlling the Arvados event log that this
// run of datamanager writes.
var (
	// logEventType is the event_type stamped on every Arvados log entry we
	// write. main only creates a logger when this is non-empty.
	logEventType string
	// logFrequencySeconds is converted to a time.Duration in seconds and used
	// as the logger's MinimumWriteInterval in main.
	logFrequencySeconds int
)

// init registers the -log-event-type and -log-frequency-seconds flags so
// that flag.Parse in main can populate the variables above.
func init() {
	const (
		defaultEventType        = "experimental-data-manager-report"
		defaultFrequencySeconds = 20
	)

	flag.StringVar(&logEventType, "log-event-type", defaultEventType,
		"event_type to use in our arvados log entries. Set to empty to turn off logging")
	flag.IntVar(&logFrequencySeconds, "log-frequency-seconds", defaultFrequencySeconds,
		"How frequently we'll write log entries in seconds.")
}
33
34 func main() {
35         flag.Parse()
36
37         arv, err := arvadosclient.MakeArvadosClient()
38         if err != nil {
39                 log.Fatalf("Error setting up arvados client %s", err.Error())
40         }
41
42         if is_admin, err := util.UserIsAdmin(arv); err != nil {
43                 log.Fatalf("Error querying current arvados user %s", err.Error())
44         } else if !is_admin {
45                 log.Fatalf("Current user is not an admin. Datamanager can only be run by admins.")
46         }
47
48         var arvLogger *logger.Logger
49         if logEventType != "" {
50                 arvLogger = logger.NewLogger(logger.LoggerParams{Client: arv,
51                         EventType: logEventType,
52                         MinimumWriteInterval: time.Second * time.Duration(logFrequencySeconds)})
53         }
54
55         if arvLogger != nil {
56                 properties, _ := arvLogger.Edit()
57                 runInfo := make(map[string]interface{})
58                 runInfo["start_time"] = time.Now()
59                 runInfo["args"] = os.Args
60                 hostname, err := os.Hostname()
61                 if err != nil {
62                         runInfo["hostname_error"] = err.Error()
63                 } else {
64                         runInfo["hostname"] = hostname
65                 }
66                 runInfo["pid"] = os.Getpid()
67                 properties["run_info"] = runInfo
68
69                 arvLogger.AddWriteHook(LogMemoryAlloc)
70
71                 arvLogger.Record()
72         }
73
74         // TODO(misha): Read Collections and Keep Contents concurrently as goroutines.
75         // This requires waiting on them to finish before you let main() exit.
76
77         RunCollections(collection.GetCollectionsParams{
78                 Client: arv, Logger: arvLogger, BatchSize: 50})
79
80         RunKeep(keep.GetKeepServersParams{Client: arv, Limit: 1000})
81 }
82
83 func RunCollections(params collection.GetCollectionsParams) {
84         readCollections := collection.GetCollections(params)
85
86         UserUsage := ComputeSizeOfOwnedCollections(readCollections)
87         log.Printf("Uuid to Size used: %v", UserUsage)
88
89         // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
90         // lots of behaviors can become warnings (and obviously we can't
91         // write anything).
92         // if !readCollections.ReadAllCollections {
93         //      log.Fatalf("Did not read all collections")
94         // }
95
96         log.Printf("Read and processed %d collections",
97                 len(readCollections.UuidToCollection))
98 }
99
100 func RunKeep(params keep.GetKeepServersParams) {
101         readServers := keep.GetKeepServers(params)
102
103         log.Printf("Returned %d keep disks", len(readServers.ServerToContents))
104
105         blockReplicationCounts := make(map[int]int)
106         for _, infos := range readServers.BlockToServers {
107                 replication := len(infos)
108                 blockReplicationCounts[replication] += 1
109         }
110
111         log.Printf("Replication level distribution: %v", blockReplicationCounts)
112 }
113
114 func ComputeSizeOfOwnedCollections(readCollections collection.ReadCollections) (
115         results map[string]int) {
116         results = make(map[string]int)
117         for _, coll := range readCollections.UuidToCollection {
118                 results[coll.OwnerUuid] = results[coll.OwnerUuid] + coll.TotalSize
119         }
120         return
121 }
122
// LogMemoryAlloc is a logger write hook. It records the number of heap bytes
// currently allocated (runtime.MemStats.Alloc, a uint64) under
// properties["run_info"]["alloc_bytes_in_use"].
//
// If properties["run_info"] is missing, nil, or not a map[string]interface{},
// a fresh map is stored there first. This means the hook never panics on
// properties that were not pre-populated. A nil properties map is left
// untouched because it cannot be written to.
//
// entry is unused; it exists only to match the write-hook signature. Go does
// not reject unused function parameters, so no placeholder use is needed.
func LogMemoryAlloc(properties map[string]interface{}, entry map[string]interface{}) {
	if properties == nil {
		return
	}

	// Comma-ok assertion: the unchecked form panics if run_info is absent or
	// holds some other type, which would crash the logger's write path.
	runInfo, ok := properties["run_info"].(map[string]interface{})
	if !ok || runInfo == nil {
		runInfo = make(map[string]interface{})
		properties["run_info"] = runInfo
	}

	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)
	runInfo["alloc_bytes_in_use"] = memStats.Alloc
}