+ kc, err := keepclient.MakeKeepClient(&arv)
+ if err != nil {
+ return fmt.Errorf("Error setting up keep client %v", err.Error())
+ }
+
+ // Log that we're finished. We force the recording, since Go will
+ // not wait for the write timer to fire before the program exits.
+ if arvLogger != nil {
+ defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
+ summaryInfo := logger.GetOrCreateMap(p, "summary_info")
+ summaryInfo["block_replication_counts"] = bucketCounts
+ summaryInfo["replication_summary"] = replicationCounts
+ p["summary_info"] = summaryInfo
+
+ p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
+ })
+ }
+
+ pullServers := summary.ComputePullServers(kc,
+ &keepServerInfo,
+ readCollections.BlockToDesiredReplication,
+ replicationSummary.UnderReplicatedBlocks)
+
+ pullLists := summary.BuildPullLists(pullServers)
+
+ trashLists, trashErr := summary.BuildTrashLists(kc,
+ &keepServerInfo,
+ replicationSummary.KeepBlocksNotInCollections)
+
+ err = summary.WritePullLists(arvLogger, pullLists, dryRun)
+ if err != nil {
+ return err
+ }
+
+ if trashErr != nil {
+ return err
+ }
+ keep.SendTrashLists(arvLogger, kc, trashLists, dryRun)
+
+ return nil
+}
+
+// BuildDataFetcher returns a data fetcher that fetches data from remote servers.
+func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
+ return func(
+ arvLogger *logger.Logger,
+ readCollections *collection.ReadCollections,
+ keepServerInfo *keep.ReadServers,
+ ) error {
+ collDone := make(chan struct{})
+ var collErr error
+ go func() {
+ *readCollections, collErr = collection.GetCollectionsAndSummarize(
+ collection.GetCollectionsParams{
+ Client: arv,
+ Logger: arvLogger,
+ BatchSize: collectionBatchSize})
+ collDone <- struct{}{}
+ }()
+
+ var keepErr error
+ *keepServerInfo, keepErr = keep.GetKeepServersAndSummarize(
+ keep.GetKeepServersParams{
+ Client: arv,
+ Logger: arvLogger,
+ Limit: 1000})
+
+ <- collDone
+
+ // Return a nil error only if both parts succeeded.
+ if collErr != nil {
+ return collErr
+ }
+ return keepErr
+ }