X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4d078362f10fba8b94cce3ecc3ed8b4924b79b41..e028e5daadb83ef695b7e0b1ef6bd45eef4cfc29:/services/keepstore/trash_worker.go

diff --git a/services/keepstore/trash_worker.go b/services/keepstore/trash_worker.go
index ca269126ee..65e3fbd284 100644
--- a/services/keepstore/trash_worker.go
+++ b/services/keepstore/trash_worker.go
@@ -1,43 +1,59 @@
 package main
 
 import (
+	"errors"
 	"log"
 	"time"
 )
 
-/*
-	Keepstore initiates trash worker channel goroutine.
-	The channel will process trash list.
-	For each (next) trash request:
-		Delete the block indicated by the trash request Locator
-	Repeat
-*/
-
+// RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
+// The channel will process trash list.
+// For each (next) trash request:
+//     Delete the block indicated by the trash request Locator
+// Repeat
+//
 func RunTrashWorker(trashq *WorkQueue) {
-	nextItem := trashq.NextItem
-	for item := range nextItem {
+	for item := range trashq.NextItem {
 		trashRequest := item.(TrashRequest)
-		err := TrashItem(trashRequest)
-		if err != nil {
-			log.Printf("Trash request error for %s: %s", trashRequest, err)
-		}
+		TrashItem(trashRequest)
+		trashq.DoneItem <- struct{}{}
 	}
 }
 
-/*
-	Delete the block indicated by the Locator in TrashRequest.
-*/
-func TrashItem(trashRequest TrashRequest) (err error) {
-	// Verify if the block is to be deleted based on its Mtime
+// TrashItem deletes the indicated block from every writable volume.
+func TrashItem(trashRequest TrashRequest) {
+	reqMtime := time.Unix(trashRequest.BlockMtime, 0)
+	if time.Since(reqMtime) < blobSignatureTTL {
+		log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
+			time.Since(reqMtime),
+			trashRequest.Locator,
+			trashRequest.BlockMtime,
+			reqMtime,
+			blobSignatureTTL)
+		return
+	}
+
 	for _, volume := range KeepVM.AllWritable() {
 		mtime, err := volume.Mtime(trashRequest.Locator)
-		if err != nil || trashRequest.BlockMtime != mtime.Unix() {
+		if err != nil {
+			log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
 			continue
 		}
-		currentTime := time.Now().Unix()
-		if time.Duration(currentTime-trashRequest.BlockMtime)*time.Second >= permission_ttl {
+		if trashRequest.BlockMtime != mtime.Unix() {
+			log.Printf("%v Delete(%v): mtime on volume is %v does not match trash list value %v", volume, trashRequest.Locator, mtime.Unix(), trashRequest.BlockMtime)
+			continue
+		}
+
+		if neverDelete {
+			err = errors.New("did not delete block because neverDelete is true")
+		} else {
 			err = volume.Delete(trashRequest.Locator)
 		}
+
+		if err != nil {
+			log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
+		} else {
+			log.Printf("%v Delete(%v) OK", volume, trashRequest.Locator)
+		}
 	}
-	return
 }