X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/828f692c3ada2691d5c86dff79d91e7e4e2cdda0..970095751e2e836ed296152ae3e9ccb6caa62f62:/services/datamanager/datamanager.go diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go index a2edc7be2f..bf989026c4 100644 --- a/services/datamanager/datamanager.go +++ b/services/datamanager/datamanager.go @@ -3,18 +3,37 @@ package main import ( - //"git.curoverse.com/arvados.git/sdk/go/keepclient" + "flag" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/logger" "git.curoverse.com/arvados.git/sdk/go/util" "git.curoverse.com/arvados.git/services/datamanager/collection" "git.curoverse.com/arvados.git/services/datamanager/keep" "log" + "os" + "runtime" + "time" ) -// Helper type so we don't have to write out 'map[string]interface{}' every time. -type Dict map[string]interface{} +var ( + logEventType string + logFrequencySeconds int +) + +func init() { + flag.StringVar(&logEventType, + "log-event-type", + "experimental-data-manager-report", + "event_type to use in our arvados log entries. Set to empty to turn off logging") + flag.IntVar(&logFrequencySeconds, + "log-frequency-seconds", + 20, + "How frequently we'll write log entries in seconds.") +} func main() { + flag.Parse() + arv, err := arvadosclient.MakeArvadosClient() if err != nil { log.Fatalf("Error setting up arvados client %s", err.Error()) @@ -26,11 +45,46 @@ func main() { log.Fatalf("Current user is not an admin. Datamanager can only be run by admins.") } - readCollections := collection.GetCollections( - collection.GetCollectionsParams{ - Client: arv, Limit: 50, LogEveryNthCollectionProcessed: 10}) + var arvLogger *logger.Logger + if logEventType != "" { + arvLogger = logger.NewLogger(logger.LoggerParams{Client: arv, + EventType: logEventType, + MinimumWriteInterval: time.Second * time.Duration(logFrequencySeconds)}) + } + + if arvLogger != nil { + properties, _ := arvLogger.Edit() + runInfo := make(map[string]interface{}) + runInfo["start_time"] = time.Now() + runInfo["args"] = os.Args + hostname, err := os.Hostname() + if err != nil { + runInfo["hostname_error"] = err.Error() + } else { + runInfo["hostname"] = hostname + } + runInfo["pid"] = os.Getpid() + properties["run_info"] = runInfo - //log.Printf("Read Collections: %v", readCollections) + arvLogger.AddEditHook(LogMemoryAlloc) + + arvLogger.Record() + } + + // TODO(misha): Read Collections and Keep Contents concurrently as goroutines. + // This requires waiting on them to finish before you let main() exit. + + RunCollections(collection.GetCollectionsParams{ + Client: arv, Logger: arvLogger, BatchSize: 100}) + + RunKeep(keep.GetKeepServersParams{Client: arv, Limit: 1000}) +} + +func RunCollections(params collection.GetCollectionsParams) { + readCollections := collection.GetCollections(params) + + UserUsage := ComputeSizeOfOwnedCollections(readCollections) + log.Printf("Uuid to Size used: %v", UserUsage) // TODO(misha): Add a "readonly" flag. If we're in readonly mode, // lots of behaviors can become warnings (and obviously we can't @@ -41,9 +95,35 @@ func main() { log.Printf("Read and processed %d collections", len(readCollections.UuidToCollection)) +} - readServers := keep.GetKeepServers( - keep.GetKeepServersParams{Client: arv, Limit: 1000}) +func RunKeep(params keep.GetKeepServersParams) { + readServers := keep.GetKeepServers(params) + + log.Printf("Returned %d keep disks", len(readServers.ServerToContents)) + + blockReplicationCounts := make(map[int]int) + for _, infos := range readServers.BlockToServers { + replication := len(infos) + blockReplicationCounts[replication] += 1 + } + + log.Printf("Replication level distribution: %v", blockReplicationCounts) +} + +func ComputeSizeOfOwnedCollections(readCollections collection.ReadCollections) ( + results map[string]int) { + results = make(map[string]int) + for _, coll := range readCollections.UuidToCollection { + results[coll.OwnerUuid] = results[coll.OwnerUuid] + coll.TotalSize + } + return +} - log.Printf("Returned %d keep disks", len(readServers.AddressToContents)) +func LogMemoryAlloc(properties map[string]interface{}, entry map[string]interface{}) { + _ = entry // keep the compiler from complaining + runInfo := properties["run_info"].(map[string]interface{}) + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + runInfo["alloc_bytes_in_use"] = memStats.Alloc }