14360: Merge branch 'master'
[arvados.git] / services / keepstore / s3_volume.go
index 532a0823e2da2aefa5120a5fa2d5326e557bfa5e..fb978fe2ba41fbdf9895c0c718d2ca6c925d5f9c 100644 (file)
@@ -7,6 +7,7 @@ package main
 import (
        "bytes"
        "context"
+       "crypto/sha256"
        "encoding/base64"
        "encoding/hex"
        "flag"
@@ -235,6 +236,10 @@ func (v *S3Volume) Start() error {
        }
 
        client := s3.New(auth, region)
+       if region.EC2Endpoint.Signer == aws.V4Signature {
+               // Currently affects only eu-central-1
+               client.Signature = aws.V4Signature
+       }
        client.ConnectTimeout = time.Duration(v.ConnectTimeout)
        client.ReadTimeout = time.Duration(v.ReadTimeout)
        v.bucket = &s3bucket{
@@ -398,6 +403,14 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
                        return err
                }
                opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
+               // In AWS regions that use V4 signatures, we need to
+               // provide ContentSHA256 up front. Otherwise, the S3
+               // library reads the request body (from our buffer)
+               // into another new buffer in order to compute the
+               // SHA256 before sending the request -- which would
+               // mean consuming 128 MiB of memory for the duration
+               // of a 64 MiB write.
+               opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
        }
 
        // Send the block data through a pipe, so that (if we need to)
@@ -429,7 +442,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
        case <-ctx.Done():
                theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
                // Our pipe might be stuck in Write(), waiting for
-               // io.Copy() to read. If so, un-stick it. This means
+               // PutReader() to read. If so, un-stick it. This means
                // PutReader will get corrupt data, but that's OK: the
                // size and MD5 won't match, so the write will fail.
                go io.Copy(ioutil.Discard, bufr)
@@ -438,6 +451,8 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
                theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
                return ctx.Err()
        case <-ready:
+               // Unblock pipe in case PutReader did not consume it.
+               io.Copy(ioutil.Discard, bufr)
                return v.translateError(err)
        }
 }
@@ -495,16 +510,15 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
                Bucket:   v.bucket.Bucket,
                Prefix:   prefix,
                PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
        }
        recentL := s3Lister{
                Bucket:   v.bucket.Bucket,
                Prefix:   "recent/" + prefix,
                PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
        }
-       v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
-       v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
-       for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
-               v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+       for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
                if data.Key >= "g" {
                        // Conveniently, "recent/*" and "trash/*" are
                        // lexically greater than all hex-encoded data
@@ -523,15 +537,13 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
                stamp := data
 
                // Advance to the corresponding recent/X marker, if any
-               for recent != nil {
+               for recent != nil && recentL.Error() == nil {
                        if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
                                recent = recentL.Next()
-                               v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
                                continue
                        } else if cmp == 0 {
                                stamp = recent
                                recent = recentL.Next()
-                               v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
                                break
                        } else {
                                // recent/X marker is missing: we'll
@@ -540,13 +552,16 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
                                break
                        }
                }
+               if err := recentL.Error(); err != nil {
+                       return err
+               }
                t, err := time.Parse(time.RFC3339, stamp.LastModified)
                if err != nil {
                        return err
                }
                fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
        }
-       return nil
+       return dataL.Error()
 }
 
 // Trash a Keep block.
@@ -618,8 +633,10 @@ func (v *S3Volume) safeCopy(dst, src string) error {
                MetadataDirective: "REPLACE",
        }, v.bucket.Name+"/"+src)
        err = v.translateError(err)
-       if err != nil {
+       if os.IsNotExist(err) {
                return err
+       } else if err != nil {
+               return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Name+"/"+src, err)
        }
        if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
                return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
@@ -835,18 +852,22 @@ func (v *S3Volume) EmptyTrash() {
                atomic.AddInt64(&blocksDeleted, 1)
 
                _, err = v.bucket.Head(loc, nil)
-               if os.IsNotExist(err) {
-                       err = v.bucket.Del("recent/" + loc)
-                       if err != nil {
-                               log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
-                       }
-               } else if err != nil {
-                       log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
+               if err == nil {
+                       log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
+                       return
+               }
+               if !os.IsNotExist(v.translateError(err)) {
+                       log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+                       return
+               }
+               err = v.bucket.Del("recent/" + loc)
+               if err != nil {
+                       log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
                }
        }
 
        var wg sync.WaitGroup
-       todo := make(chan *s3.Key)
+       todo := make(chan *s3.Key, theConfig.EmptyTrashWorkers)
        for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
                wg.Add(1)
                go func() {
@@ -861,6 +882,7 @@ func (v *S3Volume) EmptyTrash() {
                Bucket:   v.bucket.Bucket,
                Prefix:   "trash/",
                PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
        }
        for trash := trashL.First(); trash != nil; trash = trashL.Next() {
                todo <- trash
@@ -878,6 +900,7 @@ type s3Lister struct {
        Bucket     *s3.Bucket
        Prefix     string
        PageSize   int
+       Stats      *s3bucketStats
        nextMarker string
        buf        []s3.Key
        err        error
@@ -906,6 +929,7 @@ func (lister *s3Lister) Error() error {
 }
 
 func (lister *s3Lister) getPage() {
+       lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
        resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
        lister.nextMarker = ""
        if err != nil {