X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d3502068c07807aea7b36270bdae74b04095df62..f04693da1811e670d4cbb981debeecf14d79137c:/services/keepstore/trash_worker.go diff --git a/services/keepstore/trash_worker.go b/services/keepstore/trash_worker.go index 8a9fedfb70..ba1455ac65 100644 --- a/services/keepstore/trash_worker.go +++ b/services/keepstore/trash_worker.go @@ -6,6 +6,7 @@ package main import ( "errors" + "log" "time" "git.curoverse.com/arvados.git/sdk/go/arvados" @@ -17,35 +18,35 @@ import ( // Delete the block indicated by the trash request Locator // Repeat // -func RunTrashWorker(trashq *WorkQueue) { +func RunTrashWorker(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashq *WorkQueue) { for item := range trashq.NextItem { trashRequest := item.(TrashRequest) - TrashItem(trashRequest) + TrashItem(volmgr, cluster, trashRequest) trashq.DoneItem <- struct{}{} } } // TrashItem deletes the indicated block from every writable volume. -func TrashItem(trashRequest TrashRequest) { +func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest TrashRequest) { reqMtime := time.Unix(0, trashRequest.BlockMtime) - if time.Since(reqMtime) < theConfig.BlobSignatureTTL.Duration() { + if time.Since(reqMtime) < cluster.Collections.BlobSigningTTL.Duration() { log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.", arvados.Duration(time.Since(reqMtime)), trashRequest.Locator, trashRequest.BlockMtime, reqMtime, - theConfig.BlobSignatureTTL) + cluster.Collections.BlobSigningTTL) return } - var volumes []Volume + var volumes []*VolumeMount if uuid := trashRequest.MountUUID; uuid == "" { - volumes = KeepVM.AllWritable() - } else if v := KeepVM.Lookup(uuid, true); v == nil { + volumes = volmgr.AllWritable() + } else if mnt := volmgr.Lookup(uuid, true); mnt == nil { log.Printf("warning: trash request for nonexistent mount: %v", trashRequest) return } else { - volumes = []Volume{v} + volumes = []*VolumeMount{mnt} } for _, volume := range volumes { @@ -59,8 +60,8 @@ func TrashItem(trashRequest TrashRequest) { continue } - if !theConfig.EnableDelete { - err = errors.New("skipping because EnableDelete is false") + if !cluster.Collections.BlobTrash { + err = errors.New("skipping because Collections.BlobTrash is false") } else { err = volume.Trash(trashRequest.Locator) }