X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/af550e54c034136e5fcb187e7f81e3d82170f9c8..970095751e2e836ed296152ae3e9ccb6caa62f62:/services/datamanager/datamanager.go diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go index cd3e010c46..bf989026c4 100644 --- a/services/datamanager/datamanager.go +++ b/services/datamanager/datamanager.go @@ -5,12 +5,32 @@ package main import ( "flag" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/logger" "git.curoverse.com/arvados.git/sdk/go/util" "git.curoverse.com/arvados.git/services/datamanager/collection" "git.curoverse.com/arvados.git/services/datamanager/keep" "log" + "os" + "runtime" + "time" ) +var ( + logEventType string + logFrequencySeconds int +) + +func init() { + flag.StringVar(&logEventType, + "log-event-type", + "experimental-data-manager-report", + "event_type to use in our arvados log entries. Set to empty to turn off logging") + flag.IntVar(&logFrequencySeconds, + "log-frequency-seconds", + 20, + "How frequently we'll write log entries in seconds.") +} + func main() { flag.Parse() @@ -25,13 +45,43 @@ func main() { log.Fatalf("Current user is not an admin. Datamanager can only be run by admins.") } + var arvLogger *logger.Logger + if logEventType != "" { + arvLogger = logger.NewLogger(logger.LoggerParams{Client: arv, + EventType: logEventType, + MinimumWriteInterval: time.Second * time.Duration(logFrequencySeconds)}) + } + + if arvLogger != nil { + properties, _ := arvLogger.Edit() + runInfo := make(map[string]interface{}) + runInfo["start_time"] = time.Now() + runInfo["args"] = os.Args + hostname, err := os.Hostname() + if err != nil { + runInfo["hostname_error"] = err.Error() + } else { + runInfo["hostname"] = hostname + } + runInfo["pid"] = os.Getpid() + properties["run_info"] = runInfo + + arvLogger.AddEditHook(LogMemoryAlloc) + + arvLogger.Record() + } + // TODO(misha): Read Collections and Keep Contents concurrently as goroutines. + // This requires waiting on them to finish before you let main() exit. + + RunCollections(collection.GetCollectionsParams{ + Client: arv, Logger: arvLogger, BatchSize: 100}) - readCollections := collection.GetCollections( - collection.GetCollectionsParams{ - Client: arv, Limit: 50, LogEveryNthCollectionProcessed: 10}) + RunKeep(keep.GetKeepServersParams{Client: arv, Limit: 1000}) +} - log.Printf("Read Collections: %v", readCollections) +func RunCollections(params collection.GetCollectionsParams) { + readCollections := collection.GetCollections(params) UserUsage := ComputeSizeOfOwnedCollections(readCollections) log.Printf("Uuid to Size used: %v", UserUsage) @@ -45,11 +95,20 @@ func main() { log.Printf("Read and processed %d collections", len(readCollections.UuidToCollection)) +} + +func RunKeep(params keep.GetKeepServersParams) { + readServers := keep.GetKeepServers(params) - readServers := keep.GetKeepServers( - keep.GetKeepServersParams{Client: arv, Limit: 1000}) + log.Printf("Returned %d keep disks", len(readServers.ServerToContents)) - log.Printf("Returned %d keep disks", len(readServers.AddressToContents)) + blockReplicationCounts := make(map[int]int) + for _, infos := range readServers.BlockToServers { + replication := len(infos) + blockReplicationCounts[replication] += 1 + } + + log.Printf("Replication level distribution: %v", blockReplicationCounts) } func ComputeSizeOfOwnedCollections(readCollections collection.ReadCollections) ( @@ -60,3 +119,11 @@ func ComputeSizeOfOwnedCollections(readCollections collection.ReadCollections) ( } return } + +func LogMemoryAlloc(properties map[string]interface{}, entry map[string]interface{}) { + _ = entry // keep the compiler from complaining + runInfo := properties["run_info"].(map[string]interface{}) + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + runInfo["alloc_bytes_in_use"] = memStats.Alloc +}