21126: Add AllowTrashWhenReadOnly flag.
[arvados.git] / services / keepstore / trash_worker.go
index ba1455ac657bca2f05dd808a23936af83a425e20..5e8a5a963ceaad37527e652ca19460dce349fcd4 100644 (file)
@@ -2,35 +2,30 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepstore
 
 import (
        "errors"
-       "log"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
 )
 
-// RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
-//     The channel will process trash list.
-//             For each (next) trash request:
-//      Delete the block indicated by the trash request Locator
-//             Repeat
-//
-func RunTrashWorker(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashq *WorkQueue) {
+// RunTrashWorker processes the trash request queue.
+func RunTrashWorker(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashq *WorkQueue) {
        for item := range trashq.NextItem {
                trashRequest := item.(TrashRequest)
-               TrashItem(volmgr, cluster, trashRequest)
+               TrashItem(volmgr, logger, cluster, trashRequest)
                trashq.DoneItem <- struct{}{}
        }
 }
 
 // TrashItem deletes the indicated block from every writable volume.
-func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest TrashRequest) {
+func TrashItem(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashRequest TrashRequest) {
        reqMtime := time.Unix(0, trashRequest.BlockMtime)
        if time.Since(reqMtime) < cluster.Collections.BlobSigningTTL.Duration() {
-               log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
+               logger.Warnf("client asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
                        arvados.Duration(time.Since(reqMtime)),
                        trashRequest.Locator,
                        trashRequest.BlockMtime,
@@ -41,10 +36,12 @@ func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest T
 
        var volumes []*VolumeMount
        if uuid := trashRequest.MountUUID; uuid == "" {
-               volumes = volmgr.AllWritable()
-       } else if mnt := volmgr.Lookup(uuid, true); mnt == nil {
-               log.Printf("warning: trash request for nonexistent mount: %v", trashRequest)
+               volumes = volmgr.Mounts()
+       } else if mnt := volmgr.Lookup(uuid, false); mnt == nil {
+               logger.Warnf("trash request for nonexistent mount: %v", trashRequest)
                return
+       } else if !mnt.KeepMount.AllowTrash {
+               logger.Warnf("trash request for mount with ReadOnly=true, AllowTrashWhenReadOnly=false: %v", trashRequest)
        } else {
                volumes = []*VolumeMount{mnt}
        }
@@ -52,11 +49,11 @@ func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest T
        for _, volume := range volumes {
                mtime, err := volume.Mtime(trashRequest.Locator)
                if err != nil {
-                       log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
+                       logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator)
                        continue
                }
                if trashRequest.BlockMtime != mtime.UnixNano() {
-                       log.Printf("%v Trash(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
+                       logger.Infof("%v Trash(%v): stored mtime %v does not match trash list value %v; skipping", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
                        continue
                }
 
@@ -67,9 +64,9 @@ func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest T
                }
 
                if err != nil {
-                       log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
+                       logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator)
                } else {
-                       log.Printf("%v Trash(%v) OK", volume, trashRequest.Locator)
+                       logger.Infof("%v Trash(%v) OK", volume, trashRequest.Locator)
                }
        }
 }