Merge branch '15209-python-arv-deps-pinned'
[arvados.git] / services / keepstore / s3_volume.go
index a60b2fc27e321f553c9784691702282ecb39a6e4..4c39dcd5c4f12fc9a8b8ad36d165545af952fb7a 100644 (file)
@@ -7,6 +7,7 @@ package main
 import (
        "bytes"
        "context"
+       "crypto/sha256"
        "encoding/base64"
        "encoding/hex"
        "flag"
@@ -18,11 +19,13 @@ import (
        "regexp"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/AdRoll/goamz/aws"
        "github.com/AdRoll/goamz/s3"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 const (
@@ -196,7 +199,7 @@ func (*S3Volume) Type() string {
 
 // Start populates private fields and verifies the configuration is
 // valid.
-func (v *S3Volume) Start() error {
+func (v *S3Volume) Start(vm *volumeMetricsVecs) error {
        region, ok := aws.Regions[v.Region]
        if v.Endpoint == "" {
                if !ok {
@@ -234,6 +237,10 @@ func (v *S3Volume) Start() error {
        }
 
        client := s3.New(auth, region)
+       if region.EC2Endpoint.Signer == aws.V4Signature {
+               // Currently affects only eu-central-1
+               client.Signature = aws.V4Signature
+       }
        client.ConnectTimeout = time.Duration(v.ConnectTimeout)
        client.ReadTimeout = time.Duration(v.ReadTimeout)
        v.bucket = &s3bucket{
@@ -242,6 +249,10 @@ func (v *S3Volume) Start() error {
                        Name: v.Bucket,
                },
        }
+       // Set up prometheus metrics
+       lbls := prometheus.Labels{"device_id": v.DeviceID()}
+       v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = vm.getCounterVecsFor(lbls)
+
        return nil
 }
 
@@ -397,6 +408,14 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
                        return err
                }
                opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
+               // In AWS regions that use V4 signatures, we need to
+               // provide ContentSHA256 up front. Otherwise, the S3
+               // library reads the request body (from our buffer)
+               // into another new buffer in order to compute the
+               // SHA256 before sending the request -- which would
+               // mean consuming 128 MiB of memory for the duration
+               // of a 64 MiB write.
+               opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
        }
 
        // Send the block data through a pipe, so that (if we need to)
@@ -428,7 +447,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
        case <-ctx.Done():
                theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
                // Our pipe might be stuck in Write(), waiting for
-               // io.Copy() to read. If so, un-stick it. This means
+               // PutReader() to read. If so, un-stick it. This means
                // PutReader will get corrupt data, but that's OK: the
                // size and MD5 won't match, so the write will fail.
                go io.Copy(ioutil.Discard, bufr)
@@ -437,6 +456,8 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
                theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
                return ctx.Err()
        case <-ready:
+               // Unblock pipe in case PutReader did not consume it.
+               io.Copy(ioutil.Discard, bufr)
                return v.translateError(err)
        }
 }
@@ -494,16 +515,15 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
                Bucket:   v.bucket.Bucket,
                Prefix:   prefix,
                PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
        }
        recentL := s3Lister{
                Bucket:   v.bucket.Bucket,
                Prefix:   "recent/" + prefix,
                PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
        }
-       v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
-       v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
-       for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
-               v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+       for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
                if data.Key >= "g" {
                        // Conveniently, "recent/*" and "trash/*" are
                        // lexically greater than all hex-encoded data
@@ -522,15 +542,13 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
                stamp := data
 
                // Advance to the corresponding recent/X marker, if any
-               for recent != nil {
+               for recent != nil && recentL.Error() == nil {
                        if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
                                recent = recentL.Next()
-                               v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
                                continue
                        } else if cmp == 0 {
                                stamp = recent
                                recent = recentL.Next()
-                               v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
                                break
                        } else {
                                // recent/X marker is missing: we'll
@@ -539,13 +557,16 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
                                break
                        }
                }
+               if err := recentL.Error(); err != nil {
+                       return err
+               }
                t, err := time.Parse(time.RFC3339, stamp.LastModified)
                if err != nil {
                        return err
                }
                fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
        }
-       return nil
+       return dataL.Error()
 }
 
 // Trash a Keep block.
@@ -617,8 +638,10 @@ func (v *S3Volume) safeCopy(dst, src string) error {
                MetadataDirective: "REPLACE",
        }, v.bucket.Name+"/"+src)
        err = v.translateError(err)
-       if err != nil {
+       if os.IsNotExist(err) {
                return err
+       } else if err != nil {
+               return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Name+"/"+src, err)
        }
        if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
                return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
