"errors"
"fmt"
"io"
+ "net/url"
"os"
"regexp"
"strings"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/aws/aws-sdk-go-v2/aws"
- "github.com/aws/aws-sdk-go-v2/aws/awserr"
- "github.com/aws/aws-sdk-go-v2/aws/defaults"
- "github.com/aws/aws-sdk-go-v2/aws/ec2metadata"
- "github.com/aws/aws-sdk-go-v2/aws/ec2rolecreds"
- "github.com/aws/aws-sdk-go-v2/aws/endpoints"
+ v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
+ "github.com/aws/aws-sdk-go-v2/config"
+ "github.com/aws/aws-sdk-go-v2/credentials"
+ "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
- "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
+ "github.com/aws/aws-sdk-go-v2/service/s3/types"
+ "github.com/aws/smithy-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
var (
- errS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
- s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
- s3AWSZeroTime time.Time
+ errS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
+ s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
+ s3AWSZeroTime time.Time
+ defaultEndpointResolverV2 = s3.NewDefaultEndpointResolverV2()
+
+ // Returned by an aws.EndpointResolverWithOptions to indicate
+ // that the default resolver should be used.
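+ // (The SDK treats a resolver error that unwraps to
+ // aws.EndpointNotFoundError as "no override for this service",
+ // and falls back to its default endpoint resolution.)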
+ errEndpointNotOverridden = &aws.EndpointNotFoundError{Err: errors.New("endpoint not overridden")}
)
// s3Volume implements Volume using an S3 bucket.
type s3Volume struct {
arvados.S3VolumeDriverParameters
- AuthToken string // populated automatically when IAMRole is used
- AuthExpiration time.Time // populated automatically when IAMRole is used
cluster *arvados.Cluster
volume arvados.Volume
bucket *s3Bucket
region string
startOnce sync.Once
+
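+ // overrideEndpoint, when non-nil, is handed back verbatim by the
+ // custom resolver installed during setup whenever the SDK asks
+ // for an S3 endpoint.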
+ overrideEndpoint *aws.Endpoint
+ usePathStyle bool // used by test suite
}
// s3Bucket wraps s3.bucket and counts I/O and API usage stats. The
}
func (v *s3Volume) translateError(err error) error {
- if _, ok := err.(*aws.RequestCanceledError); ok {
+ if cerr := (interface{ CanceledError() bool })(nil); errors.As(err, &cerr) && cerr.CanceledError() {
+ // *aws.RequestCanceledError and *smithy.CanceledError
+ // implement this interface.
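+ // (errors.As accepts a pointer to an interface type, so this
+ // matches any error in the wrapping chain that has a
+ // CanceledError() bool method.)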
return context.Canceled
- } else if aerr, ok := err.(awserr.Error); ok {
- if aerr.Code() == "NotFound" {
- return os.ErrNotExist
- } else if aerr.Code() == "NoSuchKey" {
+ }
+ var aerr smithy.APIError
+ if errors.As(err, &aerr) {
+ switch aerr.ErrorCode() {
+ case "NotFound", "NoSuchKey":
return os.ErrNotExist
}
}
Key: aws.String(dst),
}
- req := v.bucket.svc.CopyObjectRequest(input)
- resp, err := req.Send(context.Background())
+ resp, err := v.bucket.svc.CopyObject(context.Background(), input)
err = v.translateError(err)
if os.IsNotExist(err) {
return err
} else if err != nil {
return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.bucket+"/"+src, err)
- }
-
- if resp.CopyObjectResult.LastModified == nil {
- return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.CopyObjectResult.LastModified, err)
- } else if time.Now().Sub(*resp.CopyObjectResult.LastModified) > maxClockSkew {
- return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.CopyObjectResult.LastModified, resp.CopyObjectResult.LastModified)
+ } else if resp.CopyObjectResult.LastModified == nil {
+ return fmt.Errorf("PutCopy(%q ← %q): succeeded but did not return a timestamp", dst, v.bucket.bucket+"/"+src)
+ } else if skew := time.Now().UTC().Sub(*resp.CopyObjectResult.LastModified); skew > maxClockSkew {
+ return fmt.Errorf("PutCopy succeeded but returned old timestamp %s (skew %v > max %v, now %s)", resp.CopyObjectResult.LastModified, skew, maxClockSkew, time.Now())
}
return nil
}
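// For illustration: elsewhere in this driver, a block's timestamp is
// refreshed by copying the object onto itself (the S3 "touch" idiom),
// which is why safeCopy insists on a fresh LastModified. A minimal
// hypothetical caller:
//
//	func touchExample(v *s3Volume, key string) error {
//		return v.safeCopy(key, key)
//	}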
return errors.New("DriverParameters: V2Signature is not supported")
}
- defaultResolver := endpoints.NewDefaultResolver()
-
- cfg := defaults.Config()
-
if v.Endpoint == "" && v.Region == "" {
return fmt.Errorf("AWS region or endpoint must be specified")
- } else if v.Endpoint != "" || ec2metadataHostname != "" {
- myCustomResolver := func(service, region string) (aws.Endpoint, error) {
- if v.Endpoint != "" && service == "s3" {
- return aws.Endpoint{
- URL: v.Endpoint,
- SigningRegion: region,
- }, nil
- } else if service == "ec2metadata" && ec2metadataHostname != "" {
- return aws.Endpoint{
- URL: ec2metadataHostname,
- }, nil
- } else {
- return defaultResolver.ResolveEndpoint(service, region)
- }
+ } else if v.Endpoint != "" {
+ _, err := url.Parse(v.Endpoint)
+ if err != nil {
+ return fmt.Errorf("error parsing custom S3 endpoint %q: %w", v.Endpoint, err)
+ }
+ v.overrideEndpoint = &aws.Endpoint{
+ URL: v.Endpoint,
+ HostnameImmutable: true,
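+ // HostnameImmutable tells the SDK not to rewrite the host,
+ // e.g. by prepending the bucket name for virtual-host-style
+ // addressing.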
+ Source: aws.EndpointSourceCustom,
}
- cfg.EndpointResolver = aws.EndpointResolverFunc(myCustomResolver)
}
if v.Region == "" {
// Endpoint is already specified (otherwise we would
// have errored out above), but Region is also
// required by the aws sdk, in order to determine
// SignatureVersions.
v.Region = "us-east-1"
}
- cfg.Region = v.Region
// Zero timeouts mean "wait forever", which is a bad
// default. Default to long timeouts instead.
if v.ReadTimeout == 0 {
v.ReadTimeout = s3DefaultReadTimeout
}
- creds := aws.NewChainProvider(
- []aws.CredentialsProvider{
- aws.NewStaticCredentialsProvider(v.AccessKeyID, v.SecretAccessKey, v.AuthToken),
- ec2rolecreds.New(ec2metadata.New(cfg), func(opts *ec2rolecreds.ProviderOptions) {
- // (from aws-sdk-go-v2 comments)
- // "allow the credentials to trigger
- // refreshing prior to the credentials
- // actually expiring. This is
- // beneficial so race conditions with
- // expiring credentials do not cause
- // request to fail unexpectedly due to
- // ExpiredTokenException exceptions."
- //
- // (from
- // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html)
- // "We make new credentials available
- // at least five minutes before the
- // expiration of the old credentials."
- opts.ExpiryWindow = 5 * time.Minute
- }),
- })
-
- cfg.Credentials = creds
+ cfg, err := config.LoadDefaultConfig(context.TODO(),
+ config.WithRegion(v.Region),
+ config.WithCredentialsCacheOptions(func(o *aws.CredentialsCacheOptions) {
+ // (from aws-sdk-go-v2 comments) "allow the
+ // credentials to trigger refreshing prior to
+ // the credentials actually expiring. This is
+ // beneficial so race conditions with expiring
+ // credentials do not cause request to fail
+ // unexpectedly due to ExpiredTokenException
+ // exceptions."
+ //
+ // (from
+ // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html)
+ // "We make new credentials available at least
+ // five minutes before the expiration of the
+ // old credentials."
+ o.ExpiryWindow = 5 * time.Minute
+ }),
+ func(o *config.LoadOptions) error {
+ if v.AccessKeyID == "" && v.SecretAccessKey == "" {
+ // Use default sdk behavior (IAM / IMDS)
+ return nil
+ }
+ v.logger.Debug("using static credentials")
+ o.Credentials = credentials.StaticCredentialsProvider{
+ Value: aws.Credentials{
+ AccessKeyID: v.AccessKeyID,
+ SecretAccessKey: v.SecretAccessKey,
+ Source: "Arvados configuration",
+ },
+ }
+ return nil
+ },
+ func(o *config.LoadOptions) error {
+ if ec2metadataHostname != "" {
+ o.EC2IMDSEndpoint = ec2metadataHostname
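+ // (EC2IMDSEndpoint points the SDK's instance-metadata client,
+ // which supplies IAM role credentials, at a test or proxy
+ // address, replacing the old "ec2metadata" resolver entry.)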
+ }
+ if v.overrideEndpoint != nil {
+ o.EndpointResolverWithOptions = aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
+ if service == "S3" {
+ return *v.overrideEndpoint, nil
+ }
+ return aws.Endpoint{}, errEndpointNotOverridden // use default resolver
+ })
+ }
+ return nil
+ },
+ )
+ if err != nil {
+ return fmt.Errorf("error loading aws client config: %w", err)
+ }
v.bucket = &s3Bucket{
bucket: v.Bucket,
- svc: s3.New(cfg),
+ svc: s3.NewFromConfig(cfg, func(o *s3.Options) {
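+ // Path-style addressing (https://host/bucket/key) avoids
+ // needing per-bucket virtual-host DNS names, which the test
+ // suite's stub server doesn't provide.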
+ if v.usePathStyle {
+ o.UsePathStyle = true
+ }
+ }),
}
// Set up prometheus metrics
// Define "ready to delete" as "...when EmptyTrash started".
startT := time.Now()
- emptyOneKey := func(trash *s3.Object) {
+ emptyOneKey := func(trash *types.Object) {
key := strings.TrimPrefix(*trash.Key, "trash/")
loc, isblk := v.isKeepBlock(key)
if !isblk {
}
var wg sync.WaitGroup
- todo := make(chan *s3.Object, v.cluster.Collections.BlobDeleteConcurrency)
+ todo := make(chan *types.Object, v.cluster.Collections.BlobDeleteConcurrency)
for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
wg.Add(1)
go func() {
Key: aws.String(key),
}
- req := v.bucket.svc.HeadObjectRequest(input)
- res, err := req.Send(context.TODO())
+ res, err := v.bucket.svc.HeadObject(context.Background(), input)
v.bucket.stats.TickOps("head")
v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.HeadOps)
if err != nil {
return nil, v.translateError(err)
}
- result = res.HeadObjectOutput
- return
+ return res, nil
}
// BlockRead reads a Keep block that has been stored as a block blob
}
func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {
- downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
+ downloader := manager.NewDownloader(v.bucket.svc, func(u *manager.Downloader) {
u.PartSize = s3downloaderPartSize
u.Concurrency = s3downloaderReadConcurrency
})
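// The transfer manager issues up to u.Concurrency ranged GETs of
// u.PartSize bytes each and writes every part at its offset via
// dst.WriteAt, so dst must support concurrent WriteAt calls.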
- count, err := downloader.DownloadWithContext(ctx, dst, &s3.GetObjectInput{
+ count, err := downloader.Download(ctx, dst, &s3.GetObjectInput{
Bucket: aws.String(v.bucket.bucket),
Key: aws.String(key),
})
r = bytes.NewReader(nil)
}
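+ // The v2 transfer manager takes *s3.PutObjectInput directly;
+ // the separate s3manager.UploadInput type is gone.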
- uploadInput := s3manager.UploadInput{
+ uploadInput := s3.PutObjectInput{
Bucket: aws.String(v.bucket.bucket),
Key: aws.String(key),
Body: r,
// throughput, better than higher concurrency (10 or 13) by ~5%.
// Defining u.BufferProvider = manager.NewBufferedReadSeekerWriteToPool(64 * 1024 * 1024)
// is detrimental to throughput (minus ~15%).
- uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
+ uploader := manager.NewUploader(v.bucket.svc, func(u *manager.Uploader) {
u.PartSize = s3uploaderPartSize
u.Concurrency = s3uploaderWriteConcurrency
})
- // Unlike the goamz S3 driver, we don't need to precompute ContentSHA256:
- // the aws-sdk-go v2 SDK uses a ReadSeeker to avoid having to copy the
- // block, so there is no extra memory use to be concerned about. See
- // makeSha256Reader in aws/signer/v4/v4.go. In fact, we explicitly disable
- // calculating the Sha-256 because we don't need it; we already use md5sum
- // hashes that match the name of the block.
- _, err := uploader.UploadWithContext(ctx, &uploadInput, s3manager.WithUploaderRequestOptions(func(r *aws.Request) {
- r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
- }))
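+ // SwapComputePayloadSHA256ForUnsignedPayloadMiddleware makes the
+ // request get signed with X-Amz-Content-Sha256: UNSIGNED-PAYLOAD
+ // instead of hashing the whole body. Keep blocks are already
+ // verified against their MD5 content address, so the extra
+ // SHA-256 pass adds no integrity value.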
+ _, err := uploader.Upload(ctx, &uploadInput,
+ // Avoid precomputing SHA256 before sending.
+ manager.WithUploaderRequestOptions(s3.WithAPIOptions(v4.SwapComputePayloadSHA256ForUnsignedPayloadMiddleware)),
+ )
v.bucket.stats.TickOps("put")
v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
PageSize int
Stats *s3awsbucketStats
ContinuationToken string
- buf []s3.Object
+ buf []types.Object
err error
}
// First fetches the first page and returns the first item. It returns
// nil if the response is the empty set or an error occurs.
-func (lister *s3awsLister) First() *s3.Object {
+func (lister *s3awsLister) First() *types.Object {
lister.getPage()
return lister.pop()
}
// Next returns the next item, fetching the next page if necessary. It
// returns nil if the last available item has already been fetched, or
// an error occurs.
-func (lister *s3awsLister) Next() *s3.Object {
+func (lister *s3awsLister) Next() *types.Object {
if len(lister.buf) == 0 && lister.ContinuationToken != "" {
lister.getPage()
}
if lister.ContinuationToken == "" {
input = &s3.ListObjectsV2Input{
Bucket: aws.String(lister.Bucket.bucket),
- MaxKeys: aws.Int64(int64(lister.PageSize)),
+ MaxKeys: aws.Int32(int32(lister.PageSize)),
Prefix: aws.String(lister.Prefix),
}
} else {
input = &s3.ListObjectsV2Input{
Bucket: aws.String(lister.Bucket.bucket),
- MaxKeys: aws.Int64(int64(lister.PageSize)),
+ MaxKeys: aws.Int32(int32(lister.PageSize)),
Prefix: aws.String(lister.Prefix),
ContinuationToken: &lister.ContinuationToken,
}
}
- req := lister.Bucket.svc.ListObjectsV2Request(input)
- resp, err := req.Send(context.Background())
+ resp, err := lister.Bucket.svc.ListObjectsV2(context.Background(), input)
if err != nil {
- if aerr, ok := err.(awserr.Error); ok {
+ var aerr smithy.APIError
+ if errors.As(err, &aerr) {
lister.err = aerr
} else {
lister.err = err
} else {
lister.ContinuationToken = ""
}
- lister.buf = make([]s3.Object, 0, len(resp.Contents))
+ lister.buf = make([]types.Object, 0, len(resp.Contents))
for _, key := range resp.Contents {
if !strings.HasPrefix(*key.Key, lister.Prefix) {
lister.Logger.Warnf("s3awsLister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, *key.Key)
}
}
-func (lister *s3awsLister) pop() (k *s3.Object) {
+func (lister *s3awsLister) pop() (k *types.Object) {
if len(lister.buf) > 0 {
k = &lister.buf[0]
lister.buf = lister.buf[1:]
Bucket: aws.String(b.bucket),
Key: aws.String(path),
}
- req := b.svc.DeleteObjectRequest(input)
- _, err := req.Send(context.Background())
+ _, err := b.svc.DeleteObject(context.Background(), input)
b.stats.TickOps("delete")
b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
b.stats.TickErr(err)
return
}
errType := fmt.Sprintf("%T", err)
- if aerr, ok := err.(awserr.Error); ok {
- if reqErr, ok := err.(awserr.RequestFailure); ok {
- // A service error occurred
- errType = errType + fmt.Sprintf(" %d %s", reqErr.StatusCode(), aerr.Code())
+ if aerr := smithy.APIError(nil); errors.As(err, &aerr) {
+ if rerr := interface{ HTTPStatusCode() int }(nil); errors.As(err, &rerr) {
+ errType = errType + fmt.Sprintf(" %d %s", rerr.HTTPStatusCode(), aerr.ErrorCode())
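+ // errType ends up like "*smithy.OperationError 404 NoSuchKey".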
} else {
- errType = errType + fmt.Sprintf(" 000 %s", aerr.Code())
+ errType = errType + fmt.Sprintf(" 000 %s", aerr.ErrorCode())
}
}
s.statsTicker.TickErr(err, errType)