+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
- "log"
+ "errors"
"time"
-)
-/*
- Keepstore initiates trash worker channel goroutine.
- The channel will process trash list.
- For each (next) trash request:
- Delete the block indicated by the trash request Locator
- Repeat
-*/
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+// RunTrashWorker ranges over trashq.NextItem and handles one trash request
+// per item; per the original note, Keepstore runs it as the trash worker.
+// For each item received:
+// - the item is type-asserted to TrashRequest and passed to TrashItem;
+// - an empty struct is then sent on trashq.DoneItem (presumably to mark
+// the item finished; confirm against WorkQueue). Other item types panic.
func RunTrashWorker(trashq *WorkQueue) {
- nextItem := trashq.NextItem
- for item := range nextItem {
+ for item := range trashq.NextItem {
trashRequest := item.(TrashRequest)
- err := TrashItem(trashRequest)
- if err != nil {
- log.Printf("Trash request error for %s: %s", trashRequest, err)
- }
+ TrashItem(trashRequest)
+ trashq.DoneItem <- struct{}{} // sent after TrashItem returns, whatever it logged
}
}
-/*
- Delete the block indicated by the Locator in TrashRequest.
-*/
-func TrashItem(trashRequest TrashRequest) (err error) {
- // Verify if the block is to be deleted based on its Mtime
- for _, volume := range KeepVM.AllWritable() {
+// TrashItem trashes the block on the mount given by MountUUID (all writable volumes if empty), skipping requests newer than BlobSignatureTTL and volumes whose stored mtime differs; outcomes are logged, not returned.
+func TrashItem(trashRequest TrashRequest) {
+ reqMtime := time.Unix(0, trashRequest.BlockMtime) // BlockMtime is read as nanoseconds since the Unix epoch
+ if time.Since(reqMtime) < theConfig.BlobSignatureTTL.Duration() { // block is younger than the signature TTL: refuse the whole request
+ log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
+ arvados.Duration(time.Since(reqMtime)),
+ trashRequest.Locator,
+ trashRequest.BlockMtime,
+ reqMtime,
+ theConfig.BlobSignatureTTL)
+ return
+ }
+
+ var volumes []Volume
+ if uuid := trashRequest.MountUUID; uuid == "" { // no mount named: consider every writable volume
+ volumes = KeepVM.AllWritable()
+ } else if v := KeepVM.Lookup(uuid, true); v == nil { // NOTE(review): second argument presumably requires a writable mount; confirm
+ log.Printf("warning: trash request for nonexistent mount: %v", trashRequest)
+ return
+ } else {
+ volumes = []Volume{v}
+ }
+
+ for _, volume := range volumes {
mtime, err := volume.Mtime(trashRequest.Locator)
- if err != nil || trashRequest.BlockMtime != mtime.Unix() {
+ if err != nil {
+ log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
+ continue
+ }
+ if trashRequest.BlockMtime != mtime.UnixNano() { // exact nanosecond match required, otherwise this volume is left alone
+ log.Printf("%v Trash(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
continue
}
- currentTime := time.Now().Unix()
- if time.Duration(currentTime-trashRequest.BlockMtime)*time.Second >= permission_ttl {
- err = volume.Delete(trashRequest.Locator)
+
+ if !theConfig.EnableDelete {
+ err = errors.New("skipping because EnableDelete is false") // logged below through the same path as a failed Trash call
+ } else {
+ err = volume.Trash(trashRequest.Locator)
+ }
+
+ if err != nil {
+ log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
+ } else {
+ log.Printf("%v Trash(%v) OK", volume, trashRequest.Locator)
}
}
- return
}