Added memory alloc in use to stats exported to log. Also added EditHooks to Logger...
[arvados.git] / services / datamanager / datamanager.go
index cd3e010c4639cde5bc7e2f981d7b2dd555e4d8f7..bf989026c45189db29a7a1ca10c8c785c7a383ac 100644 (file)
@@ -5,12 +5,32 @@ package main
 import (
        "flag"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/logger"
        "git.curoverse.com/arvados.git/sdk/go/util"
        "git.curoverse.com/arvados.git/services/datamanager/collection"
        "git.curoverse.com/arvados.git/services/datamanager/keep"
        "log"
+       "os"
+       "runtime"
+       "time"
 )
 
+// Values of the logging-related command-line flags registered in init().
+var (
+       // event_type for our arvados log entries; main only creates a logger when this is non-empty.
+       logEventType string
+       // Number of seconds used by main to build the logger's MinimumWriteInterval.
+       logFrequencySeconds int
+)
+
+// init registers the logging-related command-line flags. The bound variables
+// hold the defaults given here until flag.Parse() (called in main) overwrites
+// them with any values supplied on the command line.
+func init() {
+       // Defaults to "experimental-data-manager-report"; an empty value turns logging off.
+       flag.StringVar(&logEventType, 
+               "log-event-type",
+               "experimental-data-manager-report",
+               "event_type to use in our arvados log entries. Set to empty to turn off logging")
+       // Defaults to 20 seconds between log writes.
+       flag.IntVar(&logFrequencySeconds, 
+               "log-frequency-seconds",
+               20,
+               "How frequently we'll write log entries in seconds.")
+}
+
 func main() {
        flag.Parse()
 
@@ -25,13 +45,43 @@ func main() {
                log.Fatalf("Current user is not an admin. Datamanager can only be run by admins.")
        }
 
+       var arvLogger *logger.Logger
+       if logEventType != "" {
+               arvLogger = logger.NewLogger(logger.LoggerParams{Client: arv,
+                       EventType: logEventType,
+                       MinimumWriteInterval: time.Second * time.Duration(logFrequencySeconds)})
+       }
+
+       if arvLogger != nil {
+               properties, _ := arvLogger.Edit()
+               runInfo := make(map[string]interface{})
+               runInfo["start_time"] = time.Now()
+               runInfo["args"] = os.Args
+               hostname, err := os.Hostname()
+               if err != nil {
+                       runInfo["hostname_error"] = err.Error()
+               } else {
+                       runInfo["hostname"] = hostname
+               }
+               runInfo["pid"] = os.Getpid()
+               properties["run_info"] = runInfo
+
+               arvLogger.AddEditHook(LogMemoryAlloc)
+
+               arvLogger.Record()
+       }
+
        // TODO(misha): Read Collections and Keep Contents concurrently as goroutines.
+       // This requires waiting on them to finish before you let main() exit.
+
+       RunCollections(collection.GetCollectionsParams{
+               Client: arv, Logger: arvLogger, BatchSize: 100})
 
-       readCollections := collection.GetCollections(
-               collection.GetCollectionsParams{
-                       Client: arv, Limit: 50, LogEveryNthCollectionProcessed: 10})
+       RunKeep(keep.GetKeepServersParams{Client: arv, Limit: 1000})
+}
 
-       log.Printf("Read Collections: %v", readCollections)
+func RunCollections(params collection.GetCollectionsParams) {
+       readCollections := collection.GetCollections(params)
 
        UserUsage := ComputeSizeOfOwnedCollections(readCollections)
        log.Printf("Uuid to Size used: %v", UserUsage)
@@ -45,11 +95,20 @@ func main() {
 
        log.Printf("Read and processed %d collections",
                len(readCollections.UuidToCollection))
+}
+
+// RunKeep calls keep.GetKeepServers with params, then logs the number of
+// entries in ServerToContents and a histogram of block replication levels.
+func RunKeep(params keep.GetKeepServersParams) {
+       readServers := keep.GetKeepServers(params)
 
-       readServers := keep.GetKeepServers(
-               keep.GetKeepServersParams{Client: arv, Limit: 1000})
+       log.Printf("Returned %d keep disks", len(readServers.ServerToContents))
 
-       log.Printf("Returned %d keep disks", len(readServers.AddressToContents))
+       // Maps a replication level to the number of blocks found at that level.
+       blockReplicationCounts := make(map[int]int)
+       for _, infos := range readServers.BlockToServers {
+               // The replication level is the number of entries recorded for the block
+               // (presumably one per server holding it; confirm against the keep package).
+               replication := len(infos)
+               blockReplicationCounts[replication] += 1
+       }
+
+       log.Printf("Replication level distribution: %v", blockReplicationCounts)
 }
 
 func ComputeSizeOfOwnedCollections(readCollections collection.ReadCollections) (
@@ -60,3 +119,11 @@ func ComputeSizeOfOwnedCollections(readCollections collection.ReadCollections) (
        }
        return
 }
+
+// LogMemoryAlloc is registered in main as a logger edit hook. It reads the
+// current runtime memory statistics and stores memStats.Alloc under the
+// "alloc_bytes_in_use" key of the properties["run_info"] map.
+//
+// NOTE(review): the single-value type assertion below panics if properties
+// has no "run_info" entry or if that entry is not a map[string]interface{}.
+// main sets properties["run_info"] before registering this hook.
+func LogMemoryAlloc(properties map[string]interface{}, entry map[string]interface{}) {
+       _ = entry  // entry is deliberately unused (Go does not reject unused parameters, so this line is optional)
+       runInfo := properties["run_info"].(map[string]interface{})
+       var memStats runtime.MemStats
+       // Fills memStats with the allocator statistics at the time of the call.
+       runtime.ReadMemStats(&memStats)
+       runInfo["alloc_bytes_in_use"] = memStats.Alloc
+}