import (
"errors"
+ "log"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
// Delete the block indicated by the trash request Locator
// Repeat
//
-func RunTrashWorker(trashq *WorkQueue) {
+func RunTrashWorker(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashq *WorkQueue) {
for item := range trashq.NextItem {
trashRequest := item.(TrashRequest)
- TrashItem(trashRequest)
+ TrashItem(volmgr, cluster, trashRequest)
trashq.DoneItem <- struct{}{}
}
}
// TrashItem deletes the indicated block from every writable volume.
-func TrashItem(trashRequest TrashRequest) {
+func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest TrashRequest) {
reqMtime := time.Unix(0, trashRequest.BlockMtime)
- if time.Since(reqMtime) < theConfig.BlobSignatureTTL.Duration() {
+ if time.Since(reqMtime) < cluster.Collections.BlobSigningTTL.Duration() {
log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
arvados.Duration(time.Since(reqMtime)),
trashRequest.Locator,
trashRequest.BlockMtime,
reqMtime,
- theConfig.BlobSignatureTTL)
+ cluster.Collections.BlobSigningTTL)
return
}
- var volumes []Volume
+ var volumes []*VolumeMount
if uuid := trashRequest.MountUUID; uuid == "" {
- volumes = KeepVM.AllWritable()
- } else if v := KeepVM.Lookup(uuid, true); v == nil {
+ volumes = volmgr.AllWritable()
+ } else if mnt := volmgr.Lookup(uuid, true); mnt == nil {
log.Printf("warning: trash request for nonexistent mount: %v", trashRequest)
return
} else {
- volumes = []Volume{v}
+ volumes = []*VolumeMount{mnt}
}
for _, volume := range volumes {
continue
}
- if !theConfig.EnableDelete {
- err = errors.New("skipping because EnableDelete is false")
+ if !cluster.Collections.BlobTrash {
+ err = errors.New("skipping because Collections.BlobTrash is false")
} else {
err = volume.Trash(trashRequest.Locator)
}