X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c3f49b76d173818386f5c65db46b353e1d334d1e..c6df16d2af30e989bcfb04f6ef730cde658a9dc9:/services/datamanager/datamanager.go diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go index a19d01f6cb..8e12835842 100644 --- a/services/datamanager/datamanager.go +++ b/services/datamanager/datamanager.go @@ -22,6 +22,8 @@ var ( logEventTypePrefix string logFrequencySeconds int minutesBetweenRuns int + collectionBatchSize int + dryRun bool ) func init() { @@ -36,27 +38,36 @@ func init() { flag.IntVar(&minutesBetweenRuns, "minutes-between-runs", 0, - "How many minutes we wait betwen data manager runs. 0 means run once and exit.") + "How many minutes we wait between data manager runs. 0 means run once and exit.") + flag.IntVar(&collectionBatchSize, + "collection-batch-size", + 1000, + "How many collections to request in each batch.") + flag.BoolVar(&dryRun, + "dry-run", + false, + "Perform a dry run. Log how many blocks would be deleted/moved, but do not issue any changes to keepstore.") } func main() { flag.Parse() + if minutesBetweenRuns == 0 { - arv, err := makeArvadosClient() + arv, err := arvadosclient.MakeArvadosClient() if err != nil { - log.Fatalf("makeArvadosClient: %v", err) + loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err)) } err = singlerun(arv) if err != nil { - log.Fatalf("singlerun: %v", err) + loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("singlerun: %v", err)) } } else { waitTime := time.Minute * time.Duration(minutesBetweenRuns) for { log.Println("Beginning Run") - arv, err := makeArvadosClient() + arv, err := arvadosclient.MakeArvadosClient() if err != nil { - log.Fatalf("makeArvadosClient: %v", err) + loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err)) } err = singlerun(arv) if err != nil { @@ -68,9 +79,7 @@ func main() { } } -func makeArvadosClient() (arvadosclient.ArvadosClient, error) { - return arvadosclient.MakeArvadosClient() -} +var arvLogger *logger.Logger func singlerun(arv arvadosclient.ArvadosClient) error { var err error @@ -80,9 +89,8 @@ func singlerun(arv arvadosclient.ArvadosClient) error { return errors.New("Current user is not an admin. Datamanager requires a privileged token.") } - var arvLogger *logger.Logger if logEventTypePrefix != "" { - arvLogger = logger.NewLogger(logger.LoggerParams{ + arvLogger, err = logger.NewLogger(logger.LoggerParams{ Client: arv, EventTypePrefix: logEventTypePrefix, WriteInterval: time.Second * time.Duration(logFrequencySeconds)}) @@ -105,9 +113,15 @@ func singlerun(arv arvadosclient.ArvadosClient) error { dataFetcher = BuildDataFetcher(arv) } - dataFetcher(arvLogger, &readCollections, &keepServerInfo) + err = dataFetcher(arvLogger, &readCollections, &keepServerInfo) + if err != nil { + return err + } - summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo) + err = summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo) + if err != nil { + return err + } buckets := summary.BucketReplication(readCollections, keepServerInfo) bucketCounts := buckets.Counts() @@ -157,7 +171,7 @@ func singlerun(arv arvadosclient.ArvadosClient) error { &keepServerInfo, replicationSummary.KeepBlocksNotInCollections) - err = summary.WritePullLists(arvLogger, pullLists) + err = summary.WritePullLists(arvLogger, pullLists, dryRun) if err != nil { return err } @@ -165,38 +179,42 @@ func singlerun(arv arvadosclient.ArvadosClient) error { if trashErr != nil { return err } - keep.SendTrashLists(kc, trashLists) + keep.SendTrashLists(arvLogger, kc, trashLists, dryRun) return nil } // BuildDataFetcher returns a data fetcher that fetches data from remote servers. func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher { - return func(arvLogger *logger.Logger, + return func( + arvLogger *logger.Logger, readCollections *collection.ReadCollections, - keepServerInfo *keep.ReadServers) { - collectionChannel := make(chan collection.ReadCollections) - + keepServerInfo *keep.ReadServers, + ) error { + collDone := make(chan struct{}) + var collErr error go func() { - collectionChannel <- collection.GetCollectionsAndSummarize( - arvLogger, + *readCollections, collErr = collection.GetCollectionsAndSummarize( collection.GetCollectionsParams{ Client: arv, Logger: arvLogger, - BatchSize: 50}) + BatchSize: collectionBatchSize}) + collDone <- struct{}{} }() - var err error - *keepServerInfo, err = keep.GetKeepServersAndSummarize( + var keepErr error + *keepServerInfo, keepErr = keep.GetKeepServersAndSummarize( keep.GetKeepServersParams{ Client: arv, Logger: arvLogger, Limit: 1000}) - if err != nil { - return - } + <-collDone - *readCollections = <-collectionChannel + // Return a nil error only if both parts succeeded. + if collErr != nil { + return collErr + } + return keepErr } }