Merge branch '6260-work-queue-status' refs #6260
[arvados.git] / services / keepstore / trash_worker.go
index ca269126ee61f2a225b3a9a55a830ebfc51c9a97..8f78658c3a7496473c2d81a7f0d7b13213ef9d5f 100644 (file)
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "errors"
        "log"
        "time"
 )
@@ -14,30 +15,47 @@ import (
 */
 
 func RunTrashWorker(trashq *WorkQueue) {
-       nextItem := trashq.NextItem
-       for item := range nextItem {
+       for item := range trashq.NextItem {
                trashRequest := item.(TrashRequest)
-               err := TrashItem(trashRequest)
-               if err != nil {
-                       log.Printf("Trash request error for %s: %s", trashRequest, err)
-               }
+               TrashItem(trashRequest)
+               trashq.DoneItem <- struct{}{}
        }
 }
 
-/*
-       Delete the block indicated by the Locator in TrashRequest.
-*/
-func TrashItem(trashRequest TrashRequest) (err error) {
-       // Verify if the block is to be deleted based on its Mtime
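+// TrashItem deletes the indicated block from each writable volume whose stored mtime equals the request's BlockMtime; it skips the request entirely if BlockMtime is more recent than blob_signature_ttl, and only logs an error (deleting nothing) when never_delete is set.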
+func TrashItem(trashRequest TrashRequest) {
+       reqMtime := time.Unix(trashRequest.BlockMtime, 0)
+       if time.Since(reqMtime) < blob_signature_ttl {
+               log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blob_signature_ttl is %v! Skipping.",
+                       time.Since(reqMtime),
+                       trashRequest.Locator,
+                       trashRequest.BlockMtime,
+                       reqMtime,
+                       blob_signature_ttl)
+               return
+       }
+
        for _, volume := range KeepVM.AllWritable() {
                mtime, err := volume.Mtime(trashRequest.Locator)
-               if err != nil || trashRequest.BlockMtime != mtime.Unix() {
+               if err != nil {
+                       log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
                        continue
                }
-               currentTime := time.Now().Unix()
-               if time.Duration(currentTime-trashRequest.BlockMtime)*time.Second >= permission_ttl {
+               if trashRequest.BlockMtime != mtime.Unix() {
+                       log.Printf("%v Delete(%v): mtime on volume is %v does not match trash list value %v", volume, trashRequest.Locator, mtime.Unix(), trashRequest.BlockMtime)
+                       continue
+               }
+
+               if never_delete {
+                       err = errors.New("did not delete block because never_delete is true")
+               } else {
                        err = volume.Delete(trashRequest.Locator)
                }
+
+               if err != nil {
+                       log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
+               } else {
+                       log.Printf("%v Delete(%v) OK", volume, trashRequest.Locator)
+               }
        }
-       return
 }