From: Tom Clegg Date: Tue, 18 Apr 2023 15:13:41 +0000 (-0400) Subject: Merge branch '19620-remove-old-s3-driver' X-Git-Tag: 2.7.0~138 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/57792708500261a817e6957e65c80c7f798a36e9?hp=347c7205cde8e9e969975ea79dca4135e9f2ce5a Merge branch '19620-remove-old-s3-driver' closes #19620 Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/doc/admin/upgrading.html.textile.liquid b/doc/admin/upgrading.html.textile.liquid index 3527a5c93f..02ded1fc42 100644 --- a/doc/admin/upgrading.html.textile.liquid +++ b/doc/admin/upgrading.html.textile.liquid @@ -28,10 +28,14 @@ TODO: extract this information based on git commit messages and generate changel
-h2(#main). development main (as of 2023-04-17) +h2(#main). development main (as of 2023-04-18) "previous: Upgrading to 2.6.1":#v2_6_1 +h3. UseAWSS3v2Driver option removed + +The old "v1" S3 driver for keepstore has been removed. The new "v2" implementation, which has been the default since Arvados 2.5.0, is always used. The @Volumes.*.DriverParameters.UseAWSS3v2Driver@ configuration key is no longer recognized. If your config file uses it, remove it to avoid warning messages at startup. + h2(#v2_6_1). v2.6.1 (2023-04-17) "previous: Upgrading to 2.6.0":#v2_6_0 diff --git a/doc/install/configure-s3-object-storage.html.textile.liquid b/doc/install/configure-s3-object-storage.html.textile.liquid index b4e0c1a312..31ad994f0b 100644 --- a/doc/install/configure-s3-object-storage.html.textile.liquid +++ b/doc/install/configure-s3-object-storage.html.textile.liquid @@ -70,9 +70,6 @@ h2(#example). Configuration example # might be needed for other S3-compatible services. V2Signature: false - # Use the AWS S3 v2 Go driver instead of the goamz driver. - UseAWSS3v2Driver: false - # By default keepstore stores data using the MD5 checksum # (32 hexadecimal characters) as the object name, e.g., # "0123456abc...". Setting PrefixLength to 3 changes this @@ -121,12 +118,6 @@ h2(#example). Configuration example StorageClasses: null -Two S3 drivers are available. Historically, Arvados has used the @goamz@ driver to talk to S3-compatible services. More recently, support for the @aws-sdk-go-v2@ driver was added. This driver can be activated by setting the @UseAWSS3v2Driver@ flag to @true@. - -The @aws-sdk-go-v2@ does not support the old S3 v2 signing algorithm. This will not affect interacting with AWS S3, but it might be an issue when Keep is backed by a very old version of a third party S3-compatible service. - -The @aws-sdk-go-v2@ driver can improve read performance by 50-100% over the @goamz@ driver, but it has not had as much production use. See the "wiki":https://dev.arvados.org/projects/arvados/wiki/Keep_real_world_performance_numbers for details. - h2(#IAM). IAM Policy On Amazon, VMs which will access the S3 bucket (these include keepstore and compute nodes) will need an IAM policy with "permission that can read, write, list and delete objects in the bucket":https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create.html . Here is an example policy: diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml index 5684723a20..af8841265c 100644 --- a/lib/config/config.default.yml +++ b/lib/config/config.default.yml @@ -1587,8 +1587,6 @@ Clusters: ReadTimeout: 10m RaceWindow: 24h PrefixLength: 0 - # Use aws-s3-go (v2) instead of goamz - UseAWSS3v2Driver: true # For S3 driver, potentially unsafe tuning parameter, # intentionally excluded from main documentation. diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go index 4466b0a4de..01e9902c86 100644 --- a/sdk/go/arvados/config.go +++ b/sdk/go/arvados/config.go @@ -324,7 +324,6 @@ type S3VolumeDriverParameters struct { Bucket string LocationConstraint bool V2Signature bool - UseAWSS3v2Driver bool IndexPageSize int ConnectTimeout Duration ReadTimeout Duration diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go index f98efd8fdf..38428cdab1 100644 --- a/services/keep-web/s3.go +++ b/services/keep-web/s3.go @@ -28,7 +28,6 @@ import ( "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/ctxlog" - "github.com/AdRoll/goamz/s3" ) const ( @@ -42,11 +41,17 @@ type commonPrefix struct { } type listV1Resp struct { - XMLName string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"` - s3.ListResp - // s3.ListResp marshals an empty tag when - // CommonPrefixes is nil, which confuses some clients. - // Fix by using this nested struct instead. + XMLName string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"` + Name string + Prefix string + Delimiter string + Marker string + MaxKeys int + IsTruncated bool + Contents []s3Key + // If we use a []string here, xml marshals an empty tag when + // CommonPrefixes is nil, which confuses some clients. Fix by + // using this nested struct instead. CommonPrefixes []commonPrefix // Similarly, we need omitempty here, because an empty // tag confuses some clients (e.g., @@ -60,7 +65,7 @@ type listV1Resp struct { type listV2Resp struct { XMLName string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"` IsTruncated bool - Contents []s3.Key + Contents []s3Key Name string Prefix string Delimiter string @@ -73,6 +78,21 @@ type listV2Resp struct { StartAfter string `xml:",omitempty"` } +type s3Key struct { + Key string + LastModified string + Size int64 + // The following fields are not populated, but are here in + // case clients rely on the keys being present in xml + // responses. + ETag string + StorageClass string + Owner struct { + ID string + DisplayName string + } +} + func hmacstring(msg string, key []byte) []byte { h := hmac.New(sha256.New, key) io.WriteString(h, msg) @@ -859,7 +879,7 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request, return filepath.SkipDir } } - resp.Contents = append(resp.Contents, s3.Key{ + resp.Contents = append(resp.Contents, s3Key{ Key: path, LastModified: fi.ModTime().UTC().Format("2006-01-02T15:04:05.999") + "Z", Size: filesize, @@ -923,15 +943,13 @@ func (h *handler) s3list(bucket string, w http.ResponseWriter, r *http.Request, CommonPrefixes: resp.CommonPrefixes, NextMarker: nextMarker, KeyCount: resp.KeyCount, - ListResp: s3.ListResp{ - IsTruncated: resp.IsTruncated, - Name: bucket, - Prefix: params.prefix, - Delimiter: params.delimiter, - Marker: params.marker, - MaxKeys: params.maxKeys, - Contents: resp.Contents, - }, + IsTruncated: resp.IsTruncated, + Name: bucket, + Prefix: params.prefix, + Delimiter: params.delimiter, + Marker: params.marker, + MaxKeys: params.maxKeys, + Contents: resp.Contents, } } diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go deleted file mode 100644 index 7873764004..0000000000 --- a/services/keepstore/s3_volume.go +++ /dev/null @@ -1,1084 +0,0 @@ -// Copyright (C) The Arvados Authors. All rights reserved. -// -// SPDX-License-Identifier: AGPL-3.0 - -package keepstore - -import ( - "bufio" - "bytes" - "context" - "crypto/sha256" - "encoding/base64" - "encoding/hex" - "encoding/json" - "errors" - "fmt" - "io" - "io/ioutil" - "net/http" - "os" - "regexp" - "strings" - "sync" - "sync/atomic" - "time" - - "git.arvados.org/arvados.git/sdk/go/arvados" - "github.com/AdRoll/goamz/aws" - "github.com/AdRoll/goamz/s3" - "github.com/prometheus/client_golang/prometheus" - "github.com/sirupsen/logrus" -) - -func init() { - driver["S3"] = chooseS3VolumeDriver -} - -func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) { - v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics} - err := json.Unmarshal(volume.DriverParameters, v) - if err != nil { - return nil, err - } - v.logger = logger.WithField("Volume", v.String()) - return v, v.check() -} - -func (v *S3Volume) check() error { - if v.Bucket == "" { - return errors.New("DriverParameters: Bucket must be provided") - } - if v.IndexPageSize == 0 { - v.IndexPageSize = 1000 - } - if v.RaceWindow < 0 { - return errors.New("DriverParameters: RaceWindow must not be negative") - } - - if v.Endpoint == "" { - r, ok := aws.Regions[v.Region] - if !ok { - return fmt.Errorf("unrecognized region %+q; try specifying endpoint instead", v.Region) - } - v.region = r - } else { - v.region = aws.Region{ - Name: v.Region, - S3Endpoint: v.Endpoint, - S3LocationConstraint: v.LocationConstraint, - } - } - - // Zero timeouts mean "wait forever", which is a bad - // default. Default to long timeouts instead. - if v.ConnectTimeout == 0 { - v.ConnectTimeout = s3DefaultConnectTimeout - } - if v.ReadTimeout == 0 { - v.ReadTimeout = s3DefaultReadTimeout - } - - v.bucket = &s3bucket{ - bucket: &s3.Bucket{ - S3: v.newS3Client(), - Name: v.Bucket, - }, - } - // Set up prometheus metrics - lbls := prometheus.Labels{"device_id": v.GetDeviceID()} - v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls) - - err := v.bootstrapIAMCredentials() - if err != nil { - return fmt.Errorf("error getting IAM credentials: %s", err) - } - - return nil -} - -const ( - s3DefaultReadTimeout = arvados.Duration(10 * time.Minute) - s3DefaultConnectTimeout = arvados.Duration(time.Minute) -) - -var ( - // ErrS3TrashDisabled is returned by Trash if that operation - // is impossible with the current config. - ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false") - - s3ACL = s3.Private - - zeroTime time.Time -) - -const ( - maxClockSkew = 600 * time.Second - nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT" -) - -func s3regions() (okList []string) { - for r := range aws.Regions { - okList = append(okList, r) - } - return -} - -// S3Volume implements Volume using an S3 bucket. -type S3Volume struct { - arvados.S3VolumeDriverParameters - AuthToken string // populated automatically when IAMRole is used - AuthExpiration time.Time // populated automatically when IAMRole is used - - cluster *arvados.Cluster - volume arvados.Volume - logger logrus.FieldLogger - metrics *volumeMetricsVecs - bucket *s3bucket - region aws.Region - startOnce sync.Once -} - -// GetDeviceID returns a globally unique ID for the storage bucket. -func (v *S3Volume) GetDeviceID() string { - return "s3://" + v.Endpoint + "/" + v.Bucket -} - -func (v *S3Volume) bootstrapIAMCredentials() error { - if v.AccessKeyID != "" || v.SecretAccessKey != "" { - if v.IAMRole != "" { - return errors.New("invalid DriverParameters: AccessKeyID and SecretAccessKey must be blank if IAMRole is specified") - } - return nil - } - ttl, err := v.updateIAMCredentials() - if err != nil { - return err - } - go func() { - for { - time.Sleep(ttl) - ttl, err = v.updateIAMCredentials() - if err != nil { - v.logger.WithError(err).Warnf("failed to update credentials for IAM role %q", v.IAMRole) - ttl = time.Second - } else if ttl < time.Second { - v.logger.WithField("TTL", ttl).Warnf("received stale credentials for IAM role %q", v.IAMRole) - ttl = time.Second - } - } - }() - return nil -} - -func (v *S3Volume) newS3Client() *s3.S3 { - auth := aws.NewAuth(v.AccessKeyID, v.SecretAccessKey, v.AuthToken, v.AuthExpiration) - client := s3.New(*auth, v.region) - if !v.V2Signature { - client.Signature = aws.V4Signature - } - client.ConnectTimeout = time.Duration(v.ConnectTimeout) - client.ReadTimeout = time.Duration(v.ReadTimeout) - return client -} - -// returned by AWS metadata endpoint .../security-credentials/${rolename} -type iamCredentials struct { - Code string - LastUpdated time.Time - Type string - AccessKeyID string - SecretAccessKey string - Token string - Expiration time.Time -} - -// Returns TTL of updated credentials, i.e., time to sleep until next -// update. -func (v *S3Volume) updateIAMCredentials() (time.Duration, error) { - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute)) - defer cancel() - - metadataBaseURL := "http://169.254.169.254/latest/meta-data/iam/security-credentials/" - - var url string - if strings.Contains(v.IAMRole, "://") { - // Configuration provides complete URL (used by tests) - url = v.IAMRole - } else if v.IAMRole != "" { - // Configuration provides IAM role name and we use the - // AWS metadata endpoint - url = metadataBaseURL + v.IAMRole - } else { - url = metadataBaseURL - v.logger.WithField("URL", url).Debug("looking up IAM role name") - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return 0, fmt.Errorf("error setting up request %s: %s", url, err) - } - resp, err := http.DefaultClient.Do(req.WithContext(ctx)) - if err != nil { - return 0, fmt.Errorf("error getting %s: %s", url, err) - } - defer resp.Body.Close() - if resp.StatusCode == http.StatusNotFound { - return 0, fmt.Errorf("this instance does not have an IAM role assigned -- either assign a role, or configure AccessKeyID and SecretAccessKey explicitly in DriverParameters (error getting %s: HTTP status %s)", url, resp.Status) - } else if resp.StatusCode != http.StatusOK { - return 0, fmt.Errorf("error getting %s: HTTP status %s", url, resp.Status) - } - body := bufio.NewReader(resp.Body) - var role string - _, err = fmt.Fscanf(body, "%s\n", &role) - if err != nil { - return 0, fmt.Errorf("error reading response from %s: %s", url, err) - } - if n, _ := body.Read(make([]byte, 64)); n > 0 { - v.logger.Warnf("ignoring additional data returned by metadata endpoint %s after the single role name that we expected", url) - } - v.logger.WithField("Role", role).Debug("looked up IAM role name") - url = url + role - } - - v.logger.WithField("URL", url).Debug("getting credentials") - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return 0, fmt.Errorf("error setting up request %s: %s", url, err) - } - resp, err := http.DefaultClient.Do(req.WithContext(ctx)) - if err != nil { - return 0, fmt.Errorf("error getting %s: %s", url, err) - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return 0, fmt.Errorf("error getting %s: HTTP status %s", url, resp.Status) - } - var cred iamCredentials - err = json.NewDecoder(resp.Body).Decode(&cred) - if err != nil { - return 0, fmt.Errorf("error decoding credentials from %s: %s", url, err) - } - v.AccessKeyID, v.SecretAccessKey, v.AuthToken, v.AuthExpiration = cred.AccessKeyID, cred.SecretAccessKey, cred.Token, cred.Expiration - v.bucket.SetBucket(&s3.Bucket{ - S3: v.newS3Client(), - Name: v.Bucket, - }) - // TTL is time from now to expiration, minus 5m. "We make new - // credentials available at least five minutes before the - // expiration of the old credentials." -- - // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials - // (If that's not true, the returned ttl might be zero or - // negative, which the caller can handle.) - ttl := cred.Expiration.Sub(time.Now()) - 5*time.Minute - v.logger.WithFields(logrus.Fields{ - "AccessKeyID": cred.AccessKeyID, - "LastUpdated": cred.LastUpdated, - "Expiration": cred.Expiration, - "TTL": arvados.Duration(ttl), - }).Debug("updated credentials") - return ttl, nil -} - -func (v *S3Volume) getReaderWithContext(ctx context.Context, key string) (rdr io.ReadCloser, err error) { - ready := make(chan bool) - go func() { - rdr, err = v.getReader(key) - close(ready) - }() - select { - case <-ready: - return - case <-ctx.Done(): - v.logger.Debugf("s3: abandoning getReader(%s): %s", key, ctx.Err()) - go func() { - <-ready - if err == nil { - rdr.Close() - } - }() - return nil, ctx.Err() - } -} - -// getReader wraps (Bucket)GetReader. -// -// In situations where (Bucket)GetReader would fail because the block -// disappeared in a Trash race, getReader calls fixRace to recover the -// data, and tries again. -func (v *S3Volume) getReader(key string) (rdr io.ReadCloser, err error) { - rdr, err = v.bucket.GetReader(key) - err = v.translateError(err) - if err == nil || !os.IsNotExist(err) { - return - } - - _, err = v.bucket.Head("recent/"+key, nil) - err = v.translateError(err) - if err != nil { - // If we can't read recent/X, there's no point in - // trying fixRace. Give up. - return - } - if !v.fixRace(key) { - err = os.ErrNotExist - return - } - - rdr, err = v.bucket.GetReader(key) - if err != nil { - v.logger.Warnf("reading %s after successful fixRace: %s", key, err) - err = v.translateError(err) - } - return -} - -// Get a block: copy the block data into buf, and return the number of -// bytes copied. -func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) { - key := v.key(loc) - rdr, err := v.getReaderWithContext(ctx, key) - if err != nil { - return 0, err - } - - var n int - ready := make(chan bool) - go func() { - defer close(ready) - - defer rdr.Close() - n, err = io.ReadFull(rdr, buf) - - switch err { - case nil, io.EOF, io.ErrUnexpectedEOF: - err = nil - default: - err = v.translateError(err) - } - }() - select { - case <-ctx.Done(): - v.logger.Debugf("s3: interrupting ReadFull() with Close() because %s", ctx.Err()) - rdr.Close() - // Must wait for ReadFull to return, to ensure it - // doesn't write to buf after we return. - v.logger.Debug("s3: waiting for ReadFull() to fail") - <-ready - return 0, ctx.Err() - case <-ready: - return n, err - } -} - -// Compare the given data with the stored data. -func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error { - key := v.key(loc) - errChan := make(chan error, 1) - go func() { - _, err := v.bucket.Head("recent/"+key, nil) - errChan <- err - }() - var err error - select { - case <-ctx.Done(): - return ctx.Err() - case err = <-errChan: - } - if err != nil { - // Checking for "loc" itself here would interfere with - // future GET requests. - // - // On AWS, if X doesn't exist, a HEAD or GET request - // for X causes X's non-existence to be cached. Thus, - // if we test for X, then create X and return a - // signature to our client, the client might still get - // 404 from all keepstores when trying to read it. - // - // To avoid this, we avoid doing HEAD X or GET X until - // we know X has been written. - // - // Note that X might exist even though recent/X - // doesn't: for example, the response to HEAD recent/X - // might itself come from a stale cache. In such - // cases, we will return a false negative and - // PutHandler might needlessly create another replica - // on a different volume. That's not ideal, but it's - // better than passing the eventually-consistent - // problem on to our clients. - return v.translateError(err) - } - rdr, err := v.getReaderWithContext(ctx, key) - if err != nil { - return err - } - defer rdr.Close() - return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32])) -} - -// Put writes a block. -func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error { - if v.volume.ReadOnly { - return MethodDisabledError - } - var opts s3.Options - size := len(block) - if size > 0 { - md5, err := hex.DecodeString(loc) - if err != nil { - return err - } - opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5) - // In AWS regions that use V4 signatures, we need to - // provide ContentSHA256 up front. Otherwise, the S3 - // library reads the request body (from our buffer) - // into another new buffer in order to compute the - // SHA256 before sending the request -- which would - // mean consuming 128 MiB of memory for the duration - // of a 64 MiB write. - opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block)) - } - - key := v.key(loc) - - // Send the block data through a pipe, so that (if we need to) - // we can close the pipe early and abandon our PutReader() - // goroutine, without worrying about PutReader() accessing our - // block buffer after we release it. - bufr, bufw := io.Pipe() - go func() { - io.Copy(bufw, bytes.NewReader(block)) - bufw.Close() - }() - - var err error - ready := make(chan bool) - go func() { - defer func() { - if ctx.Err() != nil { - v.logger.Debugf("abandoned PutReader goroutine finished with err: %s", err) - } - }() - defer close(ready) - err = v.bucket.PutReader(key, bufr, int64(size), "application/octet-stream", s3ACL, opts) - if err != nil { - return - } - err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{}) - }() - select { - case <-ctx.Done(): - v.logger.Debugf("taking PutReader's input away: %s", ctx.Err()) - // Our pipe might be stuck in Write(), waiting for - // PutReader() to read. If so, un-stick it. This means - // PutReader will get corrupt data, but that's OK: the - // size and MD5 won't match, so the write will fail. - go io.Copy(ioutil.Discard, bufr) - // CloseWithError() will return once pending I/O is done. - bufw.CloseWithError(ctx.Err()) - v.logger.Debugf("abandoning PutReader goroutine") - return ctx.Err() - case <-ready: - // Unblock pipe in case PutReader did not consume it. - io.Copy(ioutil.Discard, bufr) - return v.translateError(err) - } -} - -// Touch sets the timestamp for the given locator to the current time. -func (v *S3Volume) Touch(loc string) error { - if v.volume.ReadOnly { - return MethodDisabledError - } - key := v.key(loc) - _, err := v.bucket.Head(key, nil) - err = v.translateError(err) - if os.IsNotExist(err) && v.fixRace(key) { - // The data object got trashed in a race, but fixRace - // rescued it. - } else if err != nil { - return err - } - err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{}) - return v.translateError(err) -} - -// Mtime returns the stored timestamp for the given locator. -func (v *S3Volume) Mtime(loc string) (time.Time, error) { - key := v.key(loc) - _, err := v.bucket.Head(key, nil) - if err != nil { - return zeroTime, v.translateError(err) - } - resp, err := v.bucket.Head("recent/"+key, nil) - err = v.translateError(err) - if os.IsNotExist(err) { - // The data object X exists, but recent/X is missing. - err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{}) - if err != nil { - v.logger.WithError(err).Errorf("error creating %q", "recent/"+key) - return zeroTime, v.translateError(err) - } - v.logger.Infof("created %q to migrate existing block to new storage scheme", "recent/"+key) - resp, err = v.bucket.Head("recent/"+key, nil) - if err != nil { - v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+key) - return zeroTime, v.translateError(err) - } - } else if err != nil { - // HEAD recent/X failed for some other reason. - return zeroTime, err - } - return v.lastModified(resp) -} - -// IndexTo writes a complete list of locators with the given prefix -// for which Get() can retrieve data. -func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error { - // Use a merge sort to find matching sets of X and recent/X. - dataL := s3Lister{ - Logger: v.logger, - Bucket: v.bucket.Bucket(), - Prefix: v.key(prefix), - PageSize: v.IndexPageSize, - Stats: &v.bucket.stats, - } - recentL := s3Lister{ - Logger: v.logger, - Bucket: v.bucket.Bucket(), - Prefix: "recent/" + v.key(prefix), - PageSize: v.IndexPageSize, - Stats: &v.bucket.stats, - } - for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() { - if data.Key >= "g" { - // Conveniently, "recent/*" and "trash/*" are - // lexically greater than all hex-encoded data - // hashes, so stopping here avoids iterating - // over all of them needlessly with dataL. - break - } - loc, isBlk := v.isKeepBlock(data.Key) - if !isBlk { - continue - } - - // stamp is the list entry we should use to report the - // last-modified time for this data block: it will be - // the recent/X entry if one exists, otherwise the - // entry for the data block itself. - stamp := data - - // Advance to the corresponding recent/X marker, if any - for recent != nil && recentL.Error() == nil { - if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 { - recent = recentL.Next() - continue - } else if cmp == 0 { - stamp = recent - recent = recentL.Next() - break - } else { - // recent/X marker is missing: we'll - // use the timestamp on the data - // object. - break - } - } - if err := recentL.Error(); err != nil { - return err - } - t, err := time.Parse(time.RFC3339, stamp.LastModified) - if err != nil { - return err - } - // We truncate sub-second precision here. Otherwise - // timestamps will never match the RFC1123-formatted - // Last-Modified values parsed by Mtime(). - fmt.Fprintf(writer, "%s+%d %d\n", loc, data.Size, t.Unix()*1000000000) - } - return dataL.Error() -} - -// Trash a Keep block. -func (v *S3Volume) Trash(loc string) error { - if v.volume.ReadOnly { - return MethodDisabledError - } - if t, err := v.Mtime(loc); err != nil { - return err - } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() { - return nil - } - key := v.key(loc) - if v.cluster.Collections.BlobTrashLifetime == 0 { - if !v.UnsafeDelete { - return ErrS3TrashDisabled - } - return v.translateError(v.bucket.Del(key)) - } - err := v.checkRaceWindow(key) - if err != nil { - return err - } - err = v.safeCopy("trash/"+key, key) - if err != nil { - return err - } - return v.translateError(v.bucket.Del(key)) -} - -// checkRaceWindow returns a non-nil error if trash/key is, or might -// be, in the race window (i.e., it's not safe to trash key). -func (v *S3Volume) checkRaceWindow(key string) error { - resp, err := v.bucket.Head("trash/"+key, nil) - err = v.translateError(err) - if os.IsNotExist(err) { - // OK, trash/X doesn't exist so we're not in the race - // window - return nil - } else if err != nil { - // Error looking up trash/X. We don't know whether - // we're in the race window - return err - } - t, err := v.lastModified(resp) - if err != nil { - // Can't parse timestamp - return err - } - safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow))) - if safeWindow <= 0 { - // We can't count on "touch trash/X" to prolong - // trash/X's lifetime. The new timestamp might not - // become visible until now+raceWindow, and EmptyTrash - // is allowed to delete trash/X before then. - return fmt.Errorf("%s: same block is already in trash, and safe window ended %s ago", key, -safeWindow) - } - // trash/X exists, but it won't be eligible for deletion until - // after now+raceWindow, so it's safe to overwrite it. - return nil -} - -// safeCopy calls PutCopy, and checks the response to make sure the -// copy succeeded and updated the timestamp on the destination object -// (PutCopy returns 200 OK if the request was received, even if the -// copy failed). -func (v *S3Volume) safeCopy(dst, src string) error { - resp, err := v.bucket.Bucket().PutCopy(dst, s3ACL, s3.CopyOptions{ - ContentType: "application/octet-stream", - MetadataDirective: "REPLACE", - }, v.bucket.Bucket().Name+"/"+src) - err = v.translateError(err) - if os.IsNotExist(err) { - return err - } else if err != nil { - return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Bucket().Name+"/"+src, err) - } - if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil { - return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err) - } else if time.Now().Sub(t) > maxClockSkew { - return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t) - } - return nil -} - -// Get the LastModified header from resp, and parse it as RFC1123 or -// -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123. -func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) { - s := resp.Header.Get("Last-Modified") - t, err = time.Parse(time.RFC1123, s) - if err != nil && s != "" { - // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT", - // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT" - // as required by HTTP spec. If it's not a valid HTTP - // header value, it's probably AWS (or s3test) giving - // us a nearly-RFC1123 timestamp. - t, err = time.Parse(nearlyRFC1123, s) - } - return -} - -// Untrash moves block from trash back into store -func (v *S3Volume) Untrash(loc string) error { - key := v.key(loc) - err := v.safeCopy(key, "trash/"+key) - if err != nil { - return err - } - err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{}) - return v.translateError(err) -} - -// Status returns a *VolumeStatus representing the current in-use -// storage capacity and a fake available capacity that doesn't make -// the volume seem full or nearly-full. -func (v *S3Volume) Status() *VolumeStatus { - return &VolumeStatus{ - DeviceNum: 1, - BytesFree: BlockSize * 1000, - BytesUsed: 1, - } -} - -// InternalStats returns bucket I/O and API call counters. -func (v *S3Volume) InternalStats() interface{} { - return &v.bucket.stats -} - -// String implements fmt.Stringer. -func (v *S3Volume) String() string { - return fmt.Sprintf("s3-bucket:%+q", v.Bucket) -} - -var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`) - -func (v *S3Volume) isKeepBlock(s string) (string, bool) { - if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] { - s = s[v.PrefixLength+1:] - } - return s, s3KeepBlockRegexp.MatchString(s) -} - -// Return the key used for a given loc. If PrefixLength==0 then -// key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is -// "abc/abcdef0123", etc. -func (v *S3Volume) key(loc string) string { - if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 { - return loc[:v.PrefixLength] + "/" + loc - } else { - return loc - } -} - -// fixRace(X) is called when "recent/X" exists but "X" doesn't -// exist. If the timestamps on "recent/X" and "trash/X" indicate there -// was a race between Put and Trash, fixRace recovers from the race by -// Untrashing the block. -func (v *S3Volume) fixRace(key string) bool { - trash, err := v.bucket.Head("trash/"+key, nil) - if err != nil { - if !os.IsNotExist(v.translateError(err)) { - v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+key) - } - return false - } - trashTime, err := v.lastModified(trash) - if err != nil { - v.logger.WithError(err).Errorf("fixRace: error parsing time %q", trash.Header.Get("Last-Modified")) - return false - } - - recent, err := v.bucket.Head("recent/"+key, nil) - if err != nil { - v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+key) - return false - } - recentTime, err := v.lastModified(recent) - if err != nil { - v.logger.WithError(err).Errorf("fixRace: error parsing time %q", recent.Header.Get("Last-Modified")) - return false - } - - ageWhenTrashed := trashTime.Sub(recentTime) - if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() { - // No evidence of a race: block hasn't been written - // since it became eligible for Trash. No fix needed. - return false - } - - v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", key, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL) - v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+key, key) - err = v.safeCopy(key, "trash/"+key) - if err != nil { - v.logger.WithError(err).Error("fixRace: copy failed") - return false - } - return true -} - -func (v *S3Volume) translateError(err error) error { - switch err := err.(type) { - case *s3.Error: - if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") || - strings.Contains(err.Error(), "Not Found") { - return os.ErrNotExist - } - // Other 404 errors like NoSuchVersion and - // NoSuchBucket are different problems which should - // get called out downstream, so we don't convert them - // to os.ErrNotExist. - } - return err -} - -// EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime -// and deletes them from the volume. -func (v *S3Volume) EmptyTrash() { - if v.cluster.Collections.BlobDeleteConcurrency < 1 { - return - } - - var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64 - - // Define "ready to delete" as "...when EmptyTrash started". - startT := time.Now() - - emptyOneKey := func(trash *s3.Key) { - key := trash.Key[6:] - loc, isBlk := v.isKeepBlock(key) - if !isBlk { - return - } - atomic.AddInt64(&bytesInTrash, trash.Size) - atomic.AddInt64(&blocksInTrash, 1) - - trashT, err := time.Parse(time.RFC3339, trash.LastModified) - if err != nil { - v.logger.Warnf("EmptyTrash: %q: parse %q: %s", trash.Key, trash.LastModified, err) - return - } - recent, err := v.bucket.Head("recent/"+key, nil) - if err != nil && os.IsNotExist(v.translateError(err)) { - v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+loc, err) - err = v.Untrash(loc) - if err != nil { - v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc) - } - return - } else if err != nil { - v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+key) - return - } - recentT, err := v.lastModified(recent) - if err != nil { - v.logger.WithError(err).Warnf("EmptyTrash: %q: error parsing %q", "recent/"+key, recent.Header.Get("Last-Modified")) - return - } - if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() { - if age := startT.Sub(recentT); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) { - // recent/loc is too old to protect - // loc from being Trashed again during - // the raceWindow that starts if we - // delete trash/X now. - // - // Note this means (TrashSweepInterval - // < BlobSigningTTL - raceWindow) is - // necessary to avoid starvation. - v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc) - v.fixRace(key) - v.Touch(loc) - return - } - _, err := v.bucket.Head(key, nil) - if os.IsNotExist(err) { - v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc) - v.fixRace(key) - return - } else if err != nil { - v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc) - return - } - } - if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() { - return - } - err = v.bucket.Del(trash.Key) - if err != nil { - v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", trash.Key) - return - } - atomic.AddInt64(&bytesDeleted, trash.Size) - atomic.AddInt64(&blocksDeleted, 1) - - _, err = v.bucket.Head(key, nil) - if err == nil { - v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", key, key) - return - } - if !os.IsNotExist(v.translateError(err)) { - v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", key) - return - } - err = v.bucket.Del("recent/" + key) - if err != nil { - v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+key) - } - } - - var wg sync.WaitGroup - todo := make(chan *s3.Key, v.cluster.Collections.BlobDeleteConcurrency) - for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for key := range todo { - emptyOneKey(key) - } - }() - } - - trashL := s3Lister{ - Logger: v.logger, - Bucket: v.bucket.Bucket(), - Prefix: "trash/", - PageSize: v.IndexPageSize, - Stats: &v.bucket.stats, - } - for trash := trashL.First(); trash != nil; trash = trashL.Next() { - todo <- trash - } - close(todo) - wg.Wait() - - if err := trashL.Error(); err != nil { - v.logger.WithError(err).Error("EmptyTrash: lister failed") - } - v.logger.Infof("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted) -} - -type s3Lister struct { - Logger logrus.FieldLogger - Bucket *s3.Bucket - Prefix string - PageSize int - Stats *s3bucketStats - nextMarker string - buf []s3.Key - err error -} - -// First fetches the first page and returns the first item. It returns -// nil if the response is the empty set or an error occurs. -func (lister *s3Lister) First() *s3.Key { - lister.getPage() - return lister.pop() -} - -// Next returns the next item, fetching the next page if necessary. It -// returns nil if the last available item has already been fetched, or -// an error occurs. -func (lister *s3Lister) Next() *s3.Key { - if len(lister.buf) == 0 && lister.nextMarker != "" { - lister.getPage() - } - return lister.pop() -} - -// Return the most recent error encountered by First or Next. -func (lister *s3Lister) Error() error { - return lister.err -} - -func (lister *s3Lister) getPage() { - lister.Stats.TickOps("list") - lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps) - resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize) - lister.nextMarker = "" - if err != nil { - lister.err = err - return - } - if resp.IsTruncated { - lister.nextMarker = resp.NextMarker - } - lister.buf = make([]s3.Key, 0, len(resp.Contents)) - for _, key := range resp.Contents { - if !strings.HasPrefix(key.Key, lister.Prefix) { - lister.Logger.Warnf("s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key) - continue - } - lister.buf = append(lister.buf, key) - } -} - -func (lister *s3Lister) pop() (k *s3.Key) { - if len(lister.buf) > 0 { - k = &lister.buf[0] - lister.buf = lister.buf[1:] - } - return -} - -// s3bucket wraps s3.bucket and counts I/O and API usage stats. The -// wrapped bucket can be replaced atomically with SetBucket in order -// to update credentials. -type s3bucket struct { - bucket *s3.Bucket - stats s3bucketStats - mu sync.Mutex -} - -func (b *s3bucket) Bucket() *s3.Bucket { - b.mu.Lock() - defer b.mu.Unlock() - return b.bucket -} - -func (b *s3bucket) SetBucket(bucket *s3.Bucket) { - b.mu.Lock() - defer b.mu.Unlock() - b.bucket = bucket -} - -func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) { - rdr, err := b.Bucket().GetReader(path) - b.stats.TickOps("get") - b.stats.Tick(&b.stats.Ops, &b.stats.GetOps) - b.stats.TickErr(err) - return NewCountingReader(rdr, b.stats.TickInBytes), err -} - -func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) { - resp, err := b.Bucket().Head(path, headers) - b.stats.TickOps("head") - b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps) - b.stats.TickErr(err) - return resp, err -} - -func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error { - if length == 0 { - // goamz will only send Content-Length: 0 when reader - // is nil due to net.http.Request.ContentLength - // behavior. Otherwise, Content-Length header is - // omitted which will cause some S3 services - // (including AWS and Ceph RadosGW) to fail to create - // empty objects. - r = nil - } else { - r = NewCountingReader(r, b.stats.TickOutBytes) - } - err := b.Bucket().PutReader(path, r, length, contType, perm, options) - b.stats.TickOps("put") - b.stats.Tick(&b.stats.Ops, &b.stats.PutOps) - b.stats.TickErr(err) - return err -} - -func (b *s3bucket) Del(path string) error { - err := b.Bucket().Del(path) - b.stats.TickOps("delete") - b.stats.Tick(&b.stats.Ops, &b.stats.DelOps) - b.stats.TickErr(err) - return err -} - -type s3bucketStats struct { - statsTicker - Ops uint64 - GetOps uint64 - PutOps uint64 - HeadOps uint64 - DelOps uint64 - ListOps uint64 -} - -func (s *s3bucketStats) TickErr(err error) { - if err == nil { - return - } - errType := fmt.Sprintf("%T", err) - if err, ok := err.(*s3.Error); ok { - errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code) - } - s.statsTicker.TickErr(err, errType) -} diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go deleted file mode 100644 index a820983568..0000000000 --- a/services/keepstore/s3_volume_test.go +++ /dev/null @@ -1,594 +0,0 @@ -// Copyright (C) The Arvados Authors. All rights reserved. -// -// SPDX-License-Identifier: AGPL-3.0 - -package keepstore - -import ( - "bytes" - "context" - "crypto/md5" - "encoding/json" - "fmt" - "io" - "net/http" - "net/http/httptest" - "os" - "strings" - "time" - - "git.arvados.org/arvados.git/sdk/go/arvados" - "git.arvados.org/arvados.git/sdk/go/ctxlog" - "github.com/AdRoll/goamz/s3" - "github.com/AdRoll/goamz/s3/s3test" - "github.com/prometheus/client_golang/prometheus" - "github.com/sirupsen/logrus" - check "gopkg.in/check.v1" -) - -const ( - TestBucketName = "testbucket" -) - -type fakeClock struct { - now *time.Time -} - -func (c *fakeClock) Now() time.Time { - if c.now == nil { - return time.Now() - } - return *c.now -} - -var _ = check.Suite(&StubbedS3Suite{}) - -type StubbedS3Suite struct { - s3server *httptest.Server - metadata *httptest.Server - cluster *arvados.Cluster - handler *handler - volumes []*TestableS3Volume -} - -func (s *StubbedS3Suite) SetUpTest(c *check.C) { - s.s3server = nil - s.metadata = nil - s.cluster = testCluster(c) - s.cluster.Volumes = map[string]arvados.Volume{ - "zzzzz-nyw5e-000000000000000": {Driver: "S3"}, - "zzzzz-nyw5e-111111111111111": {Driver: "S3"}, - } - s.handler = &handler{} -} - -func (s *StubbedS3Suite) TestGeneric(c *check.C) { - DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume { - // Use a negative raceWindow so s3test's 1-second - // timestamp precision doesn't confuse fixRace. - return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second) - }) -} - -func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) { - DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume { - return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second) - }) -} - -func (s *StubbedS3Suite) TestGenericWithPrefix(c *check.C) { - DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume { - v := s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second) - v.PrefixLength = 3 - return v - }) -} - -func (s *StubbedS3Suite) TestIndex(c *check.C) { - v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0) - v.IndexPageSize = 3 - for i := 0; i < 256; i++ { - v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111}) - } - for _, spec := range []struct { - prefix string - expectMatch int - }{ - {"", 256}, - {"c", 16}, - {"bc", 1}, - {"abc", 0}, - } { - buf := new(bytes.Buffer) - err := v.IndexTo(spec.prefix, buf) - c.Check(err, check.IsNil) - - idx := bytes.SplitAfter(buf.Bytes(), []byte{10}) - c.Check(len(idx), check.Equals, spec.expectMatch+1) - c.Check(len(idx[len(idx)-1]), check.Equals, 0) - } -} - -func (s *StubbedS3Suite) TestSignatureVersion(c *check.C) { - var header http.Header - stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - header = r.Header - })) - defer stub.Close() - - // Default V4 signature - vol := S3Volume{ - S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{ - AccessKeyID: "xxx", - SecretAccessKey: "xxx", - Endpoint: stub.URL, - Region: "test-region-1", - Bucket: "test-bucket-name", - }, - cluster: s.cluster, - logger: ctxlog.TestLogger(c), - metrics: newVolumeMetricsVecs(prometheus.NewRegistry()), - } - err := vol.check() - c.Check(err, check.IsNil) - err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo")) - c.Check(err, check.IsNil) - c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`) - - // Force V2 signature - vol = S3Volume{ - S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{ - AccessKeyID: "xxx", - SecretAccessKey: "xxx", - Endpoint: stub.URL, - Region: "test-region-1", - Bucket: "test-bucket-name", - V2Signature: true, - }, - cluster: s.cluster, - logger: ctxlog.TestLogger(c), - metrics: newVolumeMetricsVecs(prometheus.NewRegistry()), - } - err = vol.check() - c.Check(err, check.IsNil) - err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo")) - c.Check(err, check.IsNil) - c.Check(header.Get("Authorization"), check.Matches, `AWS xxx:.*`) -} - -func (s *StubbedS3Suite) TestIAMRoleCredentials(c *check.C) { - s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339) - exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339) - // Literal example from - // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials - // but with updated timestamps - io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`) - })) - defer s.metadata.Close() - - v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute) - c.Check(v.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE") - c.Check(v.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY") - c.Check(v.bucket.bucket.S3.Auth.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE") - c.Check(v.bucket.bucket.S3.Auth.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY") - - s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusNotFound) - })) - deadv := &S3Volume{ - S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{ - IAMRole: s.metadata.URL + "/fake-metadata/test-role", - Endpoint: "http://localhost:12345", - Region: "test-region-1", - Bucket: "test-bucket-name", - }, - cluster: s.cluster, - logger: ctxlog.TestLogger(c), - metrics: newVolumeMetricsVecs(prometheus.NewRegistry()), - } - err := deadv.check() - c.Check(err, check.ErrorMatches, `.*/fake-metadata/test-role.*`) - c.Check(err, check.ErrorMatches, `.*404.*`) -} - -func (s *StubbedS3Suite) TestStats(c *check.C) { - v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute) - stats := func() string { - buf, err := json.Marshal(v.InternalStats()) - c.Check(err, check.IsNil) - return string(buf) - } - - c.Check(stats(), check.Matches, `.*"Ops":0,.*`) - - loc := "acbd18db4cc2f85cedef654fccc4a4d8" - _, err := v.Get(context.Background(), loc, make([]byte, 3)) - c.Check(err, check.NotNil) - c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`) - c.Check(stats(), check.Matches, `.*"\*s3.Error 404 [^"]*":[^0].*`) - c.Check(stats(), check.Matches, `.*"InBytes":0,.*`) - - err = v.Put(context.Background(), loc, []byte("foo")) - c.Check(err, check.IsNil) - c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`) - c.Check(stats(), check.Matches, `.*"PutOps":2,.*`) - - _, err = v.Get(context.Background(), loc, make([]byte, 3)) - c.Check(err, check.IsNil) - _, err = v.Get(context.Background(), loc, make([]byte, 3)) - c.Check(err, check.IsNil) - c.Check(stats(), check.Matches, `.*"InBytes":6,.*`) -} - -type blockingHandler struct { - requested chan *http.Request - unblock chan struct{} -} - -func (h *blockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") { - // Accept PutBucket ("PUT /bucketname/"), called by - // newTestableVolume - return - } - if h.requested != nil { - h.requested <- r - } - if h.unblock != nil { - <-h.unblock - } - http.Error(w, "nothing here", http.StatusNotFound) -} - -func (s *StubbedS3Suite) TestGetContextCancel(c *check.C) { - loc := "acbd18db4cc2f85cedef654fccc4a4d8" - buf := make([]byte, 3) - - s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error { - _, err := v.Get(ctx, loc, buf) - return err - }) -} - -func (s *StubbedS3Suite) TestCompareContextCancel(c *check.C) { - loc := "acbd18db4cc2f85cedef654fccc4a4d8" - buf := []byte("bar") - - s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error { - return v.Compare(ctx, loc, buf) - }) -} - -func (s *StubbedS3Suite) TestPutContextCancel(c *check.C) { - loc := "acbd18db4cc2f85cedef654fccc4a4d8" - buf := []byte("foo") - - s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error { - return v.Put(ctx, loc, buf) - }) -} - -func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3Volume) error) { - handler := &blockingHandler{} - s.s3server = httptest.NewServer(handler) - defer s.s3server.Close() - - v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute) - - ctx, cancel := context.WithCancel(context.Background()) - - handler.requested = make(chan *http.Request) - handler.unblock = make(chan struct{}) - defer close(handler.unblock) - - doneFunc := make(chan struct{}) - go func() { - err := testFunc(ctx, v) - c.Check(err, check.Equals, context.Canceled) - close(doneFunc) - }() - - timeout := time.After(10 * time.Second) - - // Wait for the stub server to receive a request, meaning - // Get() is waiting for an s3 operation. - select { - case <-timeout: - c.Fatal("timed out waiting for test func to call our handler") - case <-doneFunc: - c.Fatal("test func finished without even calling our handler!") - case <-handler.requested: - } - - cancel() - - select { - case <-timeout: - c.Fatal("timed out") - case <-doneFunc: - } -} - -func (s *StubbedS3Suite) TestBackendStates(c *check.C) { - s.cluster.Collections.BlobTrashLifetime.Set("1h") - s.cluster.Collections.BlobSigningTTL.Set("1h") - - v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute) - var none time.Time - - putS3Obj := func(t time.Time, key string, data []byte) { - if t == none { - return - } - v.serverClock.now = &t - v.bucket.Bucket().Put(key, data, "application/octet-stream", s3ACL, s3.Options{}) - } - - t0 := time.Now() - nextKey := 0 - for _, scenario := range []struct { - label string - dataT time.Time - recentT time.Time - trashT time.Time - canGet bool - canTrash bool - canGetAfterTrash bool - canUntrash bool - haveTrashAfterEmpty bool - freshAfterEmpty bool - }{ - { - "No related objects", - none, none, none, - false, false, false, false, false, false, - }, - { - // Stored by older version, or there was a - // race between EmptyTrash and Put: Trash is a - // no-op even though the data object is very - // old - "No recent/X", - t0.Add(-48 * time.Hour), none, none, - true, true, true, false, false, false, - }, - { - "Not trash, but old enough to be eligible for trash", - t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none, - true, true, false, false, false, false, - }, - { - "Not trash, and not old enough to be eligible for trash", - t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none, - true, true, true, false, false, false, - }, - { - "Trashed + untrashed copies exist, due to recent race between Trash and Put", - t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute), - true, true, true, true, true, false, - }, - { - "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race", - t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute), - true, false, true, true, true, false, - }, - { - "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race", - t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute), - true, false, true, true, false, false, - }, - { - "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe", - t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour), - true, false, true, true, true, true, - }, - { - "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch", - t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour), - true, true, true, true, false, false, - }, - { - "Trashed + untrashed copies exist because Trash operation was interrupted (no race)", - t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), - true, false, true, true, false, false, - }, - { - "Trash, not yet eligible for deletion", - none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute), - false, false, false, true, true, false, - }, - { - "Trash, not yet eligible for deletion, prone to races", - none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute), - false, false, false, true, true, false, - }, - { - "Trash, eligible for deletion", - none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour), - false, false, false, true, false, false, - }, - { - "Erroneously trashed during a race, detected before BlobTrashLifetime", - none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute), - true, false, true, true, true, false, - }, - { - "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime", - none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute), - true, false, true, true, true, false, - }, - { - "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing", - none, none, t0.Add(-time.Minute), - false, false, false, true, true, true, - }, - } { - for _, prefixLength := range []int{0, 3} { - v.PrefixLength = prefixLength - c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength) - - // We have a few tests to run for each scenario, and - // the tests are expected to change state. By calling - // this setup func between tests, we (re)create the - // scenario as specified, using a new unique block - // locator to prevent interference from previous - // tests. - - setupScenario := func() (string, []byte) { - nextKey++ - blk := []byte(fmt.Sprintf("%d", nextKey)) - loc := fmt.Sprintf("%x", md5.Sum(blk)) - key := loc - if prefixLength > 0 { - key = loc[:prefixLength] + "/" + loc - } - c.Log("\t", loc) - putS3Obj(scenario.dataT, key, blk) - putS3Obj(scenario.recentT, "recent/"+key, nil) - putS3Obj(scenario.trashT, "trash/"+key, blk) - v.serverClock.now = &t0 - return loc, blk - } - - // Check canGet - loc, blk := setupScenario() - buf := make([]byte, len(blk)) - _, err := v.Get(context.Background(), loc, buf) - c.Check(err == nil, check.Equals, scenario.canGet) - if err != nil { - c.Check(os.IsNotExist(err), check.Equals, true) - } - - // Call Trash, then check canTrash and canGetAfterTrash - loc, _ = setupScenario() - err = v.Trash(loc) - c.Check(err == nil, check.Equals, scenario.canTrash) - _, err = v.Get(context.Background(), loc, buf) - c.Check(err == nil, check.Equals, scenario.canGetAfterTrash) - if err != nil { - c.Check(os.IsNotExist(err), check.Equals, true) - } - - // Call Untrash, then check canUntrash - loc, _ = setupScenario() - err = v.Untrash(loc) - c.Check(err == nil, check.Equals, scenario.canUntrash) - if scenario.dataT != none || scenario.trashT != none { - // In all scenarios where the data exists, we - // should be able to Get after Untrash -- - // regardless of timestamps, errors, race - // conditions, etc. - _, err = v.Get(context.Background(), loc, buf) - c.Check(err, check.IsNil) - } - - // Call EmptyTrash, then check haveTrashAfterEmpty and - // freshAfterEmpty - loc, _ = setupScenario() - v.EmptyTrash() - _, err = v.bucket.Head("trash/"+v.key(loc), nil) - c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty) - if scenario.freshAfterEmpty { - t, err := v.Mtime(loc) - c.Check(err, check.IsNil) - // new mtime must be current (with an - // allowance for 1s timestamp precision) - c.Check(t.After(t0.Add(-time.Second)), check.Equals, true) - } - - // Check for current Mtime after Put (applies to all - // scenarios) - loc, blk = setupScenario() - err = v.Put(context.Background(), loc, blk) - c.Check(err, check.IsNil) - t, err := v.Mtime(loc) - c.Check(err, check.IsNil) - c.Check(t.After(t0.Add(-time.Second)), check.Equals, true) - } - } -} - -type TestableS3Volume struct { - *S3Volume - server *s3test.Server - c *check.C - serverClock *fakeClock -} - -func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, raceWindow time.Duration) *TestableS3Volume { - clock := &fakeClock{} - srv, err := s3test.NewServer(&s3test.Config{Clock: clock}) - c.Assert(err, check.IsNil) - endpoint := srv.URL() - if s.s3server != nil { - endpoint = s.s3server.URL - } - - iamRole, accessKey, secretKey := "", "xxx", "xxx" - if s.metadata != nil { - iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", "" - } - - v := &TestableS3Volume{ - S3Volume: &S3Volume{ - S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{ - IAMRole: iamRole, - AccessKeyID: accessKey, - SecretAccessKey: secretKey, - Bucket: TestBucketName, - Endpoint: endpoint, - Region: "test-region-1", - LocationConstraint: true, - UnsafeDelete: true, - IndexPageSize: 1000, - }, - cluster: cluster, - volume: volume, - logger: ctxlog.TestLogger(c), - metrics: metrics, - }, - c: c, - server: srv, - serverClock: clock, - } - c.Assert(v.S3Volume.check(), check.IsNil) - c.Assert(v.bucket.Bucket().PutBucket(s3.ACL("private")), check.IsNil) - // We couldn't set RaceWindow until now because check() - // rejects negative values. - v.S3Volume.RaceWindow = arvados.Duration(raceWindow) - return v -} - -// PutRaw skips the ContentMD5 test -func (v *TestableS3Volume) PutRaw(loc string, block []byte) { - key := v.key(loc) - err := v.bucket.Bucket().Put(key, block, "application/octet-stream", s3ACL, s3.Options{}) - if err != nil { - v.logger.Printf("PutRaw: %s: %+v", loc, err) - } - err = v.bucket.Bucket().Put("recent/"+key, nil, "application/octet-stream", s3ACL, s3.Options{}) - if err != nil { - v.logger.Printf("PutRaw: recent/%s: %+v", key, err) - } -} - -// TouchWithDate turns back the clock while doing a Touch(). We assume -// there are no other operations happening on the same s3test server -// while we do this. -func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) { - v.serverClock.now = &lastPut - err := v.bucket.Bucket().Put("recent/"+v.key(locator), nil, "application/octet-stream", s3ACL, s3.Options{}) - if err != nil { - panic(err) - } - v.serverClock.now = nil -} - -func (v *TestableS3Volume) Teardown() { - v.server.Quit() -} - -func (v *TestableS3Volume) ReadWriteOperationLabelValues() (r, w string) { - return "get", "put" -} diff --git a/services/keepstore/s3aws_volume.go b/services/keepstore/s3aws_volume.go index d068dde074..8f2c275391 100644 --- a/services/keepstore/s3aws_volume.go +++ b/services/keepstore/s3aws_volume.go @@ -33,6 +33,21 @@ import ( "github.com/sirupsen/logrus" ) +func init() { + driver["S3"] = newS3AWSVolume +} + +const ( + s3DefaultReadTimeout = arvados.Duration(10 * time.Minute) + s3DefaultConnectTimeout = arvados.Duration(time.Minute) + maxClockSkew = 600 * time.Second + nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT" +) + +var ( + ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false") +) + // S3AWSVolume implements Volume using an S3 bucket. type S3AWSVolume struct { arvados.S3VolumeDriverParameters @@ -58,24 +73,6 @@ type s3AWSbucket struct { mu sync.Mutex } -// chooseS3VolumeDriver distinguishes between the old goamz driver and -// aws-sdk-go based on the UseAWSS3v2Driver feature flag -func chooseS3VolumeDriver(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) { - v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics} - // Default value will be overriden if it happens to be defined in the config - v.S3VolumeDriverParameters.UseAWSS3v2Driver = true - err := json.Unmarshal(volume.DriverParameters, v) - if err != nil { - return nil, err - } - if v.UseAWSS3v2Driver { - logger.Debugln("Using AWS S3 v2 driver") - return newS3AWSVolume(cluster, volume, logger, metrics) - } - logger.Debugln("Using goamz S3 driver") - return newS3Volume(cluster, volume, logger, metrics) -} - const ( PartSize = 5 * 1024 * 1024 ReadConcurrency = 13 diff --git a/tools/salt-install/config_examples/multi_host/aws/pillars/arvados.sls b/tools/salt-install/config_examples/multi_host/aws/pillars/arvados.sls index cacc6a0a11..bf046f6644 100644 --- a/tools/salt-install/config_examples/multi_host/aws/pillars/arvados.sls +++ b/tools/salt-install/config_examples/multi_host/aws/pillars/arvados.sls @@ -140,7 +140,6 @@ arvados: Replication: 2 Driver: S3 DriverParameters: - UseAWSS3v2Driver: true Bucket: __CLUSTER__-nyw5e-000000000000000-volume IAMRole: __CLUSTER__-keepstore-00-iam-role Region: FIXME