X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6c14ca8fe128f4db3bafe4edd1cc5551bc85e1ef..e6ccda1f6925da119589e93b54d22508cf979069:/services/keepstore/trash_worker.go diff --git a/services/keepstore/trash_worker.go b/services/keepstore/trash_worker.go index bc99e9657a..5e8a5a963c 100644 --- a/services/keepstore/trash_worker.go +++ b/services/keepstore/trash_worker.go @@ -1,55 +1,72 @@ -package main +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package keepstore import ( - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "log" + "errors" "time" + + "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/sirupsen/logrus" ) -/* - Keepstore initiates trash worker channel goroutine. - The channel will process trash list. - For each (next) trash request: - Delete the block indicated by the trash request Locator - Repeat -*/ +// RunTrashWorker processes the trash request queue. +func RunTrashWorker(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashq *WorkQueue) { + for item := range trashq.NextItem { + trashRequest := item.(TrashRequest) + TrashItem(volmgr, logger, cluster, trashRequest) + trashq.DoneItem <- struct{}{} + } +} -var defaultTrashLifetime int64 = 0 +// TrashItem deletes the indicated block from every writable volume. +func TrashItem(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashRequest TrashRequest) { + reqMtime := time.Unix(0, trashRequest.BlockMtime) + if time.Since(reqMtime) < cluster.Collections.BlobSigningTTL.Duration() { + logger.Warnf("client asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.", + arvados.Duration(time.Since(reqMtime)), + trashRequest.Locator, + trashRequest.BlockMtime, + reqMtime, + cluster.Collections.BlobSigningTTL) + return + } -func RunTrashWorker(arv *arvadosclient.ArvadosClient, trashq *WorkQueue) { - if arv != nil { - defaultTrashLifetimeMap, err := arv.Discovery("defaultTrashLifetime") - if err != nil { - log.Fatalf("Error setting up arvados client %s", err.Error()) - } - defaultTrashLifetime = int64(defaultTrashLifetimeMap["defaultTrashLifetime"].(float64)) + var volumes []*VolumeMount + if uuid := trashRequest.MountUUID; uuid == "" { + volumes = volmgr.Mounts() + } else if mnt := volmgr.Lookup(uuid, false); mnt == nil { + logger.Warnf("trash request for nonexistent mount: %v", trashRequest) + return + } else if !mnt.KeepMount.AllowTrash { + logger.Warnf("trash request for mount with ReadOnly=true, AllowTrashWhenReadOnly=false: %v", trashRequest) + } else { + volumes = []*VolumeMount{mnt} } - nextItem := trashq.NextItem - for item := range nextItem { - trashRequest := item.(TrashRequest) - err := TrashItem(trashRequest) + for _, volume := range volumes { + mtime, err := volume.Mtime(trashRequest.Locator) if err != nil { - log.Printf("Trash request error for %s: %s", trashRequest, err) + logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator) + continue + } + if trashRequest.BlockMtime != mtime.UnixNano() { + logger.Infof("%v Trash(%v): stored mtime %v does not match trash list value %v; skipping", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime) + continue } - } -} -/* - Delete the block indicated by the Locator in TrashRequest. -*/ -func TrashItem(trashRequest TrashRequest) (err error) { - // Verify if the block is to be deleted based on its Mtime - for _, volume := range KeepVM.Volumes() { - mtime, err := volume.Mtime(trashRequest.Locator) - if err == nil { - if trashRequest.BlockMtime == mtime.Unix() { - currentTime := time.Now().Unix() - if (currentTime - trashRequest.BlockMtime) > defaultTrashLifetime { - err = volume.Delete(trashRequest.Locator) - } - } + if !cluster.Collections.BlobTrash { + err = errors.New("skipping because Collections.BlobTrash is false") + } else { + err = volume.Trash(trashRequest.Locator) + } + + if err != nil { + logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator) + } else { + logger.Infof("%v Trash(%v) OK", volume, trashRequest.Locator) } } - return }