X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a6d2f88debdfa7bc390b63c1f18a0541987ae0b8..feaf44094afa6774cd6583f1a121454d6754ba0c:/services/keepstore/s3_volume.go diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go index 5b00d4abc5..1a2a47b0df 100644 --- a/services/keepstore/s3_volume.go +++ b/services/keepstore/s3_volume.go @@ -18,8 +18,8 @@ import ( ) var ( - // Returned by Trash if that operation is impossible with the - // current config. + // ErrS3TrashDisabled is returned by Trash if that operation + // is impossible with the current config. ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false") s3AccessKeyFile string @@ -43,9 +43,6 @@ type s3VolumeAdder struct { } func (s *s3VolumeAdder) Set(bucketName string) error { - if trashLifetime != 0 { - return ErrNotImplemented - } if bucketName == "" { return fmt.Errorf("no container name given") } @@ -551,6 +548,8 @@ func (v *S3Volume) translateError(err error) error { // EmptyTrash looks for trashed blocks that exceeded trashLifetime // and deletes them from the volume. func (v *S3Volume) EmptyTrash() { + var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64 + // Use a merge sort to find matching sets of trash/X and recent/X. trashL := s3Lister{ Bucket: v.Bucket, @@ -558,32 +557,61 @@ func (v *S3Volume) EmptyTrash() { PageSize: v.indexPageSize, } // Define "ready to delete" as "...when EmptyTrash started". - now := time.Now() + startT := time.Now() for trash := trashL.First(); trash != nil; trash = trashL.Next() { loc := trash.Key[6:] if !v.isKeepBlock(loc) { continue } - recent, err := v.Bucket.Head("recent/"+loc, nil) - if err != nil { - log.Printf("warning: %s: EmptyTrash: cannot delete trash %q with no corresponding recent/* marker", v, trash.Key) - continue - } + bytesInTrash += trash.Size + blocksInTrash++ + trashT, err := time.Parse(time.RFC3339, trash.LastModified) if err != nil { log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err) continue } + recent, err := v.Bucket.Head("recent/"+loc, nil) + if err != nil && os.IsNotExist(v.translateError(err)) { + log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err) + err = v.Untrash(loc) + if err != nil { + log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err) + } + continue + } else if err != nil { + log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err) + continue + } recentT, err := v.lastModified(recent) if err != nil { log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err) continue } if trashT.Sub(recentT) < blobSignatureTTL { - v.fixRace(loc) - continue + if age := startT.Sub(recentT); age >= blobSignatureTTL-v.raceWindow { + // recent/loc is too old to protect + // loc from being Trashed again during + // the raceWindow that starts if we + // delete trash/X now. + // + // Note this means (trashCheckInterval + // < blobSignatureTTL - raceWindow) is + // necessary to avoid starvation. + log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc) + v.fixRace(loc) + v.Touch(loc) + continue + } else if _, err := v.Bucket.Head(loc, nil); os.IsNotExist(err) { + log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc) + v.fixRace(loc) + continue + } else if err != nil { + log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err) + continue + } } - if now.Sub(trashT) < trashLifetime { + if startT.Sub(trashT) < trashLifetime { continue } err = v.Bucket.Del(trash.Key) @@ -591,6 +619,9 @@ func (v *S3Volume) EmptyTrash() { log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err) continue } + bytesDeleted += trash.Size + blocksDeleted++ + _, err = v.Bucket.Head(loc, nil) if os.IsNotExist(err) { err = v.Bucket.Del("recent/" + loc) @@ -604,6 +635,7 @@ func (v *S3Volume) EmptyTrash() { if err := trashL.Error(); err != nil { log.Printf("error: %s: EmptyTrash: lister: %s", v, err) } + log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted) } type s3Lister struct {