Merge branch 'crunch-job_finds_newer_docker_hashes' of https://github.com/tmooney...
[arvados.git] / services / keepstore / trash_worker.go
index ca269126ee61f2a225b3a9a55a830ebfc51c9a97..27d6216d01633feca360de94f0a8febaabfb475a 100644 (file)
@@ -1,43 +1,61 @@
 package main
 
 import (
+       "errors"
        "log"
        "time"
-)
 
-/*
-       Keepstore initiates trash worker channel goroutine.
-       The channel will process trash list.
-               For each (next) trash request:
-      Delete the block indicated by the trash request Locator
-               Repeat
-*/
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
 
+// RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
+//     The channel will process trash list.
+//             For each (next) trash request:
+//      Delete the block indicated by the trash request Locator
+//             Repeat
+//
 func RunTrashWorker(trashq *WorkQueue) {
-       nextItem := trashq.NextItem
-       for item := range nextItem {
+       for item := range trashq.NextItem {
                trashRequest := item.(TrashRequest)
-               err := TrashItem(trashRequest)
-               if err != nil {
-                       log.Printf("Trash request error for %s: %s", trashRequest, err)
-               }
+               TrashItem(trashRequest)
+               trashq.DoneItem <- struct{}{}
        }
 }
 
-/*
-       Delete the block indicated by the Locator in TrashRequest.
-*/
-func TrashItem(trashRequest TrashRequest) (err error) {
-       // Verify if the block is to be deleted based on its Mtime
+// TrashItem deletes the indicated block from every writable volume.
+func TrashItem(trashRequest TrashRequest) {
+       reqMtime := time.Unix(0, trashRequest.BlockMtime)
+       if time.Since(reqMtime) < theConfig.BlobSignatureTTL.Duration() {
+               log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
+                       arvados.Duration(time.Since(reqMtime)),
+                       trashRequest.Locator,
+                       trashRequest.BlockMtime,
+                       reqMtime,
+                       theConfig.BlobSignatureTTL)
+               return
+       }
+
        for _, volume := range KeepVM.AllWritable() {
                mtime, err := volume.Mtime(trashRequest.Locator)
-               if err != nil || trashRequest.BlockMtime != mtime.Unix() {
+               if err != nil {
+                       log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
                        continue
                }
-               currentTime := time.Now().Unix()
-               if time.Duration(currentTime-trashRequest.BlockMtime)*time.Second >= permission_ttl {
-                       err = volume.Delete(trashRequest.Locator)
+               if trashRequest.BlockMtime != mtime.UnixNano() {
+                       log.Printf("%v Delete(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
+                       continue
+               }
+
+               if !theConfig.EnableDelete {
+                       err = errors.New("did not delete block because EnableDelete is false")
+               } else {
+                       err = volume.Trash(trashRequest.Locator)
+               }
+
+               if err != nil {
+                       log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
+               } else {
+                       log.Printf("%v Delete(%v) OK", volume, trashRequest.Locator)
                }
        }
-       return
 }