X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a74c81b035c67d299e2a7298f8db3d368a578510..9c78326f370f1875e41422e2d2c8a3c5a86c5bf8:/services/keepstore/trash_worker.go diff --git a/services/keepstore/trash_worker.go b/services/keepstore/trash_worker.go index d11bc05192..5e8a5a963c 100644 --- a/services/keepstore/trash_worker.go +++ b/services/keepstore/trash_worker.go @@ -1,59 +1,72 @@ -package main +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package keepstore import ( "errors" - "log" "time" + + "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/sirupsen/logrus" ) -// RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine. -// The channel will process trash list. -// For each (next) trash request: -// Delete the block indicated by the trash request Locator -// Repeat -// -func RunTrashWorker(trashq *WorkQueue) { +// RunTrashWorker processes the trash request queue. +func RunTrashWorker(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashq *WorkQueue) { for item := range trashq.NextItem { trashRequest := item.(TrashRequest) - TrashItem(trashRequest) + TrashItem(volmgr, logger, cluster, trashRequest) trashq.DoneItem <- struct{}{} } } // TrashItem deletes the indicated block from every writable volume. -func TrashItem(trashRequest TrashRequest) { +func TrashItem(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashRequest TrashRequest) { reqMtime := time.Unix(0, trashRequest.BlockMtime) - if time.Since(reqMtime) < blobSignatureTTL { - log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.", - time.Since(reqMtime), + if time.Since(reqMtime) < cluster.Collections.BlobSigningTTL.Duration() { + logger.Warnf("client asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.", + arvados.Duration(time.Since(reqMtime)), trashRequest.Locator, trashRequest.BlockMtime, reqMtime, - blobSignatureTTL) + cluster.Collections.BlobSigningTTL) + return + } + + var volumes []*VolumeMount + if uuid := trashRequest.MountUUID; uuid == "" { + volumes = volmgr.Mounts() + } else if mnt := volmgr.Lookup(uuid, false); mnt == nil { + logger.Warnf("trash request for nonexistent mount: %v", trashRequest) return + } else if !mnt.KeepMount.AllowTrash { + logger.Warnf("trash request for mount with ReadOnly=true, AllowTrashWhenReadOnly=false: %v", trashRequest) + } else { + volumes = []*VolumeMount{mnt} } - for _, volume := range KeepVM.AllWritable() { + for _, volume := range volumes { mtime, err := volume.Mtime(trashRequest.Locator) if err != nil { - log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err) + logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator) continue } if trashRequest.BlockMtime != mtime.UnixNano() { - log.Printf("%v Delete(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime) + logger.Infof("%v Trash(%v): stored mtime %v does not match trash list value %v; skipping", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime) continue } - if neverDelete { - err = errors.New("did not delete block because neverDelete is true") + if !cluster.Collections.BlobTrash { + err = errors.New("skipping because Collections.BlobTrash is false") } else { err = volume.Trash(trashRequest.Locator) } if err != nil { - log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err) + logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator) } else { - log.Printf("%v Delete(%v) OK", volume, trashRequest.Locator) + logger.Infof("%v Trash(%v) OK", volume, trashRequest.Locator) } } }