X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/45f10d80d1b584808a6e375214b5be6bc7d2a730..6613ec1e9c705fb5b950611fd160d4a2babed251:/services/datamanager/datamanager.go diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go index 70a9ae7859..5250d175ff 100644 --- a/services/datamanager/datamanager.go +++ b/services/datamanager/datamanager.go @@ -3,6 +3,7 @@ package main import ( + "errors" "flag" "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" @@ -21,6 +22,8 @@ var ( logEventTypePrefix string logFrequencySeconds int minutesBetweenRuns int + collectionBatchSize int + dryRun bool ) func init() { @@ -35,23 +38,40 @@ func init() { flag.IntVar(&minutesBetweenRuns, "minutes-between-runs", 0, - "How many minutes we wait betwen data manager runs. 0 means run once and exit.") + "How many minutes we wait between data manager runs. 0 means run once and exit.") + flag.IntVar(&collectionBatchSize, + "collection-batch-size", + 1000, + "How many collections to request in each batch.") + flag.BoolVar(&dryRun, + "dry-run", + false, + "Perform a dry run. Log how many blocks would be deleted/moved, but do not issue any changes to keepstore.") } func main() { flag.Parse() + if minutesBetweenRuns == 0 { - err := singlerun() + arv, err := arvadosclient.MakeArvadosClient() + if err != nil { + loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err)) + } + err = singlerun(arv) if err != nil { - log.Fatalf("Got an error: %v", err) + loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("singlerun: %v", err)) } } else { waitTime := time.Minute * time.Duration(minutesBetweenRuns) for { log.Println("Beginning Run") - err := singlerun() + arv, err := arvadosclient.MakeArvadosClient() + if err != nil { + loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err)) + } + err = singlerun(arv) if err != nil { - log.Printf("Got an error: %v", err) + log.Printf("singlerun: %v", err) } log.Printf("Sleeping for %d minutes", minutesBetweenRuns) time.Sleep(waitTime) @@ -59,21 +79,18 @@ func main() { } } -func singlerun() error { - arv, err := arvadosclient.MakeArvadosClient() - if err != nil { - log.Fatalf("Error setting up arvados client %s", err.Error()) - } +var arvLogger *logger.Logger - if is_admin, err := util.UserIsAdmin(arv); err != nil { - log.Fatalf("Error querying current arvados user %s", err.Error()) - } else if !is_admin { - log.Fatalf("Current user is not an admin. Datamanager can only be run by admins.") +func singlerun(arv *arvadosclient.ArvadosClient) error { + var err error + if isAdmin, err := util.UserIsAdmin(arv); err != nil { + return errors.New("Error verifying admin token: " + err.Error()) + } else if !isAdmin { + return errors.New("Current user is not an admin. Datamanager requires a privileged token.") } - var arvLogger *logger.Logger if logEventTypePrefix != "" { - arvLogger = logger.NewLogger(logger.LoggerParams{ + arvLogger, err = logger.NewLogger(logger.LoggerParams{ Client: arv, EventTypePrefix: logEventTypePrefix, WriteInterval: time.Second * time.Duration(logFrequencySeconds)}) @@ -96,9 +113,15 @@ func singlerun() error { dataFetcher = BuildDataFetcher(arv) } - dataFetcher(arvLogger, &readCollections, &keepServerInfo) + err = dataFetcher(arvLogger, &readCollections, &keepServerInfo) + if err != nil { + return err + } - summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo) + err = summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo) + if err != nil { + return err + } buckets := summary.BucketReplication(readCollections, keepServerInfo) bucketCounts := buckets.Counts() @@ -119,10 +142,9 @@ func singlerun() error { rlbss.Count) } - kc, err := keepclient.MakeKeepClient(&arv) + kc, err := keepclient.MakeKeepClient(arv) if err != nil { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Error setting up keep client %s", err.Error())) + return fmt.Errorf("Error setting up keep client %v", err.Error()) } // Log that we're finished. We force the recording, since go will @@ -149,38 +171,50 @@ func singlerun() error { &keepServerInfo, replicationSummary.KeepBlocksNotInCollections) - summary.WritePullLists(arvLogger, pullLists) + err = summary.WritePullLists(arvLogger, pullLists, dryRun) + if err != nil { + return err + } if trashErr != nil { return err - } else { - keep.SendTrashLists(keep.GetDataManagerToken(arvLogger), kc, trashLists) } + keep.SendTrashLists(arvLogger, kc, trashLists, dryRun) return nil } -// Returns a data fetcher that fetches data from remote servers. -func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher { - return func(arvLogger *logger.Logger, +// BuildDataFetcher returns a data fetcher that fetches data from remote servers. +func BuildDataFetcher(arv *arvadosclient.ArvadosClient) summary.DataFetcher { + return func( + arvLogger *logger.Logger, readCollections *collection.ReadCollections, - keepServerInfo *keep.ReadServers) { - collectionChannel := make(chan collection.ReadCollections) - + keepServerInfo *keep.ReadServers, + ) error { + collDone := make(chan struct{}) + var collErr error go func() { - collectionChannel <- collection.GetCollectionsAndSummarize( + *readCollections, collErr = collection.GetCollectionsAndSummarize( collection.GetCollectionsParams{ Client: arv, Logger: arvLogger, - BatchSize: 50}) + BatchSize: collectionBatchSize}) + collDone <- struct{}{} }() - *keepServerInfo = keep.GetKeepServersAndSummarize( + var keepErr error + *keepServerInfo, keepErr = keep.GetKeepServersAndSummarize( keep.GetKeepServersParams{ Client: arv, Logger: arvLogger, Limit: 1000}) - *readCollections = <-collectionChannel + <-collDone + + // Return a nil error only if both parts succeeded. + if collErr != nil { + return collErr + } + return keepErr } }