X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3dcb5f41ca5e20004ca3d042958246bd341101bb..43538243995267c417983360d226d6e8eb181139:/services/datamanager/datamanager.go

diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index f454f82a6f..bd68db112a 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -3,67 +3,81 @@
 package main
 
 import (
-	//"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"flag"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/logger"
+	"git.curoverse.com/arvados.git/sdk/go/util"
 	"git.curoverse.com/arvados.git/services/datamanager/collection"
+	"git.curoverse.com/arvados.git/services/datamanager/keep"
+	"git.curoverse.com/arvados.git/services/datamanager/loggerutil"
 	"log"
+	"time"
 )
 
-// Helper type so we don't have to write out 'map[string]interface{}' every time.
-type Dict map[string]interface{}
+var (
+	logEventType        string
+	logFrequencySeconds int
+)
 
-func UserIsAdmin(arv arvadosclient.ArvadosClient) (is_admin bool, err error) {
-	type user struct {
-		IsAdmin bool `json:"is_admin"`
-	}
-	var u user
-	err = arv.Call("GET", "users", "", "current", nil, &u)
-	return u.IsAdmin, err
+func init() {
+	flag.StringVar(&logEventType,
+		"log-event-type",
+		"experimental-data-manager-report",
+		"event_type to use in our arvados log entries. Set to empty to turn off logging")
+	flag.IntVar(&logFrequencySeconds,
+		"log-frequency-seconds",
+		20,
+		"How frequently we'll write log entries in seconds.")
 }
 
 func main() {
+	flag.Parse()
+
 	arv, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
 		log.Fatalf("Error setting up arvados client %s", err.Error())
 	}
 
-	if is_admin, err := UserIsAdmin(arv); err != nil {
+	if is_admin, err := util.UserIsAdmin(arv); err != nil {
 		log.Fatalf("Error querying current arvados user %s", err.Error())
 	} else if !is_admin {
 		log.Fatalf("Current user is not an admin. Datamanager can only be run by admins.")
 	}
 
-	readCollections := collection.GetCollections(
-		collection.GetCollectionsParams{
-			Client: arv, Limit: 50, LogEveryNthCollectionProcessed: 10})
+	var arvLogger *logger.Logger
+	if logEventType != "" {
+		arvLogger = logger.NewLogger(logger.LoggerParams{Client: arv,
+			EventType:     logEventType,
+			WriteInterval: time.Second * time.Duration(logFrequencySeconds)})
+	}
+
+	loggerutil.LogRunInfo(arvLogger)
+	if arvLogger != nil {
+		arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
+	}
 
-	//log.Printf("Read Collections: %v", readCollections)
+	collectionChannel := make(chan collection.ReadCollections)
 
-	// TODO(misha): Add a "readonly" flag. If we're in readonly mode,
-	// lots of behaviors can become warnings (and obviously we can't
-	// write anything).
-	// if !readCollections.ReadAllCollections {
-	// log.Fatalf("Did not read all collections")
-	// }
+	go func() {
+		collectionChannel <- collection.GetCollectionsAndSummarize(
+			collection.GetCollectionsParams{
+				Client: arv, Logger: arvLogger, BatchSize: 50})
+	}()
 
-	log.Printf("Read and processed %d collections",
-		len(readCollections.UuidToCollection))
+	keepServerInfo := keep.GetKeepServersAndSummarize(
+		keep.GetKeepServersParams{Client: arv, Logger: arvLogger, Limit: 1000})
 
-	// TODO(misha): Send SDK and Keep requests in parallel
+	readCollections := <-collectionChannel
 
-	keepParams := arvadosclient.Dict{"limit": 1000}
-	var keepDisks map[string]interface{}
-	err = arv.List("keep_disks", keepParams, &keepDisks)
-	if err != nil {
-		log.Fatalf("Error requesting keep disks from API server: %v", err)
-	}
-	var retrievedAll bool
-	var numDisksReturned, numDisksAvailable int
-	if retrievedAll, numDisksReturned, numDisksAvailable =
-		collection.SdkListResponseContainsAllAvailableItems(keepDisks); !retrievedAll {
-		log.Fatalf("Failed to retrieve all keep disks. Only received %d of %d",
-			numDisksReturned, numDisksAvailable)
-	}
+	// TODO(misha): Use these together to verify replication.
+	_ = readCollections
+	_ = keepServerInfo
 
-	log.Printf("Returned %d keep disks", numDisksReturned)
+	// Log that we're finished. We force the recording, since go will
+	// not wait for the timer before exiting.
+	if arvLogger != nil {
+		arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
+			p["run_info"].(map[string]interface{})["time_finished"] = time.Now()
+		})
+	}
 }