/* Keep Datamanager: checks on, and reports on, the state of Keep storage. */
7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "git.curoverse.com/arvados.git/sdk/go/logger"
9 "git.curoverse.com/arvados.git/sdk/go/util"
10 "git.curoverse.com/arvados.git/services/datamanager/collection"
11 "git.curoverse.com/arvados.git/services/datamanager/keep"
20 logFrequencySeconds int
24 flag.StringVar(&logEventType,
26 "experimental-data-manager-report",
27 "event_type to use in our arvados log entries. Set to empty to turn off logging")
28 flag.IntVar(&logFrequencySeconds,
29 "log-frequency-seconds",
31 "How frequently we'll write log entries in seconds.")
37 arv, err := arvadosclient.MakeArvadosClient()
39 log.Fatalf("Error setting up arvados client %s", err.Error())
42 if is_admin, err := util.UserIsAdmin(arv); err != nil {
43 log.Fatalf("Error querying current arvados user %s", err.Error())
45 log.Fatalf("Current user is not an admin. Datamanager can only be run by admins.")
48 var arvLogger *logger.Logger
49 if logEventType != "" {
50 arvLogger = logger.NewLogger(logger.LoggerParams{Client: arv,
51 EventType: logEventType,
52 MinimumWriteInterval: time.Second * time.Duration(logFrequencySeconds)})
56 properties, _ := arvLogger.Edit()
57 runInfo := make(map[string]interface{})
58 runInfo["start_time"] = time.Now()
59 runInfo["args"] = os.Args
60 hostname, err := os.Hostname()
62 runInfo["hostname_error"] = err.Error()
64 runInfo["hostname"] = hostname
66 runInfo["pid"] = os.Getpid()
67 properties["run_info"] = runInfo
69 arvLogger.AddWriteHook(LogMemoryAlloc)
74 // TODO(misha): Read Collections and Keep Contents concurrently as goroutines.
75 // This requires waiting on them to finish before you let main() exit.
77 RunCollections(collection.GetCollectionsParams{
78 Client: arv, Logger: arvLogger, BatchSize: 50})
80 RunKeep(keep.GetKeepServersParams{Client: arv, Limit: 1000})
83 func RunCollections(params collection.GetCollectionsParams) {
84 readCollections := collection.GetCollections(params)
86 UserUsage := ComputeSizeOfOwnedCollections(readCollections)
87 log.Printf("Uuid to Size used: %v", UserUsage)
89 // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
90 // lots of behaviors can become warnings (and obviously we can't
92 // if !readCollections.ReadAllCollections {
93 // log.Fatalf("Did not read all collections")
96 log.Printf("Read and processed %d collections",
97 len(readCollections.UuidToCollection))
100 func RunKeep(params keep.GetKeepServersParams) {
101 readServers := keep.GetKeepServers(params)
103 log.Printf("Returned %d keep disks", len(readServers.ServerToContents))
105 blockReplicationCounts := make(map[int]int)
106 for _, infos := range readServers.BlockToServers {
107 replication := len(infos)
108 blockReplicationCounts[replication] += 1
111 log.Printf("Replication level distribution: %v", blockReplicationCounts)
114 func ComputeSizeOfOwnedCollections(readCollections collection.ReadCollections) (
115 results map[string]int) {
116 results = make(map[string]int)
117 for _, coll := range readCollections.UuidToCollection {
118 results[coll.OwnerUuid] = results[coll.OwnerUuid] + coll.TotalSize
123 func LogMemoryAlloc(properties map[string]interface{}, entry map[string]interface{}) {
124 _ = entry // keep the compiler from complaining
125 runInfo := properties["run_info"].(map[string]interface{})
126 var memStats runtime.MemStats
127 runtime.ReadMemStats(&memStats)
128 runInfo["alloc_bytes_in_use"] = memStats.Alloc