Merge branch 'master' into 6663-git-server
[arvados.git] / services / datamanager / datamanager.go
index 587d42654f7a4fa694e8aea014582e6006f9f449..70a9ae785956396bab936e73b1a7f6ed04c63731 100644
@@ -41,19 +41,25 @@ func init() {
 func main() {
        flag.Parse()
        if minutesBetweenRuns == 0 {
-               singlerun()
+               err := singlerun()
+               if err != nil {
+                       log.Fatalf("singlerun: %v", err)
+               }
        } else {
                waitTime := time.Minute * time.Duration(minutesBetweenRuns)
                for {
                        log.Println("Beginning Run")
-                       singlerun()
+                       err := singlerun()
+                       if err != nil {
+                               log.Printf("singlerun: %v", err)
+                       }
                        log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
                        time.Sleep(waitTime)
                }
        }
 }
 
-func singlerun() {
+func singlerun() error {
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
                log.Fatalf("Error setting up arvados client %s", err.Error())
@@ -79,30 +85,19 @@ func singlerun() {
        }
 
        var (
+               dataFetcher     summary.DataFetcher
                readCollections collection.ReadCollections
                keepServerInfo  keep.ReadServers
        )
 
-       if !summary.MaybeReadData(arvLogger, &readCollections, &keepServerInfo) {
-               collectionChannel := make(chan collection.ReadCollections)
-
-               go func() {
-                       collectionChannel <- collection.GetCollectionsAndSummarize(
-                               collection.GetCollectionsParams{
-                                       Client:    arv,
-                                       Logger:    arvLogger,
-                                       BatchSize: 50})
-               }()
-
-               keepServerInfo = keep.GetKeepServersAndSummarize(
-                       keep.GetKeepServersParams{
-                               Client: arv,
-                               Logger: arvLogger,
-                               Limit:  1000})
-
-               readCollections = <-collectionChannel
+       if summary.ShouldReadData() {
+               dataFetcher = summary.ReadData
+       } else {
+               dataFetcher = BuildDataFetcher(arv)
        }
 
+       dataFetcher(arvLogger, &readCollections, &keepServerInfo)
+
        summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
 
        buckets := summary.BucketReplication(readCollections, keepServerInfo)
@@ -113,7 +108,7 @@ func singlerun() {
 
        log.Printf("Blocks In Collections: %d, "+
                "\nBlocks In Keep: %d.",
-               len(readCollections.BlockToReplication),
+               len(readCollections.BlockToDesiredReplication),
                len(keepServerInfo.BlockToServers))
        log.Println(replicationCounts.PrettyPrint())
 
@@ -130,19 +125,10 @@ func singlerun() {
                        fmt.Sprintf("Error setting up keep client %s", err.Error()))
        }
 
-       pullServers := summary.ComputePullServers(kc,
-               &keepServerInfo,
-               readCollections.BlockToReplication,
-               replicationSummary.UnderReplicatedBlocks)
-
-       pullLists := summary.BuildPullLists(pullServers)
-
-       summary.WritePullLists(arvLogger, pullLists)
-
        // Log that we're finished. We force the recording, since go will
        // not wait for the write timer before exiting.
        if arvLogger != nil {
-               arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
+               defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
                        summaryInfo := logger.GetOrCreateMap(p, "summary_info")
                        summaryInfo["block_replication_counts"] = bucketCounts
                        summaryInfo["replication_summary"] = replicationCounts
@@ -151,4 +137,50 @@ func singlerun() {
                        p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
                })
        }
+
+       pullServers := summary.ComputePullServers(kc,
+               &keepServerInfo,
+               readCollections.BlockToDesiredReplication,
+               replicationSummary.UnderReplicatedBlocks)
+
+       pullLists := summary.BuildPullLists(pullServers)
+
+       trashLists, trashErr := summary.BuildTrashLists(kc,
+               &keepServerInfo,
+               replicationSummary.KeepBlocksNotInCollections)
+
+       summary.WritePullLists(arvLogger, pullLists)
+
+       if trashErr != nil {
+               return trashErr
+       }
+
+       keep.SendTrashLists(keep.GetDataManagerToken(arvLogger), kc, trashLists)
+
+       return nil
+}
+
+// BuildDataFetcher returns a DataFetcher that fetches data from remote servers.
+func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
+       return func(arvLogger *logger.Logger,
+               readCollections *collection.ReadCollections,
+               keepServerInfo *keep.ReadServers) {
+               collectionChannel := make(chan collection.ReadCollections)
+
+               go func() {
+                       collectionChannel <- collection.GetCollectionsAndSummarize(
+                               collection.GetCollectionsParams{
+                                       Client:    arv,
+                                       Logger:    arvLogger,
+                                       BatchSize: 50})
+               }()
+
+               *keepServerInfo = keep.GetKeepServersAndSummarize(
+                       keep.GetKeepServersParams{
+                               Client: arv,
+                               Logger: arvLogger,
+                               Limit:  1000})
+
+               *readCollections = <-collectionChannel
+       }
 }
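
For reference, the summary.DataFetcher type used above is declared in the summary package and therefore does not appear in this diff. Judging from the call site dataFetcher(arvLogger, &readCollections, &keepServerInfo) and the closure returned by BuildDataFetcher, it is presumably a function type along these lines (a sketch inferred from this diff, not the actual declaration):

    // Presumed shape of summary.DataFetcher: a function that fills in the
    // collection and keep-server summaries, either from previously saved
    // data (summary.ReadData) or from the live servers (the closure that
    // BuildDataFetcher returns).
    type DataFetcher func(arvLogger *logger.Logger,
            readCollections *collection.ReadCollections,
            keepServerInfo *keep.ReadServers)

Both summary.ReadData and the closure built in BuildDataFetcher would satisfy this signature, which is what lets singlerun() select either data source and invoke it through the single dataFetcher(...) call.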