+
+// EmptyTrash looks for trashed blocks that exceeded TrashLifetime
+// and deletes them from the volume.
+//
+// For each trash/X object it HEADs the matching recent/X marker to
+// detect races with concurrent writes before deleting trash/X. After
+// a successful delete it also removes recent/X if the data object X
+// no longer exists. Errors are logged rather than returned, and a
+// summary line with byte/block counts is logged at the end.
+func (v *S3Volume) EmptyTrash() {
+	var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
+
+	// List every trash/X object, then look up each recent/X marker
+	// individually with HEAD.
+	trashL := s3Lister{
+		Bucket:   v.bucket.Bucket,
+		Prefix:   "trash/",
+		PageSize: v.IndexPageSize,
+	}
+	signatureTTL := theConfig.BlobSignatureTTL.Duration()
+	trashLifetime := theConfig.TrashLifetime.Duration()
+	// Define "ready to delete" as "...when EmptyTrash started".
+	startT := time.Now()
+	for trash := trashL.First(); trash != nil; trash = trashL.Next() {
+		// The lister only yields keys that start with trashL.Prefix.
+		loc := strings.TrimPrefix(trash.Key, trashL.Prefix)
+		if !v.isKeepBlock(loc) {
+			continue
+		}
+		bytesInTrash += trash.Size
+		blocksInTrash++
+
+		trashT, err := time.Parse(time.RFC3339, trash.LastModified)
+		if err != nil {
+			log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
+			continue
+		}
+		recent, err := v.bucket.Head("recent/"+loc, nil)
+		if err != nil && os.IsNotExist(v.translateError(err)) {
+			log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
+			err = v.Untrash(loc)
+			if err != nil {
+				log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
+			}
+			continue
+		} else if err != nil {
+			log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
+			continue
+		}
+		recentT, err := v.lastModified(recent)
+		if err != nil {
+			log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
+			continue
+		}
+		if trashT.Sub(recentT) < signatureTTL {
+			// recent/X was modified less than one signature TTL
+			// before trash/X was written, so a write may have
+			// raced with Trash.
+			if age := startT.Sub(recentT); age >= signatureTTL-time.Duration(v.RaceWindow) {
+				// recent/loc is too old to protect
+				// loc from being Trashed again during
+				// the raceWindow that starts if we
+				// delete trash/X now.
+				//
+				// Note this means (TrashCheckInterval
+				// < BlobSignatureTTL - raceWindow) is
+				// necessary to avoid starvation.
+				log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
+				v.fixRace(loc)
+				v.Touch(loc)
+				continue
+			}
+			// S3 errors must be translated before os.IsNotExist
+			// can recognize a missing object.
+			_, err = v.bucket.Head(loc, nil)
+			if os.IsNotExist(v.translateError(err)) {
+				log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
+				v.fixRace(loc)
+				continue
+			} else if err != nil {
+				log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+				continue
+			}
+		}
+		if startT.Sub(trashT) < trashLifetime {
+			continue
+		}
+		err = v.bucket.Del(trash.Key)
+		if err != nil {
+			log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
+			continue
+		}
+		bytesDeleted += trash.Size
+		blocksDeleted++
+
+		// With trash/X gone, recent/X is only worth keeping if the
+		// data object X still exists.
+		_, err = v.bucket.Head(loc, nil)
+		if os.IsNotExist(v.translateError(err)) {
+			err = v.bucket.Del("recent/" + loc)
+			if err != nil {
+				log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
+			}
+		} else if err != nil {
+			log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+		}
+	}
+	if err := trashL.Error(); err != nil {
+		log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
+	}
+	log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+}
+
+// s3Lister iterates over the keys in Bucket that start with Prefix,
+// fetching them from S3 one page (up to PageSize keys) at a time.
+type s3Lister struct {
+	Bucket     *s3.Bucket
+	Prefix     string
+	PageSize   int
+	nextMarker string
+	buf        []s3.Key
+	err        error
+}
+
+// First resets the lister, fetches the first page, and returns the
+// first item. It returns nil if the listing is empty or an error
+// occurs (see Error).
+func (lister *s3Lister) First() *s3.Key {
+	lister.nextMarker = ""
+	lister.err = nil
+	lister.buf = nil
+	lister.getPage()
+	return lister.Next()
+}
+
+// Next returns the next item, fetching further pages as needed. It
+// returns nil once the last available item has been returned, or
+// when an error occurs (see Error).
+func (lister *s3Lister) Next() *s3.Key {
+	// Loop rather than fetching once: a truncated page can end up
+	// empty after getPage drops keys that lack Prefix.
+	for len(lister.buf) == 0 && lister.nextMarker != "" {
+		lister.getPage()
+	}
+	return lister.pop()
+}
+
+// Error returns the most recent error encountered by First or Next.
+func (lister *s3Lister) Error() error {
+	return lister.err
+}
+
+// getPage fetches the page of keys that follows nextMarker into buf,
+// and sets nextMarker for the following page ("" when there is none
+// or an error occurred).
+func (lister *s3Lister) getPage() {
+	marker := lister.nextMarker
+	resp, err := lister.Bucket.List(lister.Prefix, "", marker, lister.PageSize)
+	lister.nextMarker = ""
+	if err != nil {
+		lister.err = err
+		return
+	}
+	if resp.IsTruncated {
+		// S3 only returns NextMarker when a delimiter is
+		// given. Without one, the next page starts after the
+		// last key of this page.
+		next := resp.NextMarker
+		if next == "" && len(resp.Contents) > 0 {
+			next = resp.Contents[len(resp.Contents)-1].Key
+		}
+		// Refuse a marker that doesn't advance; otherwise Next
+		// could request the same page forever.
+		if next != marker {
+			lister.nextMarker = next
+		}
+	}
+	lister.buf = make([]s3.Key, 0, len(resp.Contents))
+	for _, key := range resp.Contents {
+		if !strings.HasPrefix(key.Key, lister.Prefix) {
+			log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
+			continue
+		}
+		lister.buf = append(lister.buf, key)
+	}
+}
+
+// pop removes and returns the first buffered key, or nil if the buffer
+// is empty. The returned pointer refers to an element of buf.
+func (lister *s3Lister) pop() (k *s3.Key) {
+	if len(lister.buf) > 0 {
+		k = &lister.buf[0]
+		lister.buf = lister.buf[1:]
+	}
+	return
+}
+
+// s3bucket wraps s3.Bucket and counts I/O and API usage stats.
+type s3bucket struct {
+	*s3.Bucket
+	stats s3bucketStats
+}
+
+// GetReader opens path for reading. Bytes read through the returned
+// reader are reported to stats.TickInBytes. On error the reader is nil.
+func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
+	rdr, err := b.Bucket.GetReader(path)
+	b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
+	b.stats.TickErr(err)
+	if err != nil {
+		// Don't hand callers a counting wrapper around a nil reader.
+		return nil, err
+	}
+	return NewCountingReader(rdr, b.stats.TickInBytes), nil
+}
+
+// Head issues a HEAD request for path and counts it as a HeadOp.
+func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
+	resp, err := b.Bucket.Head(path, headers)
+	b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
+	b.stats.TickErr(err)
+	return resp, err
+}
+
+// PutReader uploads length bytes from r to path, reporting the bytes
+// sent to stats.TickOutBytes and counting a PutOp.
+func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
+	err := b.Bucket.PutReader(path, NewCountingReader(r, b.stats.TickOutBytes), length, contType, perm, options)
+	b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
+	b.stats.TickErr(err)
+	return err
+}
+
+// Put uploads data to path. It delegates to PutReader, so the upload
+// is counted exactly like any other PutReader call.
+func (b *s3bucket) Put(path string, data []byte, contType string, perm s3.ACL, options s3.Options) error {
+	return b.PutReader(path, bytes.NewReader(data), int64(len(data)), contType, perm, options)
+}
+
+// Del deletes path and counts a DelOp.
+func (b *s3bucket) Del(path string) error {
+	err := b.Bucket.Del(path)
+	b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
+	b.stats.TickErr(err)
+	return err
+}
+
+// s3bucketStats holds per-operation counters for an s3bucket, on top
+// of whatever the embedded statsTicker tracks.
+type s3bucketStats struct {
+	statsTicker
+	Ops     uint64 // every counted API call
+	GetOps  uint64
+	PutOps  uint64
+	HeadOps uint64
+	DelOps  uint64
+	ListOps uint64
+}
+
+// TickErr forwards a non-nil err to the embedded statsTicker, labelled
+// with its Go type. For *s3.Error the label also carries the HTTP status
+// code and S3 error code. A nil err is ignored.
+func (s *s3bucketStats) TickErr(err error) {
+	if err != nil {
+		label := fmt.Sprintf("%T", err)
+		if s3err, isS3 := err.(*s3.Error); isS3 {
+			label += fmt.Sprintf(" %d %s", s3err.StatusCode, s3err.Code)
+		}
+		s.statsTicker.TickErr(err, label)
+	}
+}