8555: Fix up comments.
[arvados.git] / services / keepstore / s3_volume.go
index be4da4d8a4e2d7b3624e6dc71a3b9306f26b9934..1f62f4a173d11bae05d37b3e7994d00877a625b7 100644 (file)
@@ -18,8 +18,8 @@ import (
 )
 
 var (
-       // Returned by Trash if that operation is impossible with the
-       // current config.
+       // ErrS3TrashDisabled is returned by Trash if that operation
+       // is impossible with the current config.
        ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
 
        s3AccessKeyFile string
@@ -551,6 +551,8 @@ func (v *S3Volume) translateError(err error) error {
 // EmptyTrash looks for trashed blocks that exceeded trashLifetime
 // and deletes them from the volume.
 func (v *S3Volume) EmptyTrash() {
+       var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
+
        // Use a merge sort to find matching sets of trash/X and recent/X.
        trashL := s3Lister{
                Bucket:   v.Bucket,
@@ -564,14 +566,24 @@ func (v *S3Volume) EmptyTrash() {
                if !v.isKeepBlock(loc) {
                        continue
                }
+               bytesInTrash += trash.Size
+               blocksInTrash++
+
                trashT, err := time.Parse(time.RFC3339, trash.LastModified)
                if err != nil {
                        log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
                        continue
                }
                recent, err := v.Bucket.Head("recent/"+loc, nil)
-               if err != nil {
-                       log.Printf("warning: %s: EmptyTrash: cannot delete trash %q with no corresponding recent/* marker", v, trash.Key)
+               if err != nil && os.IsNotExist(v.translateError(err)) {
+                       log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
+                       err = v.Untrash(loc)
+                       if err != nil {
+                               log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
+                       }
+                       continue
+               } else if err != nil {
+                       log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
                        continue
                }
                recentT, err := v.lastModified(recent)
@@ -580,7 +592,7 @@ func (v *S3Volume) EmptyTrash() {
                        continue
                }
                if trashT.Sub(recentT) < blobSignatureTTL {
-                       if age := startT.Sub(recentT); age >= blobSignatureTTL - v.raceWindow {
+                       if age := startT.Sub(recentT); age >= blobSignatureTTL-v.raceWindow {
                                // recent/loc is too old to protect
                                // loc from being Trashed again during
                                // the raceWindow that starts if we
@@ -610,6 +622,9 @@ func (v *S3Volume) EmptyTrash() {
                        log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
                        continue
                }
+               bytesDeleted += trash.Size
+               blocksDeleted++
+
                _, err = v.Bucket.Head(loc, nil)
                if os.IsNotExist(err) {
                        err = v.Bucket.Del("recent/" + loc)
@@ -623,6 +638,7 @@ func (v *S3Volume) EmptyTrash() {
        if err := trashL.Error(); err != nil {
                log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
        }
+       log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
 
 type s3Lister struct {