- log.Printf("error: fixRace: %s", err)
- return false
- }
- return true
-}
-
-func (v *S3Volume) translateError(err error) error {
- switch err := err.(type) {
- case *s3.Error:
- if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
- strings.Contains(err.Error(), "Not Found") {
- return os.ErrNotExist
- }
- // Other 404 errors like NoSuchVersion and
- // NoSuchBucket are different problems which should
- // get called out downstream, so we don't convert them
- // to os.ErrNotExist.
- }
- return err
-}
-
-// EmptyTrash looks for trashed blocks that exceeded TrashLifetime
-// and deletes them from the volume.
-func (v *S3Volume) EmptyTrash() {
- var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
-
- // Define "ready to delete" as "...when EmptyTrash started".
- startT := time.Now()
-
- emptyOneKey := func(trash *s3.Key) {
- loc := trash.Key[6:]
- if !v.isKeepBlock(loc) {
- return
- }
- atomic.AddInt64(&bytesInTrash, trash.Size)
- atomic.AddInt64(&blocksInTrash, 1)
-
- trashT, err := time.Parse(time.RFC3339, trash.LastModified)
- if err != nil {
- log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
- return
- }
- recent, err := v.bucket.Head("recent/"+loc, nil)
- if err != nil && os.IsNotExist(v.translateError(err)) {
- log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
- err = v.Untrash(loc)
- if err != nil {
- log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
- }
- return
- } else if err != nil {
- log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
- return
- }
- recentT, err := v.lastModified(recent)
- if err != nil {
- log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
- return
- }
- if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
- if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
- // recent/loc is too old to protect
- // loc from being Trashed again during
- // the raceWindow that starts if we
- // delete trash/X now.
- //
- // Note this means (TrashCheckInterval
- // < BlobSignatureTTL - raceWindow) is
- // necessary to avoid starvation.
- log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
- v.fixRace(loc)
- v.Touch(loc)
- return
- }
- _, err := v.bucket.Head(loc, nil)
- if os.IsNotExist(err) {
- log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
- v.fixRace(loc)
- return
- } else if err != nil {
- log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
- return
- }
- }
- if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
- return
- }
- err = v.bucket.Del(trash.Key)
- if err != nil {
- log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
- return
- }
- atomic.AddInt64(&bytesDeleted, trash.Size)
- atomic.AddInt64(&blocksDeleted, 1)
-
- _, err = v.bucket.Head(loc, nil)
- if err == nil {
- log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
- return
- }
- if !os.IsNotExist(v.translateError(err)) {
- log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
- return
- }
- err = v.bucket.Del("recent/" + loc)
- if err != nil {
- log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
- }
- }
-
- var wg sync.WaitGroup
- todo := make(chan *s3.Key, theConfig.EmptyTrashWorkers)
- for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
- wg.Add(1)
- go func() {
- defer wg.Done()
- for key := range todo {
- emptyOneKey(key)
- }
- }()
- }
-
- trashL := s3Lister{
- Bucket: v.bucket.Bucket,
- Prefix: "trash/",
- PageSize: v.IndexPageSize,
- Stats: &v.bucket.stats,
- }
- for trash := trashL.First(); trash != nil; trash = trashL.Next() {
- todo <- trash
- }
- close(todo)
- wg.Wait()
-
- if err := trashL.Error(); err != nil {
- log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
- }
- log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
-}
-
-type s3Lister struct {
- Bucket *s3.Bucket
- Prefix string
- PageSize int
- Stats *s3bucketStats
- nextMarker string
- buf []s3.Key
- err error
-}
-
-// First fetches the first page and returns the first item. It returns
-// nil if the response is the empty set or an error occurs.
-func (lister *s3Lister) First() *s3.Key {
- lister.getPage()
- return lister.pop()
-}
-
-// Next returns the next item, fetching the next page if necessary. It
-// returns nil if the last available item has already been fetched, or
-// an error occurs.
-func (lister *s3Lister) Next() *s3.Key {
- if len(lister.buf) == 0 && lister.nextMarker != "" {
- lister.getPage()