14360: Merge branch 'master'
[arvados.git] / services / keepstore / s3_volume.go
index 61e69f9096503eb88cf9328af5e85f129dba4226..fb978fe2ba41fbdf9895c0c718d2ca6c925d5f9c 100644 (file)
@@ -7,6 +7,7 @@ package main
 import (
        "bytes"
        "context"
+       "crypto/sha256"
        "encoding/base64"
        "encoding/hex"
        "flag"
@@ -18,12 +19,12 @@ import (
        "regexp"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/AdRoll/goamz/aws"
        "github.com/AdRoll/goamz/s3"
-       log "github.com/Sirupsen/logrus"
 )
 
 const (
@@ -153,6 +154,7 @@ type S3Volume struct {
        RaceWindow         arvados.Duration
        ReadOnly           bool
        UnsafeDelete       bool
+       StorageClasses     []string
 
        bucket *s3bucket
 
@@ -234,6 +236,10 @@ func (v *S3Volume) Start() error {
        }
 
        client := s3.New(auth, region)
+       if region.EC2Endpoint.Signer == aws.V4Signature {
+               // Currently affects only eu-central-1
+               client.Signature = aws.V4Signature
+       }
        client.ConnectTimeout = time.Duration(v.ConnectTimeout)
        client.ReadTimeout = time.Duration(v.ReadTimeout)
        v.bucket = &s3bucket{
@@ -397,6 +403,14 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
                        return err
                }
                opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
+               // In AWS regions that use V4 signatures, we need to
+               // provide ContentSHA256 up front. Otherwise, the S3
+               // library reads the request body (from our buffer)
+               // into another new buffer in order to compute the
+               // SHA256 before sending the request -- which would
+               // mean consuming 128 MiB of memory for the duration
+               // of a 64 MiB write.
+               opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
        }
 
        // Send the block data through a pipe, so that (if we need to)
@@ -428,7 +442,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
        case <-ctx.Done():
                theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
                // Our pipe might be stuck in Write(), waiting for
-               // io.Copy() to read. If so, un-stick it. This means
+               // PutReader() to read. If so, un-stick it. This means
                // PutReader will get corrupt data, but that's OK: the
                // size and MD5 won't match, so the write will fail.
                go io.Copy(ioutil.Discard, bufr)
@@ -437,6 +451,8 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
                theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
                return ctx.Err()
        case <-ready:
+               // Unblock pipe in case PutReader did not consume it.
+               io.Copy(ioutil.Discard, bufr)
                return v.translateError(err)
        }
 }
@@ -494,16 +510,15 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
                Bucket:   v.bucket.Bucket,
                Prefix:   prefix,
                PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
        }
        recentL := s3Lister{
                Bucket:   v.bucket.Bucket,
                Prefix:   "recent/" + prefix,
                PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
        }
-       v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
-       v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
-       for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
-               v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+       for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
                if data.Key >= "g" {
                        // Conveniently, "recent/*" and "trash/*" are
                        // lexically greater than all hex-encoded data
@@ -522,15 +537,13 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
                stamp := data
 
                // Advance to the corresponding recent/X marker, if any
-               for recent != nil {
+               for recent != nil && recentL.Error() == nil {
                        if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
                                recent = recentL.Next()
-                               v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
                                continue
                        } else if cmp == 0 {
                                stamp = recent
                                recent = recentL.Next()
-                               v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
                                break
                        } else {
                                // recent/X marker is missing: we'll
@@ -539,13 +552,16 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
                                break
                        }
                }
+               if err := recentL.Error(); err != nil {
+                       return err
+               }
                t, err := time.Parse(time.RFC3339, stamp.LastModified)
                if err != nil {
                        return err
                }
                fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
        }
-       return nil
+       return dataL.Error()
 }
 
 // Trash a Keep block.
@@ -617,8 +633,10 @@ func (v *S3Volume) safeCopy(dst, src string) error {
                MetadataDirective: "REPLACE",
        }, v.bucket.Name+"/"+src)
        err = v.translateError(err)
-       if err != nil {
+       if os.IsNotExist(err) {
                return err
+       } else if err != nil {
+               return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Name+"/"+src, err)
        }
        if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
                return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
@@ -687,6 +705,11 @@ func (v *S3Volume) Replication() int {
        return v.S3Replication
 }
 
+// GetStorageClasses implements Volume
+func (v *S3Volume) GetStorageClasses() []string {
+       return v.StorageClasses
+}
+
 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
 func (v *S3Volume) isKeepBlock(s string) bool {
@@ -759,26 +782,21 @@ func (v *S3Volume) translateError(err error) error {
 func (v *S3Volume) EmptyTrash() {
        var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
 
-       // Use a merge sort to find matching sets of trash/X and recent/X.
-       trashL := s3Lister{
-               Bucket:   v.bucket.Bucket,
-               Prefix:   "trash/",
-               PageSize: v.IndexPageSize,
-       }
        // Define "ready to delete" as "...when EmptyTrash started".
        startT := time.Now()
-       for trash := trashL.First(); trash != nil; trash = trashL.Next() {
+
+       emptyOneKey := func(trash *s3.Key) {
                loc := trash.Key[6:]
                if !v.isKeepBlock(loc) {
-                       continue
+                       return
                }
-               bytesInTrash += trash.Size
-               blocksInTrash++
+               atomic.AddInt64(&bytesInTrash, trash.Size)
+               atomic.AddInt64(&blocksInTrash, 1)
 
                trashT, err := time.Parse(time.RFC3339, trash.LastModified)
                if err != nil {
                        log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
-                       continue
+                       return
                }
                recent, err := v.bucket.Head("recent/"+loc, nil)
                if err != nil && os.IsNotExist(v.translateError(err)) {
@@ -787,15 +805,15 @@ func (v *S3Volume) EmptyTrash() {
                        if err != nil {
                                log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
                        }
-                       continue
+                       return
                } else if err != nil {
                        log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
-                       continue
+                       return
                }
                recentT, err := v.lastModified(recent)
                if err != nil {
                        log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
-                       continue
+                       return
                }
                if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
                        if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
@@ -810,39 +828,68 @@ func (v *S3Volume) EmptyTrash() {
                                log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
                                v.fixRace(loc)
                                v.Touch(loc)
-                               continue
+                               return
                        }
                        _, err := v.bucket.Head(loc, nil)
                        if os.IsNotExist(err) {
                                log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
                                v.fixRace(loc)
-                               continue
+                               return
                        } else if err != nil {
                                log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
-                               continue
+                               return
                        }
                }
                if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
-                       continue
+                       return
                }
                err = v.bucket.Del(trash.Key)
                if err != nil {
                        log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
-                       continue
+                       return
                }
-               bytesDeleted += trash.Size
-               blocksDeleted++
+               atomic.AddInt64(&bytesDeleted, trash.Size)
+               atomic.AddInt64(&blocksDeleted, 1)
 
                _, err = v.bucket.Head(loc, nil)
-               if os.IsNotExist(err) {
-                       err = v.bucket.Del("recent/" + loc)
-                       if err != nil {
-                               log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
-                       }
-               } else if err != nil {
-                       log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
+               if err == nil {
+                       log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
+                       return
+               }
+               if !os.IsNotExist(v.translateError(err)) {
+                       log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+                       return
+               }
+               err = v.bucket.Del("recent/" + loc)
+               if err != nil {
+                       log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
                }
        }
+
+       var wg sync.WaitGroup
+       todo := make(chan *s3.Key, theConfig.EmptyTrashWorkers)
+       for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for key := range todo {
+                               emptyOneKey(key)
+                       }
+               }()
+       }
+
+       trashL := s3Lister{
+               Bucket:   v.bucket.Bucket,
+               Prefix:   "trash/",
+               PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
+       }
+       for trash := trashL.First(); trash != nil; trash = trashL.Next() {
+               todo <- trash
+       }
+       close(todo)
+       wg.Wait()
+
        if err := trashL.Error(); err != nil {
                log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
        }
@@ -853,6 +900,7 @@ type s3Lister struct {
        Bucket     *s3.Bucket
        Prefix     string
        PageSize   int
+       Stats      *s3bucketStats
        nextMarker string
        buf        []s3.Key
        err        error
@@ -881,6 +929,7 @@ func (lister *s3Lister) Error() error {
 }
 
 func (lister *s3Lister) getPage() {
+       lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
        resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
        lister.nextMarker = ""
        if err != nil {