+
+// EmptyTrash looks for trashed blocks that exceeded trashLifetime
+// and deletes them from the volume.
+//
+// For every trash/X key whose X is a keep block locator:
+//   - entries whose trash/X timestamp cannot be parsed, whose recent/X
+//     marker cannot be found or read, are logged and skipped;
+//   - if trash/X was written less than blobSignatureTTL after recent/X,
+//     the entry is treated as a possible Trash/Put race: when recent/X
+//     is already too old (age >= blobSignatureTTL-v.raceWindow) fixRace
+//     and Touch are called; otherwise, if X itself is missing, fixRace
+//     is called. Either way the entry is skipped this round;
+//   - once trash/X is at least trashLifetime old (measured from when
+//     EmptyTrash started) it is deleted, and recent/X is deleted too if
+//     X no longer exists.
+//
+// Errors are logged and never returned. A summary of deleted and
+// remaining trash is logged at the end.
+func (v *S3Volume) EmptyTrash() {
+	var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
+
+	// Walk every trash/* key; the matching recent/* marker for each one
+	// is looked up individually with HEAD below.
+	trashL := s3Lister{
+		Bucket:   v.Bucket,
+		Prefix:   "trash/",
+		PageSize: v.indexPageSize,
+	}
+	// Define "ready to delete" as "...when EmptyTrash started".
+	startT := time.Now()
+	for trash := trashL.First(); trash != nil; trash = trashL.Next() {
+		// The lister only yields keys that carry its prefix.
+		loc := strings.TrimPrefix(trash.Key, trashL.Prefix)
+		if !v.isKeepBlock(loc) {
+			continue
+		}
+		bytesInTrash += trash.Size
+		blocksInTrash++
+
+		trashT, err := time.Parse(time.RFC3339, trash.LastModified)
+		if err != nil {
+			log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
+			continue
+		}
+		// NOTE(review): like the other HEAD checks in this function, this
+		// relies on v.Bucket.Head returning an error recognized by
+		// os.IsNotExist when the key is missing — confirm.
+		recent, err := v.Bucket.Head("recent/"+loc, nil)
+		if err != nil {
+			if os.IsNotExist(err) {
+				log.Printf("warning: %s: EmptyTrash: cannot delete trash %q with no corresponding recent/* marker", v, trash.Key)
+			} else {
+				log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
+			}
+			continue
+		}
+		recentT, err := v.lastModified(recent)
+		if err != nil {
+			log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
+			continue
+		}
+		if trashT.Sub(recentT) < blobSignatureTTL {
+			if age := startT.Sub(recentT); age >= blobSignatureTTL-v.raceWindow {
+				// recent/loc is too old to protect
+				// loc from being Trashed again during
+				// the raceWindow that starts if we
+				// delete trash/X now.
+				//
+				// Note this means (trashCheckInterval
+				// < blobSignatureTTL - raceWindow) is
+				// necessary to avoid starvation.
+				log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
+				v.fixRace(loc)
+				v.Touch(loc)
+				continue
+			} else if _, err := v.Bucket.Head(loc, nil); os.IsNotExist(err) {
+				log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
+				v.fixRace(loc)
+				continue
+			} else if err != nil {
+				log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+				continue
+			}
+		}
+		if startT.Sub(trashT) < trashLifetime {
+			continue
+		}
+		err = v.Bucket.Del(trash.Key)
+		if err != nil {
+			log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
+			continue
+		}
+		bytesDeleted += trash.Size
+		blocksDeleted++
+
+		// Remove the recent/X marker as well, unless X itself still exists.
+		_, err = v.Bucket.Head(loc, nil)
+		if os.IsNotExist(err) {
+			err = v.Bucket.Del("recent/" + loc)
+			if err != nil {
+				log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
+			}
+		} else if err != nil {
+			// The HEAD above was on loc, so report loc (not recent/loc).
+			log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+		}
+	}
+	if err := trashL.Error(); err != nil {
+		log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
+	}
+	log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+}
+
+// s3Lister iterates over the keys in Bucket whose names start with
+// Prefix, requesting up to PageSize keys per List call. Call First to
+// start, Next to advance, and Error after iteration to check whether a
+// List call failed.
+type s3Lister struct {
+ Bucket *s3.Bucket
+ Prefix string
+ PageSize int
+ nextMarker string // marker passed to the next List call; "" when there is no further page to fetch
+ buf []s3.Key // keys from the current page not yet returned by pop
+ err error // most recent error returned by Bucket.List
+}
+
+// First restarts the listing from the beginning, fetches the first
+// page, and returns the first item. It returns nil if the listing is
+// empty or an error occurs; check Error to tell the two apart.
+func (lister *s3Lister) First() *s3.Key {
+	// Discard state left over from any previous iteration. Otherwise a
+	// repeated call would resume from nextMarker, hand back stale
+	// buffered keys, or report an old error.
+	lister.nextMarker = ""
+	lister.buf = nil
+	lister.err = nil
+	lister.getPage()
+	// Delegate to Next, which fetches further pages if this one
+	// yielded no usable keys but the listing is truncated.
+	return lister.Next()
+}
+
+// Next returns the next item, fetching more pages as needed. It returns
+// nil once the last available item has been returned, or if an error
+// occurs; check Error to tell the two apart.
+func (lister *s3Lister) Next() *s3.Key {
+	// Loop rather than fetch once: a page can contribute no keys (it
+	// may be empty, or getPage may filter out every key) while the
+	// listing is still truncated. getPage clears nextMarker on error
+	// and on the final page, so this terminates.
+	for len(lister.buf) == 0 && lister.nextMarker != "" {
+		lister.getPage()
+	}
+	return lister.pop()
+}
+
+// Error returns the most recent error encountered by First or Next
+// while fetching a page, or nil if none has been recorded.
+func (lister *s3Lister) Error() error {
+	return lister.err
+}
+
+// getPage fetches the page of keys that follows nextMarker and stores
+// them in buf, replacing its previous contents. Keys lacking Prefix
+// are logged and dropped.
+//
+// On a List error, it records the error in lister.err, leaves buf
+// untouched, and clears nextMarker so iteration stops.
+func (lister *s3Lister) getPage() {
+	resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
+	lister.nextMarker = ""
+	if err != nil {
+		lister.err = err
+		return
+	}
+	if resp.IsTruncated {
+		// S3 only includes NextMarker in the response when a delimiter
+		// is given, and this listing passes an empty one. For a
+		// truncated response without NextMarker, continue after the last
+		// key returned. Use the unfiltered key, so filtering cannot
+		// stall the listing.
+		if resp.NextMarker != "" {
+			lister.nextMarker = resp.NextMarker
+		} else if n := len(resp.Contents); n > 0 {
+			lister.nextMarker = resp.Contents[n-1].Key
+		}
+	}
+	lister.buf = make([]s3.Key, 0, len(resp.Contents))
+	for _, key := range resp.Contents {
+		if !strings.HasPrefix(key.Key, lister.Prefix) {
+			log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
+			continue
+		}
+		lister.buf = append(lister.buf, key)
+	}
+}
+
+// pop removes the first buffered key and returns a pointer to it, or
+// returns nil when nothing is buffered. The pointer refers to an
+// element of the current page's slice.
+func (lister *s3Lister) pop() *s3.Key {
+	if len(lister.buf) == 0 {
+		return nil
+	}
+	head := &lister.buf[0]
+	lister.buf = lister.buf[1:]
+	return head
+}