Merge branch 'crunch-job_finds_newer_docker_hashes' of https://github.com/tmooney...
[arvados.git] / services / keepstore / trash_worker.go
index bc99e9657a21cb490bd1db452d5338d56907b618..27d6216d01633feca360de94f0a8febaabfb475a 100644 (file)
@@ -1,55 +1,61 @@
 package main
 
 import (
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "errors"
        "log"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
-/*
-       Keepstore initiates trash worker channel goroutine.
-       The channel will process trash list.
-               For each (next) trash request:
-      Delete the block indicated by the trash request Locator
-               Repeat
-*/
+// RunTrashWorker processes trash requests received from trashq.NextItem,
+// looping until that channel is closed. For each item received it:
+//   - asserts the item to a TrashRequest (any other type panics),
+//   - passes the request to TrashItem, and
+//   - sends an empty struct on trashq.DoneItem.
+//
+func RunTrashWorker(trashq *WorkQueue) {
+       for item := range trashq.NextItem {
+               trashRequest := item.(TrashRequest)
+               TrashItem(trashRequest)
+               trashq.DoneItem <- struct{}{} // presumably tells the work queue this item is finished; confirm against WorkQueue
+       }
+}
 
-var defaultTrashLifetime int64 = 0
+// TrashItem calls Trash for the requested block on each writable volume whose stored mtime equals the request's BlockMtime; it does nothing if the block is younger than BlobSignatureTTL, and only logs an error per volume if EnableDelete is false.
+func TrashItem(trashRequest TrashRequest) {
+       reqMtime := time.Unix(0, trashRequest.BlockMtime) // BlockMtime is interpreted as nanoseconds since the Unix epoch
+       if time.Since(reqMtime) < theConfig.BlobSignatureTTL.Duration() { // refuse requests for blocks younger than the signature TTL
+               log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
+                       arvados.Duration(time.Since(reqMtime)),
+                       trashRequest.Locator,
+                       trashRequest.BlockMtime,
+                       reqMtime,
+                       theConfig.BlobSignatureTTL)
+               return
+       }
 
-func RunTrashWorker(arv *arvadosclient.ArvadosClient, trashq *WorkQueue) {
-       if arv != nil {
-               defaultTrashLifetimeMap, err := arv.Discovery("defaultTrashLifetime")
+       for _, volume := range KeepVM.AllWritable() {
+               mtime, err := volume.Mtime(trashRequest.Locator)
                 if err != nil {
-                       log.Fatalf("Error setting up arvados client %s", err.Error())
+                       log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
+                       continue
+               }
+               if trashRequest.BlockMtime != mtime.UnixNano() { // act only when the stored mtime exactly matches the request
+                       log.Printf("%v Delete(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
+                       continue
                 }
-               defaultTrashLifetime = int64(defaultTrashLifetimeMap["defaultTrashLifetime"].(float64))
-       }
 
-       nextItem := trashq.NextItem
-       for item := range nextItem {
-               trashRequest := item.(TrashRequest)
-               err := TrashItem(trashRequest)
-               if err != nil {
-                       log.Printf("Trash request error for %s: %s", trashRequest, err)
+               if !theConfig.EnableDelete { // with deletion disabled, record an error instead of calling Trash
+                       err = errors.New("did not delete block because EnableDelete is false")
+               } else {
+                       err = volume.Trash(trashRequest.Locator)
                 }
-       }
-}
 
-/*
-       Delete the block indicated by the Locator in TrashRequest.
-*/
-func TrashItem(trashRequest TrashRequest) (err error) {
-       // Verify if the block is to be deleted based on its Mtime
-       for _, volume := range KeepVM.Volumes() {
-               mtime, err := volume.Mtime(trashRequest.Locator)
-               if err == nil {
-                       if trashRequest.BlockMtime == mtime.Unix() {
-                               currentTime := time.Now().Unix()
-                               if (currentTime - trashRequest.BlockMtime) > defaultTrashLifetime {
-                                       err = volume.Delete(trashRequest.Locator)
-                               }
-                       }
+               if err != nil {
+                       log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
+               } else {
+                       log.Printf("%v Delete(%v) OK", volume, trashRequest.Locator)
                 }
         }
-       return
 }