X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/da4bc7c758d09c1c02542b54b96eab018f746eae..HEAD:/services/keepstore/s3_volume.go

diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index dc857c3264..dd4666039d 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -13,6 +13,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"net/url"
 	"os"
 	"regexp"
 	"strings"
@@ -22,13 +23,13 @@ import (
 
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"github.com/aws/aws-sdk-go-v2/aws"
-	"github.com/aws/aws-sdk-go-v2/aws/awserr"
-	"github.com/aws/aws-sdk-go-v2/aws/defaults"
-	"github.com/aws/aws-sdk-go-v2/aws/ec2metadata"
-	"github.com/aws/aws-sdk-go-v2/aws/ec2rolecreds"
-	"github.com/aws/aws-sdk-go-v2/aws/endpoints"
+	v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
+	"github.com/aws/aws-sdk-go-v2/config"
+	"github.com/aws/aws-sdk-go-v2/credentials"
+	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
 	"github.com/aws/aws-sdk-go-v2/service/s3"
-	"github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
+	"github.com/aws/aws-sdk-go-v2/service/s3/types"
+	"github.com/aws/smithy-go"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
 )
@@ -49,16 +50,19 @@ const (
 )
 
 var (
-	errS3TrashDisabled   = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
-	s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
-	s3AWSZeroTime        time.Time
+	errS3TrashDisabled        = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
+	s3AWSKeepBlockRegexp      = regexp.MustCompile(`^[0-9a-f]{32}$`)
+	s3AWSZeroTime             time.Time
+	defaultEndpointResolverV2 = s3.NewDefaultEndpointResolverV2()
+
+	// Returned by an aws.EndpointResolverWithOptions to indicate
+	// that the default resolver should be used.
+	errEndpointNotOverridden = &aws.EndpointNotFoundError{Err: errors.New("endpoint not overridden")}
 )
 
 // s3Volume implements Volume using an S3 bucket.
 type s3Volume struct {
 	arvados.S3VolumeDriverParameters
-	AuthToken      string    // populated automatically when IAMRole is used
-	AuthExpiration time.Time // populated automatically when IAMRole is used
 
 	cluster   *arvados.Cluster
 	volume    arvados.Volume
@@ -68,6 +72,9 @@ type s3Volume struct {
 	bucket    *s3Bucket
 	region    string
 	startOnce sync.Once
+
+	overrideEndpoint *aws.Endpoint
+	usePathStyle     bool // used by test suite
 }
 
 // s3bucket wraps s3.bucket and counts I/O and API usage stats. The
@@ -114,12 +121,15 @@ func news3Volume(params newVolumeParams) (volume, error) {
 }
 
 func (v *s3Volume) translateError(err error) error {
-	if _, ok := err.(*aws.RequestCanceledError); ok {
+	if cerr := (interface{ CanceledError() bool })(nil); errors.As(err, &cerr) && cerr.CanceledError() {
+		// *aws.RequestCanceledError and *smithy.CanceledError
+		// implement this interface.
 		return context.Canceled
-	} else if aerr, ok := err.(awserr.Error); ok {
-		if aerr.Code() == "NotFound" {
-			return os.ErrNotExist
-		} else if aerr.Code() == "NoSuchKey" {
+	}
+	var aerr smithy.APIError
+	if errors.As(err, &aerr) {
+		switch aerr.ErrorCode() {
+		case "NotFound", "NoSuchKey":
 			return os.ErrNotExist
 		}
 	}
@@ -140,20 +150,17 @@ func (v *s3Volume) safeCopy(dst, src string) error {
 		Key:        aws.String(dst),
 	}
 
-	req := v.bucket.svc.CopyObjectRequest(input)
-	resp, err := req.Send(context.Background())
+	resp, err := v.bucket.svc.CopyObject(context.Background(), input)
 
 	err = v.translateError(err)
 	if os.IsNotExist(err) {
 		return err
 	} else if err != nil {
 		return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.bucket+"/"+src, err)
-	}
-
-	if resp.CopyObjectResult.LastModified == nil {
-		return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.CopyObjectResult.LastModified, err)
-	} else if time.Now().Sub(*resp.CopyObjectResult.LastModified) > maxClockSkew {
-		return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.CopyObjectResult.LastModified, resp.CopyObjectResult.LastModified)
+	} else if resp.CopyObjectResult.LastModified == nil {
+		return fmt.Errorf("PutCopy(%q ← %q): succeeded but did not return a timestamp", dst, v.bucket.bucket+"/"+src)
+	} else if skew := time.Now().UTC().Sub(*resp.CopyObjectResult.LastModified); skew > maxClockSkew {
+		return fmt.Errorf("PutCopy succeeded but returned old timestamp %s (skew %v > max %v, now %s)", resp.CopyObjectResult.LastModified, skew, maxClockSkew, time.Now())
 	}
 	return nil
 }
@@ -173,28 +180,18 @@ func (v *s3Volume) check(ec2metadataHostname string) error {
 		return errors.New("DriverParameters: V2Signature is not supported")
 	}
 
-	defaultResolver := endpoints.NewDefaultResolver()
-
-	cfg := defaults.Config()
-
 	if v.Endpoint == "" && v.Region == "" {
 		return fmt.Errorf("AWS region or endpoint must be specified")
-	} else if v.Endpoint != "" || ec2metadataHostname != "" {
-		myCustomResolver := func(service, region string) (aws.Endpoint, error) {
-			if v.Endpoint != "" && service == "s3" {
-				return aws.Endpoint{
-					URL:           v.Endpoint,
-					SigningRegion: region,
-				}, nil
-			} else if service == "ec2metadata" && ec2metadataHostname != "" {
-				return aws.Endpoint{
-					URL: ec2metadataHostname,
-				}, nil
-			} else {
-				return defaultResolver.ResolveEndpoint(service, region)
-			}
+	} else if v.Endpoint != "" {
+		_, err := url.Parse(v.Endpoint)
+		if err != nil {
+			return fmt.Errorf("error parsing custom S3 endpoint %q: %w", v.Endpoint, err)
+		}
+		v.overrideEndpoint = &aws.Endpoint{
+			URL:               v.Endpoint,
+			HostnameImmutable: true,
+			Source:            aws.EndpointSourceCustom,
 		}
-		cfg.EndpointResolver = aws.EndpointResolverFunc(myCustomResolver)
 	}
 	if v.Region == "" {
 		// Endpoint is already specified (otherwise we would
@@ -203,7 +200,6 @@ func (v *s3Volume) check(ec2metadataHostname string) error {
 		// SignatureVersions.
 		v.Region = "us-east-1"
 	}
-	cfg.Region = v.Region
 
 	// Zero timeouts mean "wait forever", which is a bad
 	// default. Default to long timeouts instead.
@@ -214,17 +210,65 @@ func (v *s3Volume) check(ec2metadataHostname string) error {
 		v.ReadTimeout = s3DefaultReadTimeout
 	}
 
-	creds := aws.NewChainProvider(
-		[]aws.CredentialsProvider{
-			aws.NewStaticCredentialsProvider(v.AccessKeyID, v.SecretAccessKey, v.AuthToken),
-			ec2rolecreds.New(ec2metadata.New(cfg)),
-		})
-
-	cfg.Credentials = creds
+	cfg, err := config.LoadDefaultConfig(context.TODO(),
+		config.WithRegion(v.Region),
+		config.WithCredentialsCacheOptions(func(o *aws.CredentialsCacheOptions) {
+			// (from aws-sdk-go-v2 comments) "allow the
+			// credentials to trigger refreshing prior to
+			// the credentials actually expiring. This is
+			// beneficial so race conditions with expiring
+			// credentials do not cause request to fail
+			// unexpectedly due to ExpiredTokenException
+			// exceptions."
+			//
+			// (from
+			// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html)
+			// "We make new credentials available at least
+			// five minutes before the expiration of the
+			// old credentials."
+			o.ExpiryWindow = 5 * time.Minute
+		}),
+		func(o *config.LoadOptions) error {
+			if v.AccessKeyID == "" && v.SecretAccessKey == "" {
+				// Use default sdk behavior (IAM / IMDS)
+				return nil
+			}
+			v.logger.Debug("using static credentials")
+			o.Credentials = credentials.StaticCredentialsProvider{
+				Value: aws.Credentials{
+					AccessKeyID:     v.AccessKeyID,
+					SecretAccessKey: v.SecretAccessKey,
+					Source:          "Arvados configuration",
+				},
+			}
+			return nil
+		},
+		func(o *config.LoadOptions) error {
+			if ec2metadataHostname != "" {
+				o.EC2IMDSEndpoint = ec2metadataHostname
+			}
+			if v.overrideEndpoint != nil {
+				o.EndpointResolverWithOptions = aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
+					if service == "S3" {
+						return *v.overrideEndpoint, nil
+					}
+					return aws.Endpoint{}, errEndpointNotOverridden // use default resolver
+				})
+			}
+			return nil
+		},
+	)
+	if err != nil {
+		return fmt.Errorf("error loading aws client config: %w", err)
+	}
 
 	v.bucket = &s3Bucket{
 		bucket: v.Bucket,
-		svc:    s3.New(cfg),
+		svc: s3.NewFromConfig(cfg, func(o *s3.Options) {
+			if v.usePathStyle {
+				o.UsePathStyle = true
+			}
+		}),
 	}
 
 	// Set up prometheus metrics
@@ -247,7 +291,7 @@ func (v *s3Volume) EmptyTrash() {
 	// Define "ready to delete" as "...when EmptyTrash started".
 	startT := time.Now()
 
-	emptyOneKey := func(trash *s3.Object) {
+	emptyOneKey := func(trash *types.Object) {
 		key := strings.TrimPrefix(*trash.Key, "trash/")
 		loc, isblk := v.isKeepBlock(key)
 		if !isblk {
@@ -321,7 +365,7 @@ func (v *s3Volume) EmptyTrash() {
 	}
 
 	var wg sync.WaitGroup
-	todo := make(chan *s3.Object, v.cluster.Collections.BlobDeleteConcurrency)
+	todo := make(chan *types.Object, v.cluster.Collections.BlobDeleteConcurrency)
 	for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
 		wg.Add(1)
 		go func() {
@@ -395,8 +439,7 @@ func (v *s3Volume) head(key string) (result *s3.HeadObjectOutput, err error) {
 		Key:    aws.String(key),
 	}
 
-	req := v.bucket.svc.HeadObjectRequest(input)
-	res, err := req.Send(context.TODO())
+	res, err := v.bucket.svc.HeadObject(context.Background(), input)
 
 	v.bucket.stats.TickOps("head")
 	v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.HeadOps)
@@ -405,8 +448,7 @@ func (v *s3Volume) head(key string) (result *s3.HeadObjectOutput, err error) {
 	if err != nil {
 		return nil, v.translateError(err)
 	}
-	result = res.HeadObjectOutput
-	return
+	return res, nil
 }
 
 // BlockRead reads a Keep block that has been stored as a block blob
@@ -443,11 +485,11 @@ func (v *s3Volume) BlockRead(ctx context.Context, hash string, w io.WriterAt) er
 }
 
 func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {
-	downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
+	downloader := manager.NewDownloader(v.bucket.svc, func(u *manager.Downloader) {
 		u.PartSize = s3downloaderPartSize
 		u.Concurrency = s3downloaderReadConcurrency
 	})
-	count, err := downloader.DownloadWithContext(ctx, dst, &s3.GetObjectInput{
+	count, err := downloader.Download(ctx, dst, &s3.GetObjectInput{
 		Bucket: aws.String(v.bucket.bucket),
 		Key:    aws.String(key),
 	})
@@ -465,7 +507,7 @@ func (v *s3Volume) writeObject(ctx context.Context, key string, r io.Reader) err
 		r = bytes.NewReader(nil)
 	}
 
-	uploadInput := s3manager.UploadInput{
+	uploadInput := s3.PutObjectInput{
 		Bucket: aws.String(v.bucket.bucket),
 		Key:    aws.String(key),
 		Body:   r,
@@ -485,20 +527,15 @@ func (v *s3Volume) writeObject(ctx context.Context, key string, r io.Reader) err
 	// throughput, better than higher concurrency (10 or 13) by ~5%.
 	// Defining u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(64 * 1024 * 1024)
 	// is detrimental to throughput (minus ~15%).
-	uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
+	uploader := manager.NewUploader(v.bucket.svc, func(u *manager.Uploader) {
 		u.PartSize = s3uploaderPartSize
 		u.Concurrency = s3uploaderWriteConcurrency
 	})
 
-	// Unlike the goamz S3 driver, we don't need to precompute ContentSHA256:
-	// the aws-sdk-go v2 SDK uses a ReadSeeker to avoid having to copy the
-	// block, so there is no extra memory use to be concerned about. See
-	// makeSha256Reader in aws/signer/v4/v4.go. In fact, we explicitly disable
-	// calculating the Sha-256 because we don't need it; we already use md5sum
-	// hashes that match the name of the block.
-	_, err := uploader.UploadWithContext(ctx, &uploadInput, s3manager.WithUploaderRequestOptions(func(r *aws.Request) {
-		r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
-	}))
+	_, err := uploader.Upload(ctx, &uploadInput,
+		// Avoid precomputing SHA256 before sending.
+		manager.WithUploaderRequestOptions(s3.WithAPIOptions(v4.SwapComputePayloadSHA256ForUnsignedPayloadMiddleware)),
+	)
 
 	v.bucket.stats.TickOps("put")
 	v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
@@ -528,13 +565,13 @@ type s3awsLister struct {
 	PageSize          int
 	Stats             *s3awsbucketStats
 	ContinuationToken string
-	buf               []s3.Object
+	buf               []types.Object
 	err               error
 }
 
 // First fetches the first page and returns the first item. It returns
 // nil if the response is the empty set or an error occurs.
-func (lister *s3awsLister) First() *s3.Object {
+func (lister *s3awsLister) First() *types.Object {
 	lister.getPage()
 	return lister.pop()
 }
@@ -542,7 +579,7 @@ func (lister *s3awsLister) First() *s3.Object {
 // Next returns the next item, fetching the next page if necessary. It
 // returns nil if the last available item has already been fetched, or
 // an error occurs.
-func (lister *s3awsLister) Next() *s3.Object {
+func (lister *s3awsLister) Next() *types.Object {
 	if len(lister.buf) == 0 && lister.ContinuationToken != "" {
 		lister.getPage()
 	}
@@ -562,22 +599,22 @@ func (lister *s3awsLister) getPage() {
 	if lister.ContinuationToken == "" {
 		input = &s3.ListObjectsV2Input{
 			Bucket:  aws.String(lister.Bucket.bucket),
-			MaxKeys: aws.Int64(int64(lister.PageSize)),
+			MaxKeys: aws.Int32(int32(lister.PageSize)),
 			Prefix:  aws.String(lister.Prefix),
 		}
 	} else {
 		input = &s3.ListObjectsV2Input{
 			Bucket:            aws.String(lister.Bucket.bucket),
-			MaxKeys:           aws.Int64(int64(lister.PageSize)),
+			MaxKeys:           aws.Int32(int32(lister.PageSize)),
 			Prefix:            aws.String(lister.Prefix),
 			ContinuationToken: &lister.ContinuationToken,
 		}
 	}
 
-	req := lister.Bucket.svc.ListObjectsV2Request(input)
-	resp, err := req.Send(context.Background())
+	resp, err := lister.Bucket.svc.ListObjectsV2(context.Background(), input)
 	if err != nil {
-		if aerr, ok := err.(awserr.Error); ok {
+		var aerr smithy.APIError
+		if errors.As(err, &aerr) {
 			lister.err = aerr
 		} else {
 			lister.err = err
@@ -590,7 +627,7 @@ func (lister *s3awsLister) getPage() {
 	} else {
 		lister.ContinuationToken = ""
 	}
-	lister.buf = make([]s3.Object, 0, len(resp.Contents))
+	lister.buf = make([]types.Object, 0, len(resp.Contents))
 	for _, key := range resp.Contents {
 		if !strings.HasPrefix(*key.Key, lister.Prefix) {
 			lister.Logger.Warnf("s3awsLister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, *key.Key)
@@ -600,7 +637,7 @@ func (lister *s3awsLister) getPage() {
 	}
 }
 
-func (lister *s3awsLister) pop() (k *s3.Object) {
+func (lister *s3awsLister) pop() (k *types.Object) {
 	if len(lister.buf) > 0 {
 		k = &lister.buf[0]
 		lister.buf = lister.buf[1:]
@@ -758,8 +795,7 @@ func (b *s3Bucket) Del(path string) error {
 		Bucket: aws.String(b.bucket),
 		Key:    aws.String(path),
 	}
-	req := b.svc.DeleteObjectRequest(input)
-	_, err := req.Send(context.Background())
+	_, err := b.svc.DeleteObject(context.Background(), input)
 	b.stats.TickOps("delete")
 	b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
 	b.stats.TickErr(err)
@@ -817,12 +853,11 @@ func (s *s3awsbucketStats) TickErr(err error) {
 		return
 	}
 	errType := fmt.Sprintf("%T", err)
-	if aerr, ok := err.(awserr.Error); ok {
-		if reqErr, ok := err.(awserr.RequestFailure); ok {
-			// A service error occurred
-			errType = errType + fmt.Sprintf(" %d %s", reqErr.StatusCode(), aerr.Code())
+	if aerr := smithy.APIError(nil); errors.As(err, &aerr) {
+		if rerr := interface{ HTTPStatusCode() int }(nil); errors.As(err, &rerr) {
+			errType = errType + fmt.Sprintf(" %d %s", rerr.HTTPStatusCode(), aerr.ErrorCode())
 		} else {
-			errType = errType + fmt.Sprintf(" 000 %s", aerr.Code())
+			errType = errType + fmt.Sprintf(" 000 %s", aerr.ErrorCode())
 		}
 	}
 	s.statsTicker.TickErr(err, errType)
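
The error handling in this patch replaces assertions on the old concrete error types (awserr.Error, *aws.RequestCanceledError) with errors.As against the smithy.APIError interface and against a small CanceledError() interface, because aws-sdk-go-v2 wraps service errors. The following is a minimal, self-contained sketch of that pattern only; it is not part of the patch, and the wrapped types.NoSuchKey value is just an example input.

package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/aws/aws-sdk-go-v2/service/s3/types"
	"github.com/aws/smithy-go"
)

// classify maps an aws-sdk-go-v2 error onto the sentinel errors callers
// expect, following the same approach as translateError in the patch above.
func classify(err error) error {
	// Request cancellation: *aws.RequestCanceledError and
	// *smithy.CanceledError both implement CanceledError() bool.
	var cerr interface{ CanceledError() bool }
	if errors.As(err, &cerr) && cerr.CanceledError() {
		return context.Canceled
	}
	// Service errors: smithy.APIError exposes the S3 error code string.
	var aerr smithy.APIError
	if errors.As(err, &aerr) {
		switch aerr.ErrorCode() {
		case "NotFound", "NoSuchKey":
			return os.ErrNotExist
		}
	}
	return err
}

func main() {
	// A wrapped, typed S3 "no such key" error still satisfies smithy.APIError.
	err := fmt.Errorf("GetObject: %w", &types.NoSuchKey{})
	fmt.Println(os.IsNotExist(classify(err))) // true
}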
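The rewritten check() builds its S3 client from config.LoadDefaultConfig, overrides the endpoint only for the S3 service, and has the uploader skip payload SHA-256 computation (the blocks are already content-addressed by their MD5-based hashes). The stand-alone sketch below shows how those pieces fit together outside keepstore; the endpoint, bucket, key, and tuning values are placeholders, and config.WithEndpointResolverWithOptions is used here in place of the LoadOptions closure in the patch.

package main

import (
	"context"
	"log"
	"strings"

	"github.com/aws/aws-sdk-go-v2/aws"
	v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
	endpoint := "https://s3.example.com" // hypothetical non-AWS S3 endpoint
	cfg, err := config.LoadDefaultConfig(context.TODO(),
		config.WithRegion("us-east-1"),
		config.WithEndpointResolverWithOptions(
			aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
				if service == "S3" {
					// Route only S3 traffic to the custom endpoint.
					return aws.Endpoint{
						URL:               endpoint,
						HostnameImmutable: true,
						Source:            aws.EndpointSourceCustom,
					}, nil
				}
				// Fall back to the SDK's default resolver for other services.
				return aws.Endpoint{}, &aws.EndpointNotFoundError{}
			})),
	)
	if err != nil {
		log.Fatal(err)
	}

	client := s3.NewFromConfig(cfg, func(o *s3.Options) {
		o.UsePathStyle = true // many non-AWS S3 services require path-style addressing
	})

	// Upload without precomputing the payload SHA-256, as the patch does.
	uploader := manager.NewUploader(client, func(u *manager.Uploader) {
		u.PartSize = 5 << 20
		u.Concurrency = 5
	})
	_, err = uploader.Upload(context.TODO(), &s3.PutObjectInput{
		Bucket: aws.String("example-bucket"),
		Key:    aws.String("d41d8cd98f00b204e9800998ecf8427e"),
		Body:   strings.NewReader(""),
	},
		manager.WithUploaderRequestOptions(s3.WithAPIOptions(v4.SwapComputePayloadSHA256ForUnsignedPayloadMiddleware)),
	)
	if err != nil {
		log.Fatal(err)
	}
}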
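The s3awsLister in the patch drives ListObjectsV2 and its ContinuationToken by hand so the volume code can keep its per-call stats hooks. For comparison only, the same traversal can be written with the SDK's paginator helper, which the patch does not use; bucket and prefix below are placeholders.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
	cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1"))
	if err != nil {
		log.Fatal(err)
	}
	client := s3.NewFromConfig(cfg)

	// The paginator tracks the ContinuationToken internally.
	paginator := s3.NewListObjectsV2Paginator(client, &s3.ListObjectsV2Input{
		Bucket:  aws.String("example-bucket"),
		Prefix:  aws.String("trash/"),
		MaxKeys: aws.Int32(1000),
	})
	for paginator.HasMorePages() {
		page, err := paginator.NextPage(context.TODO())
		if err != nil {
			log.Fatal(err)
		}
		for _, obj := range page.Contents {
			fmt.Println(aws.ToString(obj.Key))
		}
	}
}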