X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c15c0971e10534f36748feae87b1b73a386fd9b1..70e9fc0a1e57fb4d85d985b4c9258d7a5615b3bb:/services/keepstore/s3_volume.go?ds=sidebyside diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go index be4da4d8a4..1f62f4a173 100644 --- a/services/keepstore/s3_volume.go +++ b/services/keepstore/s3_volume.go @@ -18,8 +18,8 @@ import ( ) var ( - // Returned by Trash if that operation is impossible with the - // current config. + // ErrS3TrashDisabled is returned by Trash if that operation + // is impossible with the current config. ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false") s3AccessKeyFile string @@ -551,6 +551,8 @@ func (v *S3Volume) translateError(err error) error { // EmptyTrash looks for trashed blocks that exceeded trashLifetime // and deletes them from the volume. func (v *S3Volume) EmptyTrash() { + var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64 + // Use a merge sort to find matching sets of trash/X and recent/X. trashL := s3Lister{ Bucket: v.Bucket, @@ -564,14 +566,24 @@ func (v *S3Volume) EmptyTrash() { if !v.isKeepBlock(loc) { continue } + bytesInTrash += trash.Size + blocksInTrash++ + trashT, err := time.Parse(time.RFC3339, trash.LastModified) if err != nil { log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err) continue } recent, err := v.Bucket.Head("recent/"+loc, nil) - if err != nil { - log.Printf("warning: %s: EmptyTrash: cannot delete trash %q with no corresponding recent/* marker", v, trash.Key) + if err != nil && os.IsNotExist(v.translateError(err)) { + log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err) + err = v.Untrash(loc) + if err != nil { + log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err) + } + continue + } else if err != nil { + log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err) continue } recentT, err := v.lastModified(recent) @@ -580,7 +592,7 @@ func (v *S3Volume) EmptyTrash() { continue } if trashT.Sub(recentT) < blobSignatureTTL { - if age := startT.Sub(recentT); age >= blobSignatureTTL - v.raceWindow { + if age := startT.Sub(recentT); age >= blobSignatureTTL-v.raceWindow { // recent/loc is too old to protect // loc from being Trashed again during // the raceWindow that starts if we @@ -610,6 +622,9 @@ func (v *S3Volume) EmptyTrash() { log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err) continue } + bytesDeleted += trash.Size + blocksDeleted++ + _, err = v.Bucket.Head(loc, nil) if os.IsNotExist(err) { err = v.Bucket.Del("recent/" + loc) @@ -623,6 +638,7 @@ func (v *S3Volume) EmptyTrash() { if err := trashL.Error(); err != nil { log.Printf("error: %s: EmptyTrash: lister: %s", v, err) } + log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted) } type s3Lister struct {