X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8a6c7aa5eac21e93d721bf958ca67eb7f216888e..3091c9c336ddca217b14745142f9473a489f42de:/services/datamanager/datamanager.go diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go index bba36fc397..604a6db2f4 100644 --- a/services/datamanager/datamanager.go +++ b/services/datamanager/datamanager.go @@ -15,7 +15,6 @@ import ( "git.curoverse.com/arvados.git/services/datamanager/loggerutil" "git.curoverse.com/arvados.git/services/datamanager/summary" "log" - "os" "time" ) @@ -86,7 +85,7 @@ func singlerun(arv arvadosclient.ArvadosClient) error { } if logEventTypePrefix != "" { - arvLogger = logger.NewLogger(logger.LoggerParams{ + arvLogger, err = logger.NewLogger(logger.LoggerParams{ Client: arv, EventTypePrefix: logEventTypePrefix, WriteInterval: time.Second * time.Duration(logFrequencySeconds)}) @@ -109,10 +108,9 @@ func singlerun(arv arvadosclient.ArvadosClient) error { dataFetcher = BuildDataFetcher(arv) } - dataFetcher(arvLogger, &readCollections, &keepServerInfo) - - if readCollections.Err != nil { - return readCollections.Err + err = dataFetcher(arvLogger, &readCollections, &keepServerInfo) + if err != nil { + return err } err = summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo) @@ -139,6 +137,11 @@ func singlerun(arv arvadosclient.ArvadosClient) error { rlbss.Count) } + kc, err := keepclient.MakeKeepClient(&arv) + if err != nil { + return fmt.Errorf("Error setting up keep client %v", err.Error()) + } + // Log that we're finished. We force the recording, since go will // not wait for the write timer before exiting. if arvLogger != nil { @@ -149,22 +152,9 @@ func singlerun(arv arvadosclient.ArvadosClient) error { p["summary_info"] = summaryInfo p["run_info"].(map[string]interface{})["finished_at"] = time.Now() - p["run_info"].(map[string]interface{})["args"] = os.Args }) } - // If dry-run, do not issue any changes to keepstore - if dryRun { - log.Printf("Datamanager dry-run. Returning without issuing any keepstore updates.") - return nil - } - - // Not dry-run; issue changes to keepstore - kc, err := keepclient.MakeKeepClient(&arv) - if err != nil { - return fmt.Errorf("Error setting up keep client %v", err.Error()) - } - pullServers := summary.ComputePullServers(kc, &keepServerInfo, readCollections.BlockToDesiredReplication, @@ -176,7 +166,7 @@ func singlerun(arv arvadosclient.ArvadosClient) error { &keepServerInfo, replicationSummary.KeepBlocksNotInCollections) - err = summary.WritePullLists(arvLogger, pullLists) + err = summary.WritePullLists(arvLogger, pullLists, dryRun) if err != nil { return err } @@ -184,37 +174,42 @@ func singlerun(arv arvadosclient.ArvadosClient) error { if trashErr != nil { return err } - keep.SendTrashLists(kc, trashLists) + keep.SendTrashLists(arvLogger, kc, trashLists, dryRun) return nil } // BuildDataFetcher returns a data fetcher that fetches data from remote servers. func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher { - return func(arvLogger *logger.Logger, + return func( + arvLogger *logger.Logger, readCollections *collection.ReadCollections, - keepServerInfo *keep.ReadServers) { - collectionChannel := make(chan collection.ReadCollections) - + keepServerInfo *keep.ReadServers, + ) error { + collDone := make(chan struct{}) + var collErr error go func() { - collectionChannel <- collection.GetCollectionsAndSummarize( + *readCollections, collErr = collection.GetCollectionsAndSummarize( collection.GetCollectionsParams{ Client: arv, Logger: arvLogger, BatchSize: 50}) + collDone <- struct{}{} }() - var err error - *keepServerInfo, err = keep.GetKeepServersAndSummarize( + var keepErr error + *keepServerInfo, keepErr = keep.GetKeepServersAndSummarize( keep.GetKeepServersParams{ Client: arv, Logger: arvLogger, Limit: 1000}) - if err != nil { - return - } + <- collDone - *readCollections = <-collectionChannel + // Return a nil error only if both parts succeeded. + if collErr != nil { + return collErr + } + return keepErr } }