13937: Removes the 'any' operation type for a better way of testing driver labels
[arvados.git] / services / keepstore / s3_volume.go
index ae1342512171419d93052c97209cca1b9e7cf73a..4c39dcd5c4f12fc9a8b8ad36d165545af952fb7a 100644 (file)
@@ -7,6 +7,7 @@ package main
 import (
        "bytes"
        "context"
+       "crypto/sha256"
        "encoding/base64"
        "encoding/hex"
        "flag"
@@ -24,6 +25,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/AdRoll/goamz/aws"
        "github.com/AdRoll/goamz/s3"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 const (
@@ -197,7 +199,7 @@ func (*S3Volume) Type() string {
 
 // Start populates private fields and verifies the configuration is
 // valid.
-func (v *S3Volume) Start() error {
+func (v *S3Volume) Start(vm *volumeMetricsVecs) error {
        region, ok := aws.Regions[v.Region]
        if v.Endpoint == "" {
                if !ok {
@@ -247,6 +249,10 @@ func (v *S3Volume) Start() error {
                        Name: v.Bucket,
                },
        }
+       // Set up prometheus metrics
+       lbls := prometheus.Labels{"device_id": v.DeviceID()}
+       v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = vm.getCounterVecsFor(lbls)
+
        return nil
 }
 
@@ -402,6 +408,14 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
                        return err
                }
                opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
+               // In AWS regions that use V4 signatures, we need to
+               // provide ContentSHA256 up front. Otherwise, the S3
+               // library reads the request body (from our buffer)
+               // into another new buffer in order to compute the
+               // SHA256 before sending the request -- which would
+               // mean consuming 128 MiB of memory for the duration
+               // of a 64 MiB write.
+               opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
        }
 
        // Send the block data through a pipe, so that (if we need to)
@@ -501,16 +515,15 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
                Bucket:   v.bucket.Bucket,
                Prefix:   prefix,
                PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
        }
        recentL := s3Lister{
                Bucket:   v.bucket.Bucket,
                Prefix:   "recent/" + prefix,
                PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
        }
-       v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
-       v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
-       for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
-               v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+       for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
                if data.Key >= "g" {
                        // Conveniently, "recent/*" and "trash/*" are
                        // lexically greater than all hex-encoded data
@@ -529,15 +542,13 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
                stamp := data
 
                // Advance to the corresponding recent/X marker, if any
-               for recent != nil {
+               for recent != nil && recentL.Error() == nil {
                        if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
                                recent = recentL.Next()
-                               v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
                                continue
                        } else if cmp == 0 {
                                stamp = recent
                                recent = recentL.Next()
-                               v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
                                break
                        } else {
                                // recent/X marker is missing: we'll
@@ -546,13 +557,16 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
                                break
                        }
                }
+               if err := recentL.Error(); err != nil {
+                       return err
+               }
                t, err := time.Parse(time.RFC3339, stamp.LastModified)
                if err != nil {
                        return err
                }
                fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
        }
-       return nil
+       return dataL.Error()
 }
 
 // Trash a Keep block.
@@ -873,6 +887,7 @@ func (v *S3Volume) EmptyTrash() {
                Bucket:   v.bucket.Bucket,
                Prefix:   "trash/",
                PageSize: v.IndexPageSize,
+               Stats:    &v.bucket.stats,
        }
        for trash := trashL.First(); trash != nil; trash = trashL.Next() {
                todo <- trash
@@ -890,6 +905,7 @@ type s3Lister struct {
        Bucket     *s3.Bucket
        Prefix     string
        PageSize   int
+       Stats      *s3bucketStats
        nextMarker string
        buf        []s3.Key
        err        error
@@ -918,6 +934,8 @@ func (lister *s3Lister) Error() error {
 }
 
 func (lister *s3Lister) getPage() {
+       lister.Stats.TickOps("list")
+       lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
        resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
        lister.nextMarker = ""
        if err != nil {
@@ -953,6 +971,7 @@ type s3bucket struct {
 
 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
        rdr, err := b.Bucket.GetReader(path)
+       b.stats.TickOps("get")
        b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
        b.stats.TickErr(err)
        return NewCountingReader(rdr, b.stats.TickInBytes), err
@@ -960,6 +979,7 @@ func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
 
 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
        resp, err := b.Bucket.Head(path, headers)
+       b.stats.TickOps("head")
        b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
        b.stats.TickErr(err)
        return resp, err
@@ -978,6 +998,7 @@ func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType st
                r = NewCountingReader(r, b.stats.TickOutBytes)
        }
        err := b.Bucket.PutReader(path, r, length, contType, perm, options)
+       b.stats.TickOps("put")
        b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
        b.stats.TickErr(err)
        return err
@@ -985,6 +1006,7 @@ func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType st
 
 func (b *s3bucket) Del(path string) error {
        err := b.Bucket.Del(path)
+       b.stats.TickOps("delete")
        b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
        b.stats.TickErr(err)
        return err