19620: Remove old "v1" S3 keepstore driver.
authorTom Clegg <tom@curii.com>
Thu, 13 Apr 2023 21:23:33 +0000 (17:23 -0400)
committerTom Clegg <tom@curii.com>
Thu, 13 Apr 2023 21:23:33 +0000 (17:23 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

doc/admin/upgrading.html.textile.liquid
doc/install/configure-s3-object-storage.html.textile.liquid
lib/config/config.default.yml
sdk/go/arvados/config.go
services/keepstore/s3_volume.go [deleted file]
services/keepstore/s3_volume_test.go [deleted file]
services/keepstore/s3aws_volume.go
tools/salt-install/config_examples/multi_host/aws/pillars/arvados.sls

index 96966c942f6a684dd27848e81dc3d2536ed67788..026931ed08ee8ccb8cf81f74363ffc6677d60306 100644 (file)
@@ -32,6 +32,10 @@ h2(#main). development main (as of 2023-04-06)
 
 "previous: Upgrading to 2.6.0":#v2_6_0
 
+h3. UseAWSS3v2Driver option removed
+
+The old "v1" S3 driver for keepstore has been removed. The new "v2" implementation, which has been the default since Arvados 2.5.0, is always used. The @Volumes.*.DriverParameters.UseAWSS3v2Driver@ configuration key is no longer recognized. If your config file uses it, remove it to avoid warning messages at startup.
+
 h2(#v2_6_0). v2.6.0 (2023-04-06)
 
 "previous: Upgrading to 2.5.0":#v2_5_0
index b4e0c1a312fd1b1feb3d0bf27fa9a05b0bc416dc..31ad994f0b7a7f9d674a8679fab8c75f1b446603 100644 (file)
@@ -70,9 +70,6 @@ h2(#example). Configuration example
           # might be needed for other S3-compatible services.
           V2Signature: false
 
-          # Use the AWS S3 v2 Go driver instead of the goamz driver.
-          UseAWSS3v2Driver: false
-
           # By default keepstore stores data using the MD5 checksum
           # (32 hexadecimal characters) as the object name, e.g.,
           # "0123456abc...". Setting PrefixLength to 3 changes this
@@ -121,12 +118,6 @@ h2(#example). Configuration example
         StorageClasses: null
 </code></pre></notextile>
 
-Two S3 drivers are available. Historically, Arvados has used the @goamz@ driver to talk to S3-compatible services. More recently, support for the @aws-sdk-go-v2@ driver was added. This driver can be activated by setting the @UseAWSS3v2Driver@ flag to @true@.
-
-The @aws-sdk-go-v2@ does not support the old S3 v2 signing algorithm. This will not affect interacting with AWS S3, but it might be an issue when Keep is backed by a very old version of a third party S3-compatible service.
-
-The @aws-sdk-go-v2@ driver can improve read performance by 50-100% over the @goamz@ driver, but it has not had as much production use. See the "wiki":https://dev.arvados.org/projects/arvados/wiki/Keep_real_world_performance_numbers for details.
-
 h2(#IAM). IAM Policy
 
 On Amazon, VMs which will access the S3 bucket (these include keepstore and compute nodes) will need an IAM policy with "permission that can read, write, list and delete objects in the bucket":https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create.html .  Here is an example policy:
index 5684723a20cf2497d99071d4a2d7a757de4d1471..af8841265c479444f501e8ac03e5ef02fa1dde42 100644 (file)
@@ -1587,8 +1587,6 @@ Clusters:
           ReadTimeout: 10m
           RaceWindow: 24h
           PrefixLength: 0
-          # Use aws-s3-go (v2) instead of goamz
-          UseAWSS3v2Driver: true
 
           # For S3 driver, potentially unsafe tuning parameter,
           # intentionally excluded from main documentation.
index 4466b0a4deffda52a71d9084dc324c1a1df807e6..01e9902c8663938d1b1683654bf84fa867d3f8ae 100644 (file)
@@ -324,7 +324,6 @@ type S3VolumeDriverParameters struct {
        Bucket             string
        LocationConstraint bool
        V2Signature        bool
-       UseAWSS3v2Driver   bool
        IndexPageSize      int
        ConnectTimeout     Duration
        ReadTimeout        Duration
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
deleted file mode 100644 (file)
index 7873764..0000000
+++ /dev/null
@@ -1,1084 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package keepstore
-
-import (
-       "bufio"
-       "bytes"
-       "context"
-       "crypto/sha256"
-       "encoding/base64"
-       "encoding/hex"
-       "encoding/json"
-       "errors"
-       "fmt"
-       "io"
-       "io/ioutil"
-       "net/http"
-       "os"
-       "regexp"
-       "strings"
-       "sync"
-       "sync/atomic"
-       "time"
-
-       "git.arvados.org/arvados.git/sdk/go/arvados"
-       "github.com/AdRoll/goamz/aws"
-       "github.com/AdRoll/goamz/s3"
-       "github.com/prometheus/client_golang/prometheus"
-       "github.com/sirupsen/logrus"
-)
-
-func init() {
-       driver["S3"] = chooseS3VolumeDriver
-}
-
-func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
-       v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
-       err := json.Unmarshal(volume.DriverParameters, v)
-       if err != nil {
-               return nil, err
-       }
-       v.logger = logger.WithField("Volume", v.String())
-       return v, v.check()
-}
-
-func (v *S3Volume) check() error {
-       if v.Bucket == "" {
-               return errors.New("DriverParameters: Bucket must be provided")
-       }
-       if v.IndexPageSize == 0 {
-               v.IndexPageSize = 1000
-       }
-       if v.RaceWindow < 0 {
-               return errors.New("DriverParameters: RaceWindow must not be negative")
-       }
-
-       if v.Endpoint == "" {
-               r, ok := aws.Regions[v.Region]
-               if !ok {
-                       return fmt.Errorf("unrecognized region %+q; try specifying endpoint instead", v.Region)
-               }
-               v.region = r
-       } else {
-               v.region = aws.Region{
-                       Name:                 v.Region,
-                       S3Endpoint:           v.Endpoint,
-                       S3LocationConstraint: v.LocationConstraint,
-               }
-       }
-
-       // Zero timeouts mean "wait forever", which is a bad
-       // default. Default to long timeouts instead.
-       if v.ConnectTimeout == 0 {
-               v.ConnectTimeout = s3DefaultConnectTimeout
-       }
-       if v.ReadTimeout == 0 {
-               v.ReadTimeout = s3DefaultReadTimeout
-       }
-
-       v.bucket = &s3bucket{
-               bucket: &s3.Bucket{
-                       S3:   v.newS3Client(),
-                       Name: v.Bucket,
-               },
-       }
-       // Set up prometheus metrics
-       lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
-       v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
-
-       err := v.bootstrapIAMCredentials()
-       if err != nil {
-               return fmt.Errorf("error getting IAM credentials: %s", err)
-       }
-
-       return nil
-}
-
-const (
-       s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
-       s3DefaultConnectTimeout = arvados.Duration(time.Minute)
-)
-
-var (
-       // ErrS3TrashDisabled is returned by Trash if that operation
-       // is impossible with the current config.
-       ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
-
-       s3ACL = s3.Private
-
-       zeroTime time.Time
-)
-
-const (
-       maxClockSkew  = 600 * time.Second
-       nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
-)
-
-func s3regions() (okList []string) {
-       for r := range aws.Regions {
-               okList = append(okList, r)
-       }
-       return
-}
-
-// S3Volume implements Volume using an S3 bucket.
-type S3Volume struct {
-       arvados.S3VolumeDriverParameters
-       AuthToken      string    // populated automatically when IAMRole is used
-       AuthExpiration time.Time // populated automatically when IAMRole is used
-
-       cluster   *arvados.Cluster
-       volume    arvados.Volume
-       logger    logrus.FieldLogger
-       metrics   *volumeMetricsVecs
-       bucket    *s3bucket
-       region    aws.Region
-       startOnce sync.Once
-}
-
-// GetDeviceID returns a globally unique ID for the storage bucket.
-func (v *S3Volume) GetDeviceID() string {
-       return "s3://" + v.Endpoint + "/" + v.Bucket
-}
-
-func (v *S3Volume) bootstrapIAMCredentials() error {
-       if v.AccessKeyID != "" || v.SecretAccessKey != "" {
-               if v.IAMRole != "" {
-                       return errors.New("invalid DriverParameters: AccessKeyID and SecretAccessKey must be blank if IAMRole is specified")
-               }
-               return nil
-       }
-       ttl, err := v.updateIAMCredentials()
-       if err != nil {
-               return err
-       }
-       go func() {
-               for {
-                       time.Sleep(ttl)
-                       ttl, err = v.updateIAMCredentials()
-                       if err != nil {
-                               v.logger.WithError(err).Warnf("failed to update credentials for IAM role %q", v.IAMRole)
-                               ttl = time.Second
-                       } else if ttl < time.Second {
-                               v.logger.WithField("TTL", ttl).Warnf("received stale credentials for IAM role %q", v.IAMRole)
-                               ttl = time.Second
-                       }
-               }
-       }()
-       return nil
-}
-
-func (v *S3Volume) newS3Client() *s3.S3 {
-       auth := aws.NewAuth(v.AccessKeyID, v.SecretAccessKey, v.AuthToken, v.AuthExpiration)
-       client := s3.New(*auth, v.region)
-       if !v.V2Signature {
-               client.Signature = aws.V4Signature
-       }
-       client.ConnectTimeout = time.Duration(v.ConnectTimeout)
-       client.ReadTimeout = time.Duration(v.ReadTimeout)
-       return client
-}
-
-// returned by AWS metadata endpoint .../security-credentials/${rolename}
-type iamCredentials struct {
-       Code            string
-       LastUpdated     time.Time
-       Type            string
-       AccessKeyID     string
-       SecretAccessKey string
-       Token           string
-       Expiration      time.Time
-}
-
-// Returns TTL of updated credentials, i.e., time to sleep until next
-// update.
-func (v *S3Volume) updateIAMCredentials() (time.Duration, error) {
-       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
-       defer cancel()
-
-       metadataBaseURL := "http://169.254.169.254/latest/meta-data/iam/security-credentials/"
-
-       var url string
-       if strings.Contains(v.IAMRole, "://") {
-               // Configuration provides complete URL (used by tests)
-               url = v.IAMRole
-       } else if v.IAMRole != "" {
-               // Configuration provides IAM role name and we use the
-               // AWS metadata endpoint
-               url = metadataBaseURL + v.IAMRole
-       } else {
-               url = metadataBaseURL
-               v.logger.WithField("URL", url).Debug("looking up IAM role name")
-               req, err := http.NewRequest("GET", url, nil)
-               if err != nil {
-                       return 0, fmt.Errorf("error setting up request %s: %s", url, err)
-               }
-               resp, err := http.DefaultClient.Do(req.WithContext(ctx))
-               if err != nil {
-                       return 0, fmt.Errorf("error getting %s: %s", url, err)
-               }
-               defer resp.Body.Close()
-               if resp.StatusCode == http.StatusNotFound {
-                       return 0, fmt.Errorf("this instance does not have an IAM role assigned -- either assign a role, or configure AccessKeyID and SecretAccessKey explicitly in DriverParameters (error getting %s: HTTP status %s)", url, resp.Status)
-               } else if resp.StatusCode != http.StatusOK {
-                       return 0, fmt.Errorf("error getting %s: HTTP status %s", url, resp.Status)
-               }
-               body := bufio.NewReader(resp.Body)
-               var role string
-               _, err = fmt.Fscanf(body, "%s\n", &role)
-               if err != nil {
-                       return 0, fmt.Errorf("error reading response from %s: %s", url, err)
-               }
-               if n, _ := body.Read(make([]byte, 64)); n > 0 {
-                       v.logger.Warnf("ignoring additional data returned by metadata endpoint %s after the single role name that we expected", url)
-               }
-               v.logger.WithField("Role", role).Debug("looked up IAM role name")
-               url = url + role
-       }
-
-       v.logger.WithField("URL", url).Debug("getting credentials")
-       req, err := http.NewRequest("GET", url, nil)
-       if err != nil {
-               return 0, fmt.Errorf("error setting up request %s: %s", url, err)
-       }
-       resp, err := http.DefaultClient.Do(req.WithContext(ctx))
-       if err != nil {
-               return 0, fmt.Errorf("error getting %s: %s", url, err)
-       }
-       defer resp.Body.Close()
-       if resp.StatusCode != http.StatusOK {
-               return 0, fmt.Errorf("error getting %s: HTTP status %s", url, resp.Status)
-       }
-       var cred iamCredentials
-       err = json.NewDecoder(resp.Body).Decode(&cred)
-       if err != nil {
-               return 0, fmt.Errorf("error decoding credentials from %s: %s", url, err)
-       }
-       v.AccessKeyID, v.SecretAccessKey, v.AuthToken, v.AuthExpiration = cred.AccessKeyID, cred.SecretAccessKey, cred.Token, cred.Expiration
-       v.bucket.SetBucket(&s3.Bucket{
-               S3:   v.newS3Client(),
-               Name: v.Bucket,
-       })
-       // TTL is time from now to expiration, minus 5m.  "We make new
-       // credentials available at least five minutes before the
-       // expiration of the old credentials."  --
-       // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
-       // (If that's not true, the returned ttl might be zero or
-       // negative, which the caller can handle.)
-       ttl := cred.Expiration.Sub(time.Now()) - 5*time.Minute
-       v.logger.WithFields(logrus.Fields{
-               "AccessKeyID": cred.AccessKeyID,
-               "LastUpdated": cred.LastUpdated,
-               "Expiration":  cred.Expiration,
-               "TTL":         arvados.Duration(ttl),
-       }).Debug("updated credentials")
-       return ttl, nil
-}
-
-func (v *S3Volume) getReaderWithContext(ctx context.Context, key string) (rdr io.ReadCloser, err error) {
-       ready := make(chan bool)
-       go func() {
-               rdr, err = v.getReader(key)
-               close(ready)
-       }()
-       select {
-       case <-ready:
-               return
-       case <-ctx.Done():
-               v.logger.Debugf("s3: abandoning getReader(%s): %s", key, ctx.Err())
-               go func() {
-                       <-ready
-                       if err == nil {
-                               rdr.Close()
-                       }
-               }()
-               return nil, ctx.Err()
-       }
-}
-
-// getReader wraps (Bucket)GetReader.
-//
-// In situations where (Bucket)GetReader would fail because the block
-// disappeared in a Trash race, getReader calls fixRace to recover the
-// data, and tries again.
-func (v *S3Volume) getReader(key string) (rdr io.ReadCloser, err error) {
-       rdr, err = v.bucket.GetReader(key)
-       err = v.translateError(err)
-       if err == nil || !os.IsNotExist(err) {
-               return
-       }
-
-       _, err = v.bucket.Head("recent/"+key, nil)
-       err = v.translateError(err)
-       if err != nil {
-               // If we can't read recent/X, there's no point in
-               // trying fixRace. Give up.
-               return
-       }
-       if !v.fixRace(key) {
-               err = os.ErrNotExist
-               return
-       }
-
-       rdr, err = v.bucket.GetReader(key)
-       if err != nil {
-               v.logger.Warnf("reading %s after successful fixRace: %s", key, err)
-               err = v.translateError(err)
-       }
-       return
-}
-
-// Get a block: copy the block data into buf, and return the number of
-// bytes copied.
-func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
-       key := v.key(loc)
-       rdr, err := v.getReaderWithContext(ctx, key)
-       if err != nil {
-               return 0, err
-       }
-
-       var n int
-       ready := make(chan bool)
-       go func() {
-               defer close(ready)
-
-               defer rdr.Close()
-               n, err = io.ReadFull(rdr, buf)
-
-               switch err {
-               case nil, io.EOF, io.ErrUnexpectedEOF:
-                       err = nil
-               default:
-                       err = v.translateError(err)
-               }
-       }()
-       select {
-       case <-ctx.Done():
-               v.logger.Debugf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
-               rdr.Close()
-               // Must wait for ReadFull to return, to ensure it
-               // doesn't write to buf after we return.
-               v.logger.Debug("s3: waiting for ReadFull() to fail")
-               <-ready
-               return 0, ctx.Err()
-       case <-ready:
-               return n, err
-       }
-}
-
-// Compare the given data with the stored data.
-func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
-       key := v.key(loc)
-       errChan := make(chan error, 1)
-       go func() {
-               _, err := v.bucket.Head("recent/"+key, nil)
-               errChan <- err
-       }()
-       var err error
-       select {
-       case <-ctx.Done():
-               return ctx.Err()
-       case err = <-errChan:
-       }
-       if err != nil {
-               // Checking for "loc" itself here would interfere with
-               // future GET requests.
-               //
-               // On AWS, if X doesn't exist, a HEAD or GET request
-               // for X causes X's non-existence to be cached. Thus,
-               // if we test for X, then create X and return a
-               // signature to our client, the client might still get
-               // 404 from all keepstores when trying to read it.
-               //
-               // To avoid this, we avoid doing HEAD X or GET X until
-               // we know X has been written.
-               //
-               // Note that X might exist even though recent/X
-               // doesn't: for example, the response to HEAD recent/X
-               // might itself come from a stale cache. In such
-               // cases, we will return a false negative and
-               // PutHandler might needlessly create another replica
-               // on a different volume. That's not ideal, but it's
-               // better than passing the eventually-consistent
-               // problem on to our clients.
-               return v.translateError(err)
-       }
-       rdr, err := v.getReaderWithContext(ctx, key)
-       if err != nil {
-               return err
-       }
-       defer rdr.Close()
-       return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
-}
-
-// Put writes a block.
-func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
-       if v.volume.ReadOnly {
-               return MethodDisabledError
-       }
-       var opts s3.Options
-       size := len(block)
-       if size > 0 {
-               md5, err := hex.DecodeString(loc)
-               if err != nil {
-                       return err
-               }
-               opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
-               // In AWS regions that use V4 signatures, we need to
-               // provide ContentSHA256 up front. Otherwise, the S3
-               // library reads the request body (from our buffer)
-               // into another new buffer in order to compute the
-               // SHA256 before sending the request -- which would
-               // mean consuming 128 MiB of memory for the duration
-               // of a 64 MiB write.
-               opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
-       }
-
-       key := v.key(loc)
-
-       // Send the block data through a pipe, so that (if we need to)
-       // we can close the pipe early and abandon our PutReader()
-       // goroutine, without worrying about PutReader() accessing our
-       // block buffer after we release it.
-       bufr, bufw := io.Pipe()
-       go func() {
-               io.Copy(bufw, bytes.NewReader(block))
-               bufw.Close()
-       }()
-
-       var err error
-       ready := make(chan bool)
-       go func() {
-               defer func() {
-                       if ctx.Err() != nil {
-                               v.logger.Debugf("abandoned PutReader goroutine finished with err: %s", err)
-                       }
-               }()
-               defer close(ready)
-               err = v.bucket.PutReader(key, bufr, int64(size), "application/octet-stream", s3ACL, opts)
-               if err != nil {
-                       return
-               }
-               err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
-       }()
-       select {
-       case <-ctx.Done():
-               v.logger.Debugf("taking PutReader's input away: %s", ctx.Err())
-               // Our pipe might be stuck in Write(), waiting for
-               // PutReader() to read. If so, un-stick it. This means
-               // PutReader will get corrupt data, but that's OK: the
-               // size and MD5 won't match, so the write will fail.
-               go io.Copy(ioutil.Discard, bufr)
-               // CloseWithError() will return once pending I/O is done.
-               bufw.CloseWithError(ctx.Err())
-               v.logger.Debugf("abandoning PutReader goroutine")
-               return ctx.Err()
-       case <-ready:
-               // Unblock pipe in case PutReader did not consume it.
-               io.Copy(ioutil.Discard, bufr)
-               return v.translateError(err)
-       }
-}
-
-// Touch sets the timestamp for the given locator to the current time.
-func (v *S3Volume) Touch(loc string) error {
-       if v.volume.ReadOnly {
-               return MethodDisabledError
-       }
-       key := v.key(loc)
-       _, err := v.bucket.Head(key, nil)
-       err = v.translateError(err)
-       if os.IsNotExist(err) && v.fixRace(key) {
-               // The data object got trashed in a race, but fixRace
-               // rescued it.
-       } else if err != nil {
-               return err
-       }
-       err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
-       return v.translateError(err)
-}
-
-// Mtime returns the stored timestamp for the given locator.
-func (v *S3Volume) Mtime(loc string) (time.Time, error) {
-       key := v.key(loc)
-       _, err := v.bucket.Head(key, nil)
-       if err != nil {
-               return zeroTime, v.translateError(err)
-       }
-       resp, err := v.bucket.Head("recent/"+key, nil)
-       err = v.translateError(err)
-       if os.IsNotExist(err) {
-               // The data object X exists, but recent/X is missing.
-               err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
-               if err != nil {
-                       v.logger.WithError(err).Errorf("error creating %q", "recent/"+key)
-                       return zeroTime, v.translateError(err)
-               }
-               v.logger.Infof("created %q to migrate existing block to new storage scheme", "recent/"+key)
-               resp, err = v.bucket.Head("recent/"+key, nil)
-               if err != nil {
-                       v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+key)
-                       return zeroTime, v.translateError(err)
-               }
-       } else if err != nil {
-               // HEAD recent/X failed for some other reason.
-               return zeroTime, err
-       }
-       return v.lastModified(resp)
-}
-
-// IndexTo writes a complete list of locators with the given prefix
-// for which Get() can retrieve data.
-func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
-       // Use a merge sort to find matching sets of X and recent/X.
-       dataL := s3Lister{
-               Logger:   v.logger,
-               Bucket:   v.bucket.Bucket(),
-               Prefix:   v.key(prefix),
-               PageSize: v.IndexPageSize,
-               Stats:    &v.bucket.stats,
-       }
-       recentL := s3Lister{
-               Logger:   v.logger,
-               Bucket:   v.bucket.Bucket(),
-               Prefix:   "recent/" + v.key(prefix),
-               PageSize: v.IndexPageSize,
-               Stats:    &v.bucket.stats,
-       }
-       for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
-               if data.Key >= "g" {
-                       // Conveniently, "recent/*" and "trash/*" are
-                       // lexically greater than all hex-encoded data
-                       // hashes, so stopping here avoids iterating
-                       // over all of them needlessly with dataL.
-                       break
-               }
-               loc, isBlk := v.isKeepBlock(data.Key)
-               if !isBlk {
-                       continue
-               }
-
-               // stamp is the list entry we should use to report the
-               // last-modified time for this data block: it will be
-               // the recent/X entry if one exists, otherwise the
-               // entry for the data block itself.
-               stamp := data
-
-               // Advance to the corresponding recent/X marker, if any
-               for recent != nil && recentL.Error() == nil {
-                       if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
-                               recent = recentL.Next()
-                               continue
-                       } else if cmp == 0 {
-                               stamp = recent
-                               recent = recentL.Next()
-                               break
-                       } else {
-                               // recent/X marker is missing: we'll
-                               // use the timestamp on the data
-                               // object.
-                               break
-                       }
-               }
-               if err := recentL.Error(); err != nil {
-                       return err
-               }
-               t, err := time.Parse(time.RFC3339, stamp.LastModified)
-               if err != nil {
-                       return err
-               }
-               // We truncate sub-second precision here. Otherwise
-               // timestamps will never match the RFC1123-formatted
-               // Last-Modified values parsed by Mtime().
-               fmt.Fprintf(writer, "%s+%d %d\n", loc, data.Size, t.Unix()*1000000000)
-       }
-       return dataL.Error()
-}
-
-// Trash a Keep block.
-func (v *S3Volume) Trash(loc string) error {
-       if v.volume.ReadOnly {
-               return MethodDisabledError
-       }
-       if t, err := v.Mtime(loc); err != nil {
-               return err
-       } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
-               return nil
-       }
-       key := v.key(loc)
-       if v.cluster.Collections.BlobTrashLifetime == 0 {
-               if !v.UnsafeDelete {
-                       return ErrS3TrashDisabled
-               }
-               return v.translateError(v.bucket.Del(key))
-       }
-       err := v.checkRaceWindow(key)
-       if err != nil {
-               return err
-       }
-       err = v.safeCopy("trash/"+key, key)
-       if err != nil {
-               return err
-       }
-       return v.translateError(v.bucket.Del(key))
-}
-
-// checkRaceWindow returns a non-nil error if trash/key is, or might
-// be, in the race window (i.e., it's not safe to trash key).
-func (v *S3Volume) checkRaceWindow(key string) error {
-       resp, err := v.bucket.Head("trash/"+key, nil)
-       err = v.translateError(err)
-       if os.IsNotExist(err) {
-               // OK, trash/X doesn't exist so we're not in the race
-               // window
-               return nil
-       } else if err != nil {
-               // Error looking up trash/X. We don't know whether
-               // we're in the race window
-               return err
-       }
-       t, err := v.lastModified(resp)
-       if err != nil {
-               // Can't parse timestamp
-               return err
-       }
-       safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
-       if safeWindow <= 0 {
-               // We can't count on "touch trash/X" to prolong
-               // trash/X's lifetime. The new timestamp might not
-               // become visible until now+raceWindow, and EmptyTrash
-               // is allowed to delete trash/X before then.
-               return fmt.Errorf("%s: same block is already in trash, and safe window ended %s ago", key, -safeWindow)
-       }
-       // trash/X exists, but it won't be eligible for deletion until
-       // after now+raceWindow, so it's safe to overwrite it.
-       return nil
-}
-
-// safeCopy calls PutCopy, and checks the response to make sure the
-// copy succeeded and updated the timestamp on the destination object
-// (PutCopy returns 200 OK if the request was received, even if the
-// copy failed).
-func (v *S3Volume) safeCopy(dst, src string) error {
-       resp, err := v.bucket.Bucket().PutCopy(dst, s3ACL, s3.CopyOptions{
-               ContentType:       "application/octet-stream",
-               MetadataDirective: "REPLACE",
-       }, v.bucket.Bucket().Name+"/"+src)
-       err = v.translateError(err)
-       if os.IsNotExist(err) {
-               return err
-       } else if err != nil {
-               return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Bucket().Name+"/"+src, err)
-       }
-       if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
-               return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
-       } else if time.Now().Sub(t) > maxClockSkew {
-               return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
-       }
-       return nil
-}
-
-// Get the LastModified header from resp, and parse it as RFC1123 or
-// -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
-func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
-       s := resp.Header.Get("Last-Modified")
-       t, err = time.Parse(time.RFC1123, s)
-       if err != nil && s != "" {
-               // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
-               // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
-               // as required by HTTP spec. If it's not a valid HTTP
-               // header value, it's probably AWS (or s3test) giving
-               // us a nearly-RFC1123 timestamp.
-               t, err = time.Parse(nearlyRFC1123, s)
-       }
-       return
-}
-
-// Untrash moves block from trash back into store
-func (v *S3Volume) Untrash(loc string) error {
-       key := v.key(loc)
-       err := v.safeCopy(key, "trash/"+key)
-       if err != nil {
-               return err
-       }
-       err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
-       return v.translateError(err)
-}
-
-// Status returns a *VolumeStatus representing the current in-use
-// storage capacity and a fake available capacity that doesn't make
-// the volume seem full or nearly-full.
-func (v *S3Volume) Status() *VolumeStatus {
-       return &VolumeStatus{
-               DeviceNum: 1,
-               BytesFree: BlockSize * 1000,
-               BytesUsed: 1,
-       }
-}
-
-// InternalStats returns bucket I/O and API call counters.
-func (v *S3Volume) InternalStats() interface{} {
-       return &v.bucket.stats
-}
-
-// String implements fmt.Stringer.
-func (v *S3Volume) String() string {
-       return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
-}
-
-var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
-
-func (v *S3Volume) isKeepBlock(s string) (string, bool) {
-       if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
-               s = s[v.PrefixLength+1:]
-       }
-       return s, s3KeepBlockRegexp.MatchString(s)
-}
-
-// Return the key used for a given loc. If PrefixLength==0 then
-// key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is
-// "abc/abcdef0123", etc.
-func (v *S3Volume) key(loc string) string {
-       if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
-               return loc[:v.PrefixLength] + "/" + loc
-       } else {
-               return loc
-       }
-}
-
-// fixRace(X) is called when "recent/X" exists but "X" doesn't
-// exist. If the timestamps on "recent/X" and "trash/X" indicate there
-// was a race between Put and Trash, fixRace recovers from the race by
-// Untrashing the block.
-func (v *S3Volume) fixRace(key string) bool {
-       trash, err := v.bucket.Head("trash/"+key, nil)
-       if err != nil {
-               if !os.IsNotExist(v.translateError(err)) {
-                       v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+key)
-               }
-               return false
-       }
-       trashTime, err := v.lastModified(trash)
-       if err != nil {
-               v.logger.WithError(err).Errorf("fixRace: error parsing time %q", trash.Header.Get("Last-Modified"))
-               return false
-       }
-
-       recent, err := v.bucket.Head("recent/"+key, nil)
-       if err != nil {
-               v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+key)
-               return false
-       }
-       recentTime, err := v.lastModified(recent)
-       if err != nil {
-               v.logger.WithError(err).Errorf("fixRace: error parsing time %q", recent.Header.Get("Last-Modified"))
-               return false
-       }
-
-       ageWhenTrashed := trashTime.Sub(recentTime)
-       if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
-               // No evidence of a race: block hasn't been written
-               // since it became eligible for Trash. No fix needed.
-               return false
-       }
-
-       v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", key, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
-       v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+key, key)
-       err = v.safeCopy(key, "trash/"+key)
-       if err != nil {
-               v.logger.WithError(err).Error("fixRace: copy failed")
-               return false
-       }
-       return true
-}
-
-func (v *S3Volume) translateError(err error) error {
-       switch err := err.(type) {
-       case *s3.Error:
-               if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
-                       strings.Contains(err.Error(), "Not Found") {
-                       return os.ErrNotExist
-               }
-               // Other 404 errors like NoSuchVersion and
-               // NoSuchBucket are different problems which should
-               // get called out downstream, so we don't convert them
-               // to os.ErrNotExist.
-       }
-       return err
-}
-
-// EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
-// and deletes them from the volume.
-func (v *S3Volume) EmptyTrash() {
-       if v.cluster.Collections.BlobDeleteConcurrency < 1 {
-               return
-       }
-
-       var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
-
-       // Define "ready to delete" as "...when EmptyTrash started".
-       startT := time.Now()
-
-       emptyOneKey := func(trash *s3.Key) {
-               key := trash.Key[6:]
-               loc, isBlk := v.isKeepBlock(key)
-               if !isBlk {
-                       return
-               }
-               atomic.AddInt64(&bytesInTrash, trash.Size)
-               atomic.AddInt64(&blocksInTrash, 1)
-
-               trashT, err := time.Parse(time.RFC3339, trash.LastModified)
-               if err != nil {
-                       v.logger.Warnf("EmptyTrash: %q: parse %q: %s", trash.Key, trash.LastModified, err)
-                       return
-               }
-               recent, err := v.bucket.Head("recent/"+key, nil)
-               if err != nil && os.IsNotExist(v.translateError(err)) {
-                       v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+loc, err)
-                       err = v.Untrash(loc)
-                       if err != nil {
-                               v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
-                       }
-                       return
-               } else if err != nil {
-                       v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+key)
-                       return
-               }
-               recentT, err := v.lastModified(recent)
-               if err != nil {
-                       v.logger.WithError(err).Warnf("EmptyTrash: %q: error parsing %q", "recent/"+key, recent.Header.Get("Last-Modified"))
-                       return
-               }
-               if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() {
-                       if age := startT.Sub(recentT); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
-                               // recent/loc is too old to protect
-                               // loc from being Trashed again during
-                               // the raceWindow that starts if we
-                               // delete trash/X now.
-                               //
-                               // Note this means (TrashSweepInterval
-                               // < BlobSigningTTL - raceWindow) is
-                               // necessary to avoid starvation.
-                               v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
-                               v.fixRace(key)
-                               v.Touch(loc)
-                               return
-                       }
-                       _, err := v.bucket.Head(key, nil)
-                       if os.IsNotExist(err) {
-                               v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
-                               v.fixRace(key)
-                               return
-                       } else if err != nil {
-                               v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
-                               return
-                       }
-               }
-               if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
-                       return
-               }
-               err = v.bucket.Del(trash.Key)
-               if err != nil {
-                       v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", trash.Key)
-                       return
-               }
-               atomic.AddInt64(&bytesDeleted, trash.Size)
-               atomic.AddInt64(&blocksDeleted, 1)
-
-               _, err = v.bucket.Head(key, nil)
-               if err == nil {
-                       v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", key, key)
-                       return
-               }
-               if !os.IsNotExist(v.translateError(err)) {
-                       v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", key)
-                       return
-               }
-               err = v.bucket.Del("recent/" + key)
-               if err != nil {
-                       v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+key)
-               }
-       }
-
-       var wg sync.WaitGroup
-       todo := make(chan *s3.Key, v.cluster.Collections.BlobDeleteConcurrency)
-       for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
-               wg.Add(1)
-               go func() {
-                       defer wg.Done()
-                       for key := range todo {
-                               emptyOneKey(key)
-                       }
-               }()
-       }
-
-       trashL := s3Lister{
-               Logger:   v.logger,
-               Bucket:   v.bucket.Bucket(),
-               Prefix:   "trash/",
-               PageSize: v.IndexPageSize,
-               Stats:    &v.bucket.stats,
-       }
-       for trash := trashL.First(); trash != nil; trash = trashL.Next() {
-               todo <- trash
-       }
-       close(todo)
-       wg.Wait()
-
-       if err := trashL.Error(); err != nil {
-               v.logger.WithError(err).Error("EmptyTrash: lister failed")
-       }
-       v.logger.Infof("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
-}
-
-type s3Lister struct {
-       Logger     logrus.FieldLogger
-       Bucket     *s3.Bucket
-       Prefix     string
-       PageSize   int
-       Stats      *s3bucketStats
-       nextMarker string
-       buf        []s3.Key
-       err        error
-}
-
-// First fetches the first page and returns the first item. It returns
-// nil if the response is the empty set or an error occurs.
-func (lister *s3Lister) First() *s3.Key {
-       lister.getPage()
-       return lister.pop()
-}
-
-// Next returns the next item, fetching the next page if necessary. It
-// returns nil if the last available item has already been fetched, or
-// an error occurs.
-func (lister *s3Lister) Next() *s3.Key {
-       if len(lister.buf) == 0 && lister.nextMarker != "" {
-               lister.getPage()
-       }
-       return lister.pop()
-}
-
-// Return the most recent error encountered by First or Next.
-func (lister *s3Lister) Error() error {
-       return lister.err
-}
-
-func (lister *s3Lister) getPage() {
-       lister.Stats.TickOps("list")
-       lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
-       resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
-       lister.nextMarker = ""
-       if err != nil {
-               lister.err = err
-               return
-       }
-       if resp.IsTruncated {
-               lister.nextMarker = resp.NextMarker
-       }
-       lister.buf = make([]s3.Key, 0, len(resp.Contents))
-       for _, key := range resp.Contents {
-               if !strings.HasPrefix(key.Key, lister.Prefix) {
-                       lister.Logger.Warnf("s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
-                       continue
-               }
-               lister.buf = append(lister.buf, key)
-       }
-}
-
-func (lister *s3Lister) pop() (k *s3.Key) {
-       if len(lister.buf) > 0 {
-               k = &lister.buf[0]
-               lister.buf = lister.buf[1:]
-       }
-       return
-}
-
-// s3bucket wraps s3.bucket and counts I/O and API usage stats. The
-// wrapped bucket can be replaced atomically with SetBucket in order
-// to update credentials.
-type s3bucket struct {
-       bucket *s3.Bucket
-       stats  s3bucketStats
-       mu     sync.Mutex
-}
-
-func (b *s3bucket) Bucket() *s3.Bucket {
-       b.mu.Lock()
-       defer b.mu.Unlock()
-       return b.bucket
-}
-
-func (b *s3bucket) SetBucket(bucket *s3.Bucket) {
-       b.mu.Lock()
-       defer b.mu.Unlock()
-       b.bucket = bucket
-}
-
-func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
-       rdr, err := b.Bucket().GetReader(path)
-       b.stats.TickOps("get")
-       b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
-       b.stats.TickErr(err)
-       return NewCountingReader(rdr, b.stats.TickInBytes), err
-}
-
-func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
-       resp, err := b.Bucket().Head(path, headers)
-       b.stats.TickOps("head")
-       b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
-       b.stats.TickErr(err)
-       return resp, err
-}
-
-func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
-       if length == 0 {
-               // goamz will only send Content-Length: 0 when reader
-               // is nil due to net.http.Request.ContentLength
-               // behavior.  Otherwise, Content-Length header is
-               // omitted which will cause some S3 services
-               // (including AWS and Ceph RadosGW) to fail to create
-               // empty objects.
-               r = nil
-       } else {
-               r = NewCountingReader(r, b.stats.TickOutBytes)
-       }
-       err := b.Bucket().PutReader(path, r, length, contType, perm, options)
-       b.stats.TickOps("put")
-       b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
-       b.stats.TickErr(err)
-       return err
-}
-
-func (b *s3bucket) Del(path string) error {
-       err := b.Bucket().Del(path)
-       b.stats.TickOps("delete")
-       b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
-       b.stats.TickErr(err)
-       return err
-}
-
-type s3bucketStats struct {
-       statsTicker
-       Ops     uint64
-       GetOps  uint64
-       PutOps  uint64
-       HeadOps uint64
-       DelOps  uint64
-       ListOps uint64
-}
-
-func (s *s3bucketStats) TickErr(err error) {
-       if err == nil {
-               return
-       }
-       errType := fmt.Sprintf("%T", err)
-       if err, ok := err.(*s3.Error); ok {
-               errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
-       }
-       s.statsTicker.TickErr(err, errType)
-}
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
deleted file mode 100644 (file)
index a820983..0000000
+++ /dev/null
@@ -1,594 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package keepstore
-
-import (
-       "bytes"
-       "context"
-       "crypto/md5"
-       "encoding/json"
-       "fmt"
-       "io"
-       "net/http"
-       "net/http/httptest"
-       "os"
-       "strings"
-       "time"
-
-       "git.arvados.org/arvados.git/sdk/go/arvados"
-       "git.arvados.org/arvados.git/sdk/go/ctxlog"
-       "github.com/AdRoll/goamz/s3"
-       "github.com/AdRoll/goamz/s3/s3test"
-       "github.com/prometheus/client_golang/prometheus"
-       "github.com/sirupsen/logrus"
-       check "gopkg.in/check.v1"
-)
-
-const (
-       TestBucketName = "testbucket"
-)
-
-type fakeClock struct {
-       now *time.Time
-}
-
-func (c *fakeClock) Now() time.Time {
-       if c.now == nil {
-               return time.Now()
-       }
-       return *c.now
-}
-
-var _ = check.Suite(&StubbedS3Suite{})
-
-type StubbedS3Suite struct {
-       s3server *httptest.Server
-       metadata *httptest.Server
-       cluster  *arvados.Cluster
-       handler  *handler
-       volumes  []*TestableS3Volume
-}
-
-func (s *StubbedS3Suite) SetUpTest(c *check.C) {
-       s.s3server = nil
-       s.metadata = nil
-       s.cluster = testCluster(c)
-       s.cluster.Volumes = map[string]arvados.Volume{
-               "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
-               "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
-       }
-       s.handler = &handler{}
-}
-
-func (s *StubbedS3Suite) TestGeneric(c *check.C) {
-       DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
-               // Use a negative raceWindow so s3test's 1-second
-               // timestamp precision doesn't confuse fixRace.
-               return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
-       })
-}
-
-func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
-       DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
-               return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
-       })
-}
-
-func (s *StubbedS3Suite) TestGenericWithPrefix(c *check.C) {
-       DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
-               v := s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
-               v.PrefixLength = 3
-               return v
-       })
-}
-
-func (s *StubbedS3Suite) TestIndex(c *check.C) {
-       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
-       v.IndexPageSize = 3
-       for i := 0; i < 256; i++ {
-               v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
-       }
-       for _, spec := range []struct {
-               prefix      string
-               expectMatch int
-       }{
-               {"", 256},
-               {"c", 16},
-               {"bc", 1},
-               {"abc", 0},
-       } {
-               buf := new(bytes.Buffer)
-               err := v.IndexTo(spec.prefix, buf)
-               c.Check(err, check.IsNil)
-
-               idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
-               c.Check(len(idx), check.Equals, spec.expectMatch+1)
-               c.Check(len(idx[len(idx)-1]), check.Equals, 0)
-       }
-}
-
-func (s *StubbedS3Suite) TestSignatureVersion(c *check.C) {
-       var header http.Header
-       stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-               header = r.Header
-       }))
-       defer stub.Close()
-
-       // Default V4 signature
-       vol := S3Volume{
-               S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
-                       AccessKeyID:     "xxx",
-                       SecretAccessKey: "xxx",
-                       Endpoint:        stub.URL,
-                       Region:          "test-region-1",
-                       Bucket:          "test-bucket-name",
-               },
-               cluster: s.cluster,
-               logger:  ctxlog.TestLogger(c),
-               metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
-       }
-       err := vol.check()
-       c.Check(err, check.IsNil)
-       err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
-       c.Check(err, check.IsNil)
-       c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
-
-       // Force V2 signature
-       vol = S3Volume{
-               S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
-                       AccessKeyID:     "xxx",
-                       SecretAccessKey: "xxx",
-                       Endpoint:        stub.URL,
-                       Region:          "test-region-1",
-                       Bucket:          "test-bucket-name",
-                       V2Signature:     true,
-               },
-               cluster: s.cluster,
-               logger:  ctxlog.TestLogger(c),
-               metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
-       }
-       err = vol.check()
-       c.Check(err, check.IsNil)
-       err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
-       c.Check(err, check.IsNil)
-       c.Check(header.Get("Authorization"), check.Matches, `AWS xxx:.*`)
-}
-
-func (s *StubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
-       s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-               upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
-               exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
-               // Literal example from
-               // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
-               // but with updated timestamps
-               io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
-       }))
-       defer s.metadata.Close()
-
-       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
-       c.Check(v.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
-       c.Check(v.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
-       c.Check(v.bucket.bucket.S3.Auth.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
-       c.Check(v.bucket.bucket.S3.Auth.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
-
-       s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-               w.WriteHeader(http.StatusNotFound)
-       }))
-       deadv := &S3Volume{
-               S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
-                       IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
-                       Endpoint: "http://localhost:12345",
-                       Region:   "test-region-1",
-                       Bucket:   "test-bucket-name",
-               },
-               cluster: s.cluster,
-               logger:  ctxlog.TestLogger(c),
-               metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
-       }
-       err := deadv.check()
-       c.Check(err, check.ErrorMatches, `.*/fake-metadata/test-role.*`)
-       c.Check(err, check.ErrorMatches, `.*404.*`)
-}
-
-func (s *StubbedS3Suite) TestStats(c *check.C) {
-       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
-       stats := func() string {
-               buf, err := json.Marshal(v.InternalStats())
-               c.Check(err, check.IsNil)
-               return string(buf)
-       }
-
-       c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
-
-       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-       _, err := v.Get(context.Background(), loc, make([]byte, 3))
-       c.Check(err, check.NotNil)
-       c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
-       c.Check(stats(), check.Matches, `.*"\*s3.Error 404 [^"]*":[^0].*`)
-       c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
-
-       err = v.Put(context.Background(), loc, []byte("foo"))
-       c.Check(err, check.IsNil)
-       c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
-       c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
-
-       _, err = v.Get(context.Background(), loc, make([]byte, 3))
-       c.Check(err, check.IsNil)
-       _, err = v.Get(context.Background(), loc, make([]byte, 3))
-       c.Check(err, check.IsNil)
-       c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
-}
-
-type blockingHandler struct {
-       requested chan *http.Request
-       unblock   chan struct{}
-}
-
-func (h *blockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-       if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
-               // Accept PutBucket ("PUT /bucketname/"), called by
-               // newTestableVolume
-               return
-       }
-       if h.requested != nil {
-               h.requested <- r
-       }
-       if h.unblock != nil {
-               <-h.unblock
-       }
-       http.Error(w, "nothing here", http.StatusNotFound)
-}
-
-func (s *StubbedS3Suite) TestGetContextCancel(c *check.C) {
-       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-       buf := make([]byte, 3)
-
-       s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
-               _, err := v.Get(ctx, loc, buf)
-               return err
-       })
-}
-
-func (s *StubbedS3Suite) TestCompareContextCancel(c *check.C) {
-       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-       buf := []byte("bar")
-
-       s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
-               return v.Compare(ctx, loc, buf)
-       })
-}
-
-func (s *StubbedS3Suite) TestPutContextCancel(c *check.C) {
-       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-       buf := []byte("foo")
-
-       s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
-               return v.Put(ctx, loc, buf)
-       })
-}
-
-func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3Volume) error) {
-       handler := &blockingHandler{}
-       s.s3server = httptest.NewServer(handler)
-       defer s.s3server.Close()
-
-       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
-
-       ctx, cancel := context.WithCancel(context.Background())
-
-       handler.requested = make(chan *http.Request)
-       handler.unblock = make(chan struct{})
-       defer close(handler.unblock)
-
-       doneFunc := make(chan struct{})
-       go func() {
-               err := testFunc(ctx, v)
-               c.Check(err, check.Equals, context.Canceled)
-               close(doneFunc)
-       }()
-
-       timeout := time.After(10 * time.Second)
-
-       // Wait for the stub server to receive a request, meaning
-       // Get() is waiting for an s3 operation.
-       select {
-       case <-timeout:
-               c.Fatal("timed out waiting for test func to call our handler")
-       case <-doneFunc:
-               c.Fatal("test func finished without even calling our handler!")
-       case <-handler.requested:
-       }
-
-       cancel()
-
-       select {
-       case <-timeout:
-               c.Fatal("timed out")
-       case <-doneFunc:
-       }
-}
-
-func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
-       s.cluster.Collections.BlobTrashLifetime.Set("1h")
-       s.cluster.Collections.BlobSigningTTL.Set("1h")
-
-       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
-       var none time.Time
-
-       putS3Obj := func(t time.Time, key string, data []byte) {
-               if t == none {
-                       return
-               }
-               v.serverClock.now = &t
-               v.bucket.Bucket().Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
-       }
-
-       t0 := time.Now()
-       nextKey := 0
-       for _, scenario := range []struct {
-               label               string
-               dataT               time.Time
-               recentT             time.Time
-               trashT              time.Time
-               canGet              bool
-               canTrash            bool
-               canGetAfterTrash    bool
-               canUntrash          bool
-               haveTrashAfterEmpty bool
-               freshAfterEmpty     bool
-       }{
-               {
-                       "No related objects",
-                       none, none, none,
-                       false, false, false, false, false, false,
-               },
-               {
-                       // Stored by older version, or there was a
-                       // race between EmptyTrash and Put: Trash is a
-                       // no-op even though the data object is very
-                       // old
-                       "No recent/X",
-                       t0.Add(-48 * time.Hour), none, none,
-                       true, true, true, false, false, false,
-               },
-               {
-                       "Not trash, but old enough to be eligible for trash",
-                       t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
-                       true, true, false, false, false, false,
-               },
-               {
-                       "Not trash, and not old enough to be eligible for trash",
-                       t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
-                       true, true, true, false, false, false,
-               },
-               {
-                       "Trashed + untrashed copies exist, due to recent race between Trash and Put",
-                       t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
-                       true, true, true, true, true, false,
-               },
-               {
-                       "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
-                       t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
-                       true, false, true, true, true, false,
-               },
-               {
-                       "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
-                       t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
-                       true, false, true, true, false, false,
-               },
-               {
-                       "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
-                       t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
-                       true, false, true, true, true, true,
-               },
-               {
-                       "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
-                       t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
-                       true, true, true, true, false, false,
-               },
-               {
-                       "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
-                       t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
-                       true, false, true, true, false, false,
-               },
-               {
-                       "Trash, not yet eligible for deletion",
-                       none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
-                       false, false, false, true, true, false,
-               },
-               {
-                       "Trash, not yet eligible for deletion, prone to races",
-                       none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
-                       false, false, false, true, true, false,
-               },
-               {
-                       "Trash, eligible for deletion",
-                       none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
-                       false, false, false, true, false, false,
-               },
-               {
-                       "Erroneously trashed during a race, detected before BlobTrashLifetime",
-                       none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
-                       true, false, true, true, true, false,
-               },
-               {
-                       "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
-                       none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
-                       true, false, true, true, true, false,
-               },
-               {
-                       "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
-                       none, none, t0.Add(-time.Minute),
-                       false, false, false, true, true, true,
-               },
-       } {
-               for _, prefixLength := range []int{0, 3} {
-                       v.PrefixLength = prefixLength
-                       c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)
-
-                       // We have a few tests to run for each scenario, and
-                       // the tests are expected to change state. By calling
-                       // this setup func between tests, we (re)create the
-                       // scenario as specified, using a new unique block
-                       // locator to prevent interference from previous
-                       // tests.
-
-                       setupScenario := func() (string, []byte) {
-                               nextKey++
-                               blk := []byte(fmt.Sprintf("%d", nextKey))
-                               loc := fmt.Sprintf("%x", md5.Sum(blk))
-                               key := loc
-                               if prefixLength > 0 {
-                                       key = loc[:prefixLength] + "/" + loc
-                               }
-                               c.Log("\t", loc)
-                               putS3Obj(scenario.dataT, key, blk)
-                               putS3Obj(scenario.recentT, "recent/"+key, nil)
-                               putS3Obj(scenario.trashT, "trash/"+key, blk)
-                               v.serverClock.now = &t0
-                               return loc, blk
-                       }
-
-                       // Check canGet
-                       loc, blk := setupScenario()
-                       buf := make([]byte, len(blk))
-                       _, err := v.Get(context.Background(), loc, buf)
-                       c.Check(err == nil, check.Equals, scenario.canGet)
-                       if err != nil {
-                               c.Check(os.IsNotExist(err), check.Equals, true)
-                       }
-
-                       // Call Trash, then check canTrash and canGetAfterTrash
-                       loc, _ = setupScenario()
-                       err = v.Trash(loc)
-                       c.Check(err == nil, check.Equals, scenario.canTrash)
-                       _, err = v.Get(context.Background(), loc, buf)
-                       c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
-                       if err != nil {
-                               c.Check(os.IsNotExist(err), check.Equals, true)
-                       }
-
-                       // Call Untrash, then check canUntrash
-                       loc, _ = setupScenario()
-                       err = v.Untrash(loc)
-                       c.Check(err == nil, check.Equals, scenario.canUntrash)
-                       if scenario.dataT != none || scenario.trashT != none {
-                               // In all scenarios where the data exists, we
-                               // should be able to Get after Untrash --
-                               // regardless of timestamps, errors, race
-                               // conditions, etc.
-                               _, err = v.Get(context.Background(), loc, buf)
-                               c.Check(err, check.IsNil)
-                       }
-
-                       // Call EmptyTrash, then check haveTrashAfterEmpty and
-                       // freshAfterEmpty
-                       loc, _ = setupScenario()
-                       v.EmptyTrash()
-                       _, err = v.bucket.Head("trash/"+v.key(loc), nil)
-                       c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
-                       if scenario.freshAfterEmpty {
-                               t, err := v.Mtime(loc)
-                               c.Check(err, check.IsNil)
-                               // new mtime must be current (with an
-                               // allowance for 1s timestamp precision)
-                               c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
-                       }
-
-                       // Check for current Mtime after Put (applies to all
-                       // scenarios)
-                       loc, blk = setupScenario()
-                       err = v.Put(context.Background(), loc, blk)
-                       c.Check(err, check.IsNil)
-                       t, err := v.Mtime(loc)
-                       c.Check(err, check.IsNil)
-                       c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
-               }
-       }
-}
-
-type TestableS3Volume struct {
-       *S3Volume
-       server      *s3test.Server
-       c           *check.C
-       serverClock *fakeClock
-}
-
-func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, raceWindow time.Duration) *TestableS3Volume {
-       clock := &fakeClock{}
-       srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
-       c.Assert(err, check.IsNil)
-       endpoint := srv.URL()
-       if s.s3server != nil {
-               endpoint = s.s3server.URL
-       }
-
-       iamRole, accessKey, secretKey := "", "xxx", "xxx"
-       if s.metadata != nil {
-               iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
-       }
-
-       v := &TestableS3Volume{
-               S3Volume: &S3Volume{
-                       S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
-                               IAMRole:            iamRole,
-                               AccessKeyID:        accessKey,
-                               SecretAccessKey:    secretKey,
-                               Bucket:             TestBucketName,
-                               Endpoint:           endpoint,
-                               Region:             "test-region-1",
-                               LocationConstraint: true,
-                               UnsafeDelete:       true,
-                               IndexPageSize:      1000,
-                       },
-                       cluster: cluster,
-                       volume:  volume,
-                       logger:  ctxlog.TestLogger(c),
-                       metrics: metrics,
-               },
-               c:           c,
-               server:      srv,
-               serverClock: clock,
-       }
-       c.Assert(v.S3Volume.check(), check.IsNil)
-       c.Assert(v.bucket.Bucket().PutBucket(s3.ACL("private")), check.IsNil)
-       // We couldn't set RaceWindow until now because check()
-       // rejects negative values.
-       v.S3Volume.RaceWindow = arvados.Duration(raceWindow)
-       return v
-}
-
-// PutRaw skips the ContentMD5 test
-func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
-       key := v.key(loc)
-       err := v.bucket.Bucket().Put(key, block, "application/octet-stream", s3ACL, s3.Options{})
-       if err != nil {
-               v.logger.Printf("PutRaw: %s: %+v", loc, err)
-       }
-       err = v.bucket.Bucket().Put("recent/"+key, nil, "application/octet-stream", s3ACL, s3.Options{})
-       if err != nil {
-               v.logger.Printf("PutRaw: recent/%s: %+v", key, err)
-       }
-}
-
-// TouchWithDate turns back the clock while doing a Touch(). We assume
-// there are no other operations happening on the same s3test server
-// while we do this.
-func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
-       v.serverClock.now = &lastPut
-       err := v.bucket.Bucket().Put("recent/"+v.key(locator), nil, "application/octet-stream", s3ACL, s3.Options{})
-       if err != nil {
-               panic(err)
-       }
-       v.serverClock.now = nil
-}
-
-func (v *TestableS3Volume) Teardown() {
-       v.server.Quit()
-}
-
-func (v *TestableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
-       return "get", "put"
-}
index d068dde074ea254ef814aea38eefa6f63102d7e3..8f2c27539109fbac45b844ac31281d9b4c3a76cd 100644 (file)
@@ -33,6 +33,21 @@ import (
        "github.com/sirupsen/logrus"
 )
 
+func init() {
+       driver["S3"] = newS3AWSVolume
+}
+
+const (
+       s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
+       s3DefaultConnectTimeout = arvados.Duration(time.Minute)
+       maxClockSkew            = 600 * time.Second
+       nearlyRFC1123           = "Mon, 2 Jan 2006 15:04:05 GMT"
+)
+
+var (
+       ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
+)
+
 // S3AWSVolume implements Volume using an S3 bucket.
 type S3AWSVolume struct {
        arvados.S3VolumeDriverParameters
@@ -58,24 +73,6 @@ type s3AWSbucket struct {
        mu     sync.Mutex
 }
 
-// chooseS3VolumeDriver distinguishes between the old goamz driver and
-// aws-sdk-go based on the UseAWSS3v2Driver feature flag
-func chooseS3VolumeDriver(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
-       v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
-       // Default value will be overriden if it happens to be defined in the config
-       v.S3VolumeDriverParameters.UseAWSS3v2Driver = true
-       err := json.Unmarshal(volume.DriverParameters, v)
-       if err != nil {
-               return nil, err
-       }
-       if v.UseAWSS3v2Driver {
-               logger.Debugln("Using AWS S3 v2 driver")
-               return newS3AWSVolume(cluster, volume, logger, metrics)
-       }
-       logger.Debugln("Using goamz S3 driver")
-       return newS3Volume(cluster, volume, logger, metrics)
-}
-
 const (
        PartSize         = 5 * 1024 * 1024
        ReadConcurrency  = 13
index cacc6a0a11f045f6aea49196ce5a01cb4aef1d5f..bf046f6644ebdd545617839644cc07e1fb33fdc3 100644 (file)
@@ -140,7 +140,6 @@ arvados:
         Replication: 2
         Driver: S3
         DriverParameters:
-          UseAWSS3v2Driver: true
           Bucket: __CLUSTER__-nyw5e-000000000000000-volume
           IAMRole: __CLUSTER__-keepstore-00-iam-role
           Region: FIXME