X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/036140f305fc34fdefe0ae393b1011f4c3f840de..0025e3e72aea2388491a801e7bd512ab72fff16a:/services/datamanager/datamanager.go diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go index d7ac0d2e1f..8f3109362b 100644 --- a/services/datamanager/datamanager.go +++ b/services/datamanager/datamanager.go @@ -41,19 +41,25 @@ func init() { func main() { flag.Parse() if minutesBetweenRuns == 0 { - singlerun() + err := singlerun() + if err != nil { + log.Fatalf("Got an error: %v", err) + } } else { waitTime := time.Minute * time.Duration(minutesBetweenRuns) for { log.Println("Beginning Run") - singlerun() + err := singlerun() + if err != nil { + log.Printf("Got an error: %v", err) + } log.Printf("Sleeping for %d minutes", minutesBetweenRuns) time.Sleep(waitTime) } } } -func singlerun() { +func singlerun() error { arv, err := arvadosclient.MakeArvadosClient() if err != nil { log.Fatalf("Error setting up arvados client %s", err.Error()) @@ -78,6 +84,17 @@ func singlerun() { arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc) } + // Verify that datamanager token belongs to an admin user + dataManagerToken := keep.GetDataManagerToken(arvLogger) + origArvToken := arv.ApiToken + arv.ApiToken = dataManagerToken + if is_admin, err := util.UserIsAdmin(arv); err != nil { + log.Fatalf("Error querying arvados user for data manager token %s", err.Error()) + } else if !is_admin { + log.Fatalf("Datamanager token does not belong to an admin user.") + } + arv.ApiToken = origArvToken + var ( dataFetcher summary.DataFetcher readCollections collection.ReadCollections @@ -119,31 +136,39 @@ func singlerun() { fmt.Sprintf("Error setting up keep client %s", err.Error())) } + // Log that we're finished. We force the recording, since go will + // not wait for the write timer before exiting. + if arvLogger != nil { + defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) { + summaryInfo := logger.GetOrCreateMap(p, "summary_info") + summaryInfo["block_replication_counts"] = bucketCounts + summaryInfo["replication_summary"] = replicationCounts + p["summary_info"] = summaryInfo + + p["run_info"].(map[string]interface{})["finished_at"] = time.Now() + }) + } + pullServers := summary.ComputePullServers(kc, &keepServerInfo, readCollections.BlockToDesiredReplication, replicationSummary.UnderReplicatedBlocks) pullLists := summary.BuildPullLists(pullServers) - trashLists := summary.BuildTrashLists(kc, + + trashLists, trashErr := summary.BuildTrashLists(kc, &keepServerInfo, replicationSummary.KeepBlocksNotInCollections) summary.WritePullLists(arvLogger, pullLists) - keep.SendTrashLists(arvLogger, kc, trashLists) - // Log that we're finished. We force the recording, since go will - // not wait for the write timer before exiting. - if arvLogger != nil { - arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) { - summaryInfo := logger.GetOrCreateMap(p, "summary_info") - summaryInfo["block_replication_counts"] = bucketCounts - summaryInfo["replication_summary"] = replicationCounts - p["summary_info"] = summaryInfo - - p["run_info"].(map[string]interface{})["finished_at"] = time.Now() - }) + if trashErr != nil { + return err + } else { + keep.SendTrashLists(dataManagerToken, kc, trashLists) } + + return nil } // Returns a data fetcher that fetches data from remote servers.