6260: Continue to force never_delete to be true until #6221 is resolved; in the meant...
[arvados.git] / services / datamanager / datamanager.go
index 28d558bb8d28711563e72eafb37f2aadb00371d0..8f3109362b7ee3def6ebc32c9755f937348fb81e 100644 (file)
@@ -41,19 +41,25 @@ func init() {
 func main() {
        flag.Parse()
        if minutesBetweenRuns == 0 {
-               singlerun()
+               err := singlerun()
+               if err != nil {
+                       log.Fatalf("Got an error: %v", err)
+               }
        } else {
                waitTime := time.Minute * time.Duration(minutesBetweenRuns)
                for {
                        log.Println("Beginning Run")
-                       singlerun()
+                       err := singlerun()
+                       if err != nil {
+                               log.Printf("Got an error: %v", err)
+                       }
                        log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
                        time.Sleep(waitTime)
                }
        }
 }
 
-func singlerun() {
+func singlerun() error {
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
                log.Fatalf("Error setting up arvados client %s", err.Error())
@@ -78,6 +84,17 @@ func singlerun() {
                arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
        }
 
+       // Verify that datamanager token belongs to an admin user
+       dataManagerToken := keep.GetDataManagerToken(arvLogger)
+       origArvToken := arv.ApiToken
+       arv.ApiToken = dataManagerToken
+       if is_admin, err := util.UserIsAdmin(arv); err != nil {
+               log.Fatalf("Error querying arvados user for data manager token %s", err.Error())
+       } else if !is_admin {
+               log.Fatalf("Datamanager token does not belong to an admin user.")
+       }
+       arv.ApiToken = origArvToken
+
        var (
                dataFetcher     summary.DataFetcher
                readCollections collection.ReadCollections
@@ -119,32 +136,39 @@ func singlerun() {
                        fmt.Sprintf("Error setting up keep client %s", err.Error()))
        }
 
+       // Log that we're finished. We force the recording, since go will
+       // not wait for the write timer before exiting.
+       if arvLogger != nil {
+               defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
+                       summaryInfo := logger.GetOrCreateMap(p, "summary_info")
+                       summaryInfo["block_replication_counts"] = bucketCounts
+                       summaryInfo["replication_summary"] = replicationCounts
+                       p["summary_info"] = summaryInfo
+
+                       p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
+               })
+       }
+
        pullServers := summary.ComputePullServers(kc,
                &keepServerInfo,
                readCollections.BlockToDesiredReplication,
                replicationSummary.UnderReplicatedBlocks)
 
        pullLists := summary.BuildPullLists(pullServers)
-       trashLists := summary.BuildTrashLists(kc,
+
+       trashLists, trashErr := summary.BuildTrashLists(kc,
                &keepServerInfo,
                replicationSummary.KeepBlocksNotInCollections)
 
        summary.WritePullLists(arvLogger, pullLists)
 
-       keep.SendTrashLists(arvLogger, kc, trashLists)
-
-       // Log that we're finished. We force the recording, since go will
-       // not wait for the write timer before exiting.
-       if arvLogger != nil {
-               arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
-                       summaryInfo := logger.GetOrCreateMap(p, "summary_info")
-                       summaryInfo["block_replication_counts"] = bucketCounts
-                       summaryInfo["replication_summary"] = replicationCounts
-                       p["summary_info"] = summaryInfo
-
-                       p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
-               })
+       if trashErr != nil {
+               return err
+       } else {
+               keep.SendTrashLists(dataManagerToken, kc, trashLists)
        }
+
+       return nil
 }
 
 // Returns a data fetcher that fetches data from remote servers.