X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a56406d730f2a07dd442b9e99ef9dab7b7d81895..af6d31cba8346ac86bc0027eb0f675144fb43056:/services/datamanager/datamanager.go diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go index cd141eeb16..b81cf7edfe 100644 --- a/services/datamanager/datamanager.go +++ b/services/datamanager/datamanager.go @@ -3,8 +3,11 @@ package main import ( + "errors" "flag" + "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/keepclient" "git.curoverse.com/arvados.git/sdk/go/logger" "git.curoverse.com/arvados.git/sdk/go/util" "git.curoverse.com/arvados.git/services/datamanager/collection" @@ -39,34 +42,48 @@ func init() { func main() { flag.Parse() if minutesBetweenRuns == 0 { - singlerun() + arv, err := makeArvadosClient() + if err != nil { + log.Fatalf("makeArvadosClient: %v", err) + } + err = singlerun(arv) + if err != nil { + log.Fatalf("singlerun: %v", err) + } } else { waitTime := time.Minute * time.Duration(minutesBetweenRuns) for { log.Println("Beginning Run") - singlerun() + arv, err := makeArvadosClient() + if err != nil { + log.Fatalf("makeArvadosClient: %v", err) + } + err = singlerun(arv) + if err != nil { + log.Printf("singlerun: %v", err) + } log.Printf("Sleeping for %d minutes", minutesBetweenRuns) time.Sleep(waitTime) } } } -func singlerun() { - arv, err := arvadosclient.MakeArvadosClient() - if err != nil { - log.Fatalf("Error setting up arvados client %s", err.Error()) - } +func makeArvadosClient() (arvadosclient.ArvadosClient, error) { + return arvadosclient.MakeArvadosClient() +} - if is_admin, err := util.UserIsAdmin(arv); err != nil { - log.Fatalf("Error querying current arvados user %s", err.Error()) - } else if !is_admin { - log.Fatalf("Current user is not an admin. Datamanager can only be run by admins.") +func singlerun(arv arvadosclient.ArvadosClient) error { + var err error + if isAdmin, err := util.UserIsAdmin(arv); err != nil { + return errors.New("Error verifying admin token: " + err.Error()) + } else if !isAdmin { + return errors.New("Current user is not an admin. Datamanager requires a privileged token.") } var arvLogger *logger.Logger if logEventTypePrefix != "" { arvLogger = logger.NewLogger(logger.LoggerParams{ - Client: arv, + Client: arv, EventTypePrefix: logEventTypePrefix, WriteInterval: time.Second * time.Duration(logFrequencySeconds)}) } @@ -77,46 +94,101 @@ func singlerun() { } var ( + dataFetcher summary.DataFetcher readCollections collection.ReadCollections - keepServerInfo keep.ReadServers + keepServerInfo keep.ReadServers ) - if !summary.MaybeReadData(arvLogger, &readCollections, &keepServerInfo) { - collectionChannel := make(chan collection.ReadCollections) - - go func() { - collectionChannel <- collection.GetCollectionsAndSummarize( - collection.GetCollectionsParams{ - Client: arv, - Logger: arvLogger, - BatchSize: 50}) - }() - - keepServerInfo = keep.GetKeepServersAndSummarize( - keep.GetKeepServersParams{ - Client: arv, - Logger: arvLogger, - Limit: 1000}) - - readCollections = <-collectionChannel + if summary.ShouldReadData() { + dataFetcher = summary.ReadData + } else { + dataFetcher = BuildDataFetcher(arv) } + dataFetcher(arvLogger, &readCollections, &keepServerInfo) + summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo) - replicationSummary := - summary.SummarizeReplication(readCollections, keepServerInfo) + buckets := summary.BucketReplication(readCollections, keepServerInfo) + bucketCounts := buckets.Counts() + + replicationSummary := buckets.SummarizeBuckets(readCollections) + replicationCounts := replicationSummary.ComputeCounts() - log.Printf("Blocks In Collections: %d, " + + log.Printf("Blocks In Collections: %d, "+ "\nBlocks In Keep: %d.", - len(readCollections.BlockToReplication), + len(readCollections.BlockToDesiredReplication), len(keepServerInfo.BlockToServers)) - log.Println(replicationSummary.ComputeCounts().PrettyPrint()) + log.Println(replicationCounts.PrettyPrint()) + + log.Printf("Blocks Histogram:") + for _, rlbss := range bucketCounts { + log.Printf("%+v: %10d", + rlbss.Levels, + rlbss.Count) + } + + kc, err := keepclient.MakeKeepClient(&arv) + if err != nil { + loggerutil.FatalWithMessage(arvLogger, + fmt.Sprintf("Error setting up keep client %s", err.Error())) + } // Log that we're finished. We force the recording, since go will - // not wait for the timer before exiting. + // not wait for the write timer before exiting. if arvLogger != nil { - arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) { + defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) { + summaryInfo := logger.GetOrCreateMap(p, "summary_info") + summaryInfo["block_replication_counts"] = bucketCounts + summaryInfo["replication_summary"] = replicationCounts + p["summary_info"] = summaryInfo + p["run_info"].(map[string]interface{})["finished_at"] = time.Now() }) } + + pullServers := summary.ComputePullServers(kc, + &keepServerInfo, + readCollections.BlockToDesiredReplication, + replicationSummary.UnderReplicatedBlocks) + + pullLists := summary.BuildPullLists(pullServers) + + trashLists, trashErr := summary.BuildTrashLists(kc, + &keepServerInfo, + replicationSummary.KeepBlocksNotInCollections) + + summary.WritePullLists(arvLogger, pullLists) + + if trashErr != nil { + return err + } + keep.SendTrashLists(kc, trashLists) + + return nil +} + +// BuildDataFetcher returns a data fetcher that fetches data from remote servers. +func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher { + return func(arvLogger *logger.Logger, + readCollections *collection.ReadCollections, + keepServerInfo *keep.ReadServers) { + collectionChannel := make(chan collection.ReadCollections) + + go func() { + collectionChannel <- collection.GetCollectionsAndSummarize( + collection.GetCollectionsParams{ + Client: arv, + Logger: arvLogger, + BatchSize: 50}) + }() + + *keepServerInfo = keep.GetKeepServersAndSummarize( + keep.GetKeepServersParams{ + Client: arv, + Logger: arvLogger, + Limit: 1000}) + + *readCollections = <-collectionChannel + } }