X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5e27876fa4d3faf3b973282bfb4f152c02345bdc..0c0f18dfbcdcf552889258b76563315fbe2eb060:/services/datamanager/datamanager.go diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go index 91612668b5..3c53b4ad2a 100644 --- a/services/datamanager/datamanager.go +++ b/services/datamanager/datamanager.go @@ -3,6 +3,7 @@ package main import ( + "errors" "flag" "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" @@ -41,31 +42,42 @@ func init() { func main() { flag.Parse() if minutesBetweenRuns == 0 { - singlerun() + arv, err := arvadosclient.MakeArvadosClient() + if err != nil { + loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err)) + } + err = singlerun(arv) + if err != nil { + loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("singlerun: %v", err)) + } } else { waitTime := time.Minute * time.Duration(minutesBetweenRuns) for { log.Println("Beginning Run") - singlerun() + arv, err := arvadosclient.MakeArvadosClient() + if err != nil { + loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err)) + } + err = singlerun(arv) + if err != nil { + log.Printf("singlerun: %v", err) + } log.Printf("Sleeping for %d minutes", minutesBetweenRuns) time.Sleep(waitTime) } } } -func singlerun() { - arv, err := arvadosclient.MakeArvadosClient() - if err != nil { - log.Fatalf("Error setting up arvados client %s", err.Error()) - } +var arvLogger *logger.Logger - if is_admin, err := util.UserIsAdmin(arv); err != nil { - log.Fatalf("Error querying current arvados user %s", err.Error()) - } else if !is_admin { - log.Fatalf("Current user is not an admin. Datamanager can only be run by admins.") +func singlerun(arv arvadosclient.ArvadosClient) error { + var err error + if isAdmin, err := util.UserIsAdmin(arv); err != nil { + return errors.New("Error verifying admin token: " + err.Error()) + } else if !isAdmin { + return errors.New("Current user is not an admin. Datamanager requires a privileged token.") } - var arvLogger *logger.Logger if logEventTypePrefix != "" { arvLogger = logger.NewLogger(logger.LoggerParams{ Client: arv, @@ -92,7 +104,14 @@ func singlerun() { dataFetcher(arvLogger, &readCollections, &keepServerInfo) - summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo) + if readCollections.Err != nil { + return readCollections.Err + } + + err = summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo) + if err != nil { + return err + } buckets := summary.BucketReplication(readCollections, keepServerInfo) bucketCounts := buckets.Counts() @@ -115,23 +134,13 @@ func singlerun() { kc, err := keepclient.MakeKeepClient(&arv) if err != nil { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Error setting up keep client %s", err.Error())) + return fmt.Errorf("Error setting up keep client %v", err.Error()) } - pullServers := summary.ComputePullServers(kc, - &keepServerInfo, - readCollections.BlockToDesiredReplication, - replicationSummary.UnderReplicatedBlocks) - - pullLists := summary.BuildPullLists(pullServers) - - summary.WritePullLists(arvLogger, pullLists) - // Log that we're finished. We force the recording, since go will // not wait for the write timer before exiting. if arvLogger != nil { - arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) { + defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) { summaryInfo := logger.GetOrCreateMap(p, "summary_info") summaryInfo["block_replication_counts"] = bucketCounts summaryInfo["replication_summary"] = replicationCounts @@ -140,9 +149,32 @@ func singlerun() { p["run_info"].(map[string]interface{})["finished_at"] = time.Now() }) } + + pullServers := summary.ComputePullServers(kc, + &keepServerInfo, + readCollections.BlockToDesiredReplication, + replicationSummary.UnderReplicatedBlocks) + + pullLists := summary.BuildPullLists(pullServers) + + trashLists, trashErr := summary.BuildTrashLists(kc, + &keepServerInfo, + replicationSummary.KeepBlocksNotInCollections) + + err = summary.WritePullLists(arvLogger, pullLists) + if err != nil { + return err + } + + if trashErr != nil { + return err + } + keep.SendTrashLists(kc, trashLists) + + return nil } -// Returns a data fetcher that fetches data from remote servers. +// BuildDataFetcher returns a data fetcher that fetches data from remote servers. func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher { return func(arvLogger *logger.Logger, readCollections *collection.ReadCollections, @@ -157,12 +189,17 @@ func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher { BatchSize: 50}) }() - *keepServerInfo = keep.GetKeepServersAndSummarize( + var err error + *keepServerInfo, err = keep.GetKeepServersAndSummarize( keep.GetKeepServersParams{ Client: arv, Logger: arvLogger, Limit: 1000}) + if err != nil { + return + } + *readCollections = <-collectionChannel } }