Merge branch '7286-nodeman-destroy-broken-nodes' closes #7286
[arvados.git] / services / keepstore / trash_worker.go
index bc1775f97eb3e0c700c8197fa5b21b77ecc019e0..65e3fbd2849593e44be94921cb7073a5aba3adaa 100644 (file)
@@ -1,47 +1,59 @@
 package main
 
 import (
+       "errors"
        "log"
        "time"
 )
 
-/*
-       Keepstore initiates trash worker channel goroutine.
-       The channel will process trash list.
-               For each (next) trash request:
-      Delete the block indicated by the trash request Locator
-               Repeat
-*/
-
+// RunTrashWorker processes trash requests from trashq until the
+// queue's NextItem channel is closed.
+//
+// For each item received, it type-asserts the item to TrashRequest,
+// passes it to TrashItem, and then sends on trashq.DoneItem to signal
+// that the item has been handled.
 func RunTrashWorker(trashq *WorkQueue) {
        for item := range trashq.NextItem {
                trashRequest := item.(TrashRequest)
                TrashItem(trashRequest)
+               trashq.DoneItem <- struct{}{}
        }
 }
 
 // TrashItem deletes the indicated block from every writable volume.
 func TrashItem(trashRequest TrashRequest) {
        reqMtime := time.Unix(trashRequest.BlockMtime, 0)
-       if time.Since(reqMtime) < blob_signature_ttl {
-               log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blob_signature_ttl is %v! Skipping.",
+       if time.Since(reqMtime) < blobSignatureTTL { // requested mtime is younger than blobSignatureTTL: warn and do nothing
+               log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
                        time.Since(reqMtime),
                        trashRequest.Locator,
                        trashRequest.BlockMtime,
                        reqMtime,
-                       blob_signature_ttl)
+                       blobSignatureTTL)
                return
        }
+
        for _, volume := range KeepVM.AllWritable() {
                mtime, err := volume.Mtime(trashRequest.Locator)
-               if err != nil || trashRequest.BlockMtime != mtime.Unix() {
+               if err != nil { // Mtime lookup failed on this volume: log it and move on to the next volume
+                       log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
+                       continue
+               }
+               if trashRequest.BlockMtime != mtime.Unix() { // mtime on this volume differs from the request: log and skip the volume
+                       log.Printf("%v Delete(%v): mtime on volume is %v does not match trash list value %v", volume, trashRequest.Locator, mtime.Unix(), trashRequest.BlockMtime)
                        continue
                }
-               err = volume.Delete(trashRequest.Locator)
+
+               if neverDelete { // deletion is switched off: set an error instead of calling Delete so the skip is logged below
+                       err = errors.New("did not delete block because neverDelete is true")
+               } else {
+                       err = volume.Delete(trashRequest.Locator)
+               }
+
                if err != nil {
                        log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
-                       continue
+               } else {
+                       log.Printf("%v Delete(%v) OK", volume, trashRequest.Locator)
                }
-               log.Printf("%v Delete(%v) OK", volume, trashRequest.Locator)
        }
 }