8555: Log statistics in EmptyTrash.
[arvados.git] / services / keepstore / s3_volume.go
index 5eb99414a0eddea7edbd8bcae738575a424ea66c..0cfaaaf3b4a079c60afe4c0e96db12a4b03371ce 100644 (file)
@@ -18,6 +18,8 @@ import (
 )
 
 var (
+       // Returned by Trash if that operation is impossible with the
+       // current config.
        ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
 
        s3AccessKeyFile string
@@ -134,6 +136,7 @@ func init() {
                "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
 }
 
+// S3Volume implements Volume using an S3 bucket.
 type S3Volume struct {
        *s3.Bucket
        raceWindow    time.Duration
@@ -158,6 +161,8 @@ func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, raceWindow tim
        }
 }
 
+// Check returns an error if the volume is inaccessible (e.g., config
+// error).
 func (v *S3Volume) Check() error {
        return nil
 }
@@ -192,6 +197,8 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
        return
 }
 
+// Get a block: copy the block data into buf, and return the number of
+// bytes copied.
 func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
        rdr, err := v.getReader(loc)
        if err != nil {
@@ -207,6 +214,7 @@ func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
        }
 }
 
+// Compare the given data with the stored data.
 func (v *S3Volume) Compare(loc string, expect []byte) error {
        rdr, err := v.getReader(loc)
        if err != nil {
@@ -216,6 +224,7 @@ func (v *S3Volume) Compare(loc string, expect []byte) error {
        return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
 }
 
+// Put writes a block.
 func (v *S3Volume) Put(loc string, block []byte) error {
        if v.readonly {
                return MethodDisabledError
@@ -236,6 +245,7 @@ func (v *S3Volume) Put(loc string, block []byte) error {
        return v.translateError(err)
 }
 
+// Touch sets the timestamp for the given locator to the current time.
 func (v *S3Volume) Touch(loc string) error {
        if v.readonly {
                return MethodDisabledError
@@ -252,6 +262,7 @@ func (v *S3Volume) Touch(loc string) error {
        return v.translateError(err)
 }
 
+// Mtime returns the stored timestamp for the given locator.
 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
        _, err := v.Bucket.Head(loc, nil)
        if err != nil {
@@ -279,6 +290,8 @@ func (v *S3Volume) Mtime(loc string) (time.Time, error) {
        return v.lastModified(resp)
 }
 
+// IndexTo writes a complete list of locators with the given prefix
+// for which Get() can retrieve data.
 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
        // Use a merge sort to find matching sets of X and recent/X.
        dataL := s3Lister{
@@ -288,7 +301,7 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
        }
        recentL := s3Lister{
                Bucket:   v.Bucket,
-               Prefix:   "recent/"+prefix,
+               Prefix:   "recent/" + prefix,
                PageSize: v.indexPageSize,
        }
        for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
@@ -393,6 +406,10 @@ func (v *S3Volume) checkRaceWindow(loc string) error {
        return nil
 }
 
+// safeCopy calls PutCopy, and checks the response to make sure the
+// copy succeeded and updated the timestamp on the destination object
+// (PutCopy returns 200 OK if the request was received, even if the
+// copy failed).
 func (v *S3Volume) safeCopy(dst, src string) error {
        resp, err := v.Bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
                ContentType:       "application/octet-stream",
@@ -426,6 +443,7 @@ func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
        return
 }
 
+// Untrash moves block from trash back into store
 func (v *S3Volume) Untrash(loc string) error {
        err := v.safeCopy(loc, "trash/"+loc)
        if err != nil {
@@ -435,6 +453,9 @@ func (v *S3Volume) Untrash(loc string) error {
        return v.translateError(err)
 }
 
+// Status returns a *VolumeStatus representing the current in-use
+// storage capacity and a fake available capacity that doesn't make
+// the volume seem full or nearly-full.
 func (v *S3Volume) Status() *VolumeStatus {
        return &VolumeStatus{
                DeviceNum: 1,
@@ -443,13 +464,19 @@ func (v *S3Volume) Status() *VolumeStatus {
        }
 }
 
+// String implements fmt.Stringer.
 func (v *S3Volume) String() string {
        return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
 }
 
+// Writable returns false if all future Put, Mtime, and Delete calls
+// are expected to fail.
 func (v *S3Volume) Writable() bool {
        return !v.readonly
 }
+
+// Replication returns the storage redundancy of the underlying
+// device. Configured via command line flag.
 func (v *S3Volume) Replication() int {
        return v.replication
 }
@@ -524,6 +551,8 @@ func (v *S3Volume) translateError(err error) error {
 // EmptyTrash looks for trashed blocks that exceeded trashLifetime
 // and deletes them from the volume.
 func (v *S3Volume) EmptyTrash() {
+       var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
+
        // Use a merge sort to find matching sets of trash/X and recent/X.
        trashL := s3Lister{
                Bucket:   v.Bucket,
@@ -531,20 +560,23 @@ func (v *S3Volume) EmptyTrash() {
                PageSize: v.indexPageSize,
        }
        // Define "ready to delete" as "...when EmptyTrash started".
-       now := time.Now()
+       startT := time.Now()
        for trash := trashL.First(); trash != nil; trash = trashL.Next() {
                loc := trash.Key[6:]
                if !v.isKeepBlock(loc) {
                        continue
                }
-               recent, err := v.Bucket.Head("recent/"+loc, nil)
+               bytesInTrash += trash.Size
+               blocksInTrash++
+
+               trashT, err := time.Parse(time.RFC3339, trash.LastModified)
                if err != nil {
-                       log.Printf("warning: %s: EmptyTrash: cannot delete trash %q with no corresponding recent/* marker", v, trash.Key)
+                       log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
                        continue
                }
-               trashT, err := time.Parse(time.RFC3339, trash.LastModified)
+               recent, err := v.Bucket.Head("recent/"+loc, nil)
                if err != nil {
-                       log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
+                       log.Printf("warning: %s: EmptyTrash: cannot delete trash %q with no corresponding recent/* marker", v, trash.Key)
                        continue
                }
                recentT, err := v.lastModified(recent)
@@ -553,10 +585,29 @@ func (v *S3Volume) EmptyTrash() {
                        continue
                }
                if trashT.Sub(recentT) < blobSignatureTTL {
-                       v.fixRace(loc)
-                       continue
+                       if age := startT.Sub(recentT); age >= blobSignatureTTL - v.raceWindow {
+                               // recent/loc is too old to protect
+                               // loc from being Trashed again during
+                               // the raceWindow that starts if we
+                               // delete trash/X now.
+                               //
+                               // Note this means (trashCheckInterval
+                               // < blobSignatureTTL - raceWindow) is
+                               // necessary to avoid starvation.
+                               log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
+                               v.fixRace(loc)
+                               v.Touch(loc)
+                               continue
+                       } else if _, err := v.Bucket.Head(loc, nil); os.IsNotExist(err) {
+                               log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
+                               v.fixRace(loc)
+                               continue
+                       } else if err != nil {
+                               log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+                               continue
+                       }
                }
-               if now.Sub(trashT) < trashLifetime {
+               if startT.Sub(trashT) < trashLifetime {
                        continue
                }
                err = v.Bucket.Del(trash.Key)
@@ -564,9 +615,12 @@ func (v *S3Volume) EmptyTrash() {
                        log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
                        continue
                }
+               bytesDeleted += trash.Size
+               blocksDeleted++
+
                _, err = v.Bucket.Head(loc, nil)
                if os.IsNotExist(err) {
-                       err = v.Bucket.Del("recent/"+loc)
+                       err = v.Bucket.Del("recent/" + loc)
                        if err != nil {
                                log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
                        }
@@ -577,15 +631,16 @@ func (v *S3Volume) EmptyTrash() {
        if err := trashL.Error(); err != nil {
                log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
        }
+       log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
 
 type s3Lister struct {
-       Bucket      *s3.Bucket
-       Prefix      string
-       PageSize    int
-       nextMarker  string
-       buf         []s3.Key
-       err         error
+       Bucket     *s3.Bucket
+       Prefix     string
+       PageSize   int
+       nextMarker string
+       buf        []s3.Key
+       err        error
 }
 
 // First fetches the first page and returns the first item. It returns