@@ -764,26 +787,21 @@ func (v *S3Volume) translateError(err error) error {
 func (v *S3Volume) EmptyTrash() {
        var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
 
-       // Use a merge sort to find matching sets of trash/X and recent/X.
-       trashL := s3Lister{
-               Bucket:   v.bucket.Bucket,
-               Prefix:   "trash/",
-               PageSize: v.IndexPageSize,
-       }
        // Define "ready to delete" as "...when EmptyTrash started".
        startT := time.Now()
-       for trash := trashL.First(); trash != nil; trash = trashL.Next() {
+
+       emptyOneKey := func(trash *s3.Key) {
                loc := trash.Key[6:]
                if !v.isKeepBlock(loc) {
-                       continue
+                       return
                }
-               bytesInTrash += trash.Size
-               blocksInTrash++
+               atomic.AddInt64(&bytesInTrash, trash.Size)
+               atomic.AddInt64(&blocksInTrash, 1)
 
                trashT, err := time.Parse(time.RFC3339, trash.LastModified)
                if err != nil {
                        log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
-                       continue
+                       return
                }
                recent, err := v.bucket.Head("recent/"+loc, nil)
                if err != nil && os.IsNotExist(v.translateError(err)) {
@@ -792,15 +810,15 @@ func (v *S3Volume) EmptyTrash() {
                        if err != nil {
                                log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
                        }
-                       continue
+                       return
                } else if err != nil {
                        log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
-                       continue
+                       return
                }
                recentT, err := v.lastModified(recent)
                if err != nil {
                        log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
-                       continue
+                       return
                }
                if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
                        if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
@@ -815,39 +833,68 @@ func (v *S3Volume) EmptyTrash() {
                                log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
                                v.fixRace(loc)
                                v.Touch(loc)
-                               continue
+                               return
                        }
                        _, err := v.bucket.Head(loc, nil)
                        if os.IsNotExist(err) {
                                log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
                                v.fixRace(loc)
-                               continue
+                               return
                        } else if err != nil {
                                log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
-                               continue
+                               return
                        }
                }
                if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
-                       continue
+                       return
                }
                err = v.bucket.Del(trash.Key)
                if err != nil {
                        log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
-                       continue
+                       return
                }
-               bytesDeleted += trash.Size
-               blocksDeleted++
+               atomic.AddInt64(&bytesDeleted, trash.Size)
+               atomic.AddInt64(&blocksDeleted, 1)
 
                _, err = v.bucket.Head(loc, nil)
-               if os.IsNotExist(err) {
-                       err = v.bucket.Del("recent/" + loc)
-                       if err != nil {
-                               log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
-                       }
-               } else if err != nil {
-                       log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
+               if err == nil {
+                       log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
+                       return
+               }
+               if !os.IsNotExist(v.translateError(err)) {
+                       log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+                       return
+               }
+               err = v.bucket.Del("recent/" + loc)
+               if err != nil {
+                       log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
                }
        }
+
+       var wg sync.WaitGroup
+       todo := make(chan *s3.Key, theConfig.EmptyTrashWorkers)
+       for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for key := range todo {
+                               emptyOneKey(key)
+                       }
+               }()
+       }
+
+       trashL := s3Lister{
+               Bucket:   v.bucket.Bucket,
+               Prefix:   "trash/",
+               PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
+       }
+       for trash := trashL.First(); trash != nil; trash = trashL.Next() {
+               todo <- trash
+       }
+       close(todo)
+       wg.Wait()
+
        if err := trashL.Error(); err != nil {
                log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
        }
@@ -858,6 +905,7 @@ type s3Lister struct {
        Bucket     *s3.Bucket
        Prefix     string
        PageSize   int
+       Stats      *s3bucketStats
        nextMarker string
        buf        []s3.Key
        err        error
@@ -886,6 +934,8 @@ func (lister *s3Lister) Error() error {
 }
 
 func (lister *s3Lister) getPage() {
+       lister.Stats.TickOps("list")
+       lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
        resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
        lister.nextMarker = ""
        if err != nil {
@@ -921,6 +971,7 @@ type s3bucket struct {
 
 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
        rdr, err := b.Bucket.GetReader(path)
+       b.stats.TickOps("get")
        b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
        b.stats.TickErr(err)
        return NewCountingReader(rdr, b.stats.TickInBytes), err
@@ -928,6 +979,7 @@ func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
 
 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
        resp, err := b.Bucket.Head(path, headers)
+       b.stats.TickOps("head")
        b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
        b.stats.TickErr(err)
        return resp, err
@@ -946,6 +998,7 @@ func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType st
                r = NewCountingReader(r, b.stats.TickOutBytes)
        }
        err := b.Bucket.PutReader(path, r, length, contType, perm, options)
+       b.stats.TickOps("put")
        b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
        b.stats.TickErr(err)
        return err
@@ -953,6 +1006,7 @@ func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType st
 
 func (b *s3bucket) Del(path string) error {
        err := b.Bucket.Del(path)
+       b.stats.TickOps("delete")
        b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
        b.stats.TickErr(err)
        return err