X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5ce5bf966dfabbc0beb7330d4c976a23fde3fd83..ad17f016de2feecc24a163af77c9c9c5add7dc3b:/services/keepstore/s3aws_volume.go

diff --git a/services/keepstore/s3aws_volume.go b/services/keepstore/s3aws_volume.go
index 4064809d5d..8f2c275391 100644
--- a/services/keepstore/s3aws_volume.go
+++ b/services/keepstore/s3aws_volume.go
@@ -33,6 +33,21 @@ import (
 	"github.com/sirupsen/logrus"
 )
 
+func init() {
+	driver["S3"] = newS3AWSVolume
+}
+
+const (
+	s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
+	s3DefaultConnectTimeout = arvados.Duration(time.Minute)
+	maxClockSkew            = 600 * time.Second
+	nearlyRFC1123           = "Mon, 2 Jan 2006 15:04:05 GMT"
+)
+
+var (
+	ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
+)
+
 // S3AWSVolume implements Volume using an S3 bucket.
 type S3AWSVolume struct {
 	arvados.S3VolumeDriverParameters
@@ -58,22 +73,6 @@ type s3AWSbucket struct {
 	mu     sync.Mutex
 }
 
-// chooseS3VolumeDriver distinguishes between the old goamz driver and
-// aws-sdk-go based on the UseAWSS3v2Driver feature flag
-func chooseS3VolumeDriver(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
-	v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
-	err := json.Unmarshal(volume.DriverParameters, v)
-	if err != nil {
-		return nil, err
-	}
-	if v.UseAWSS3v2Driver {
-		logger.Debugln("Using AWS S3 v2 driver")
-		return newS3AWSVolume(cluster, volume, logger, metrics)
-	}
-	logger.Debugln("Using goamz S3 driver")
-	return newS3Volume(cluster, volume, logger, metrics)
-}
-
 const (
 	PartSize         = 5 * 1024 * 1024
 	ReadConcurrency  = 13
@@ -112,11 +111,12 @@ func newS3AWSVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logr
 }
 
 func (v *S3AWSVolume) translateError(err error) error {
-	if aerr, ok := err.(awserr.Error); ok {
-		switch aerr.Code() {
-		case "NotFound":
+	if _, ok := err.(*aws.RequestCanceledError); ok {
+		return context.Canceled
+	} else if aerr, ok := err.(awserr.Error); ok {
+		if aerr.Code() == "NotFound" {
 			return os.ErrNotExist
-		case "NoSuchKey":
+		} else if aerr.Code() == "NoSuchKey" {
 			return os.ErrNotExist
 		}
 	}
@@ -181,19 +181,25 @@ func (v *S3AWSVolume) check(ec2metadataHostname string) error {
 			if v.Endpoint != "" && service == "s3" {
 				return aws.Endpoint{
 					URL:           v.Endpoint,
-					SigningRegion: v.Region,
+					SigningRegion: region,
 				}, nil
 			} else if service == "ec2metadata" && ec2metadataHostname != "" {
 				return aws.Endpoint{
 					URL: ec2metadataHostname,
 				}, nil
+			} else {
+				return defaultResolver.ResolveEndpoint(service, region)
 			}
-
-			return defaultResolver.ResolveEndpoint(service, region)
 		}
 		cfg.EndpointResolver = aws.EndpointResolverFunc(myCustomResolver)
 	}
-
+	if v.Region == "" {
+		// Endpoint is already specified (otherwise we would
+		// have errored out above), but Region is also
+		// required by the aws sdk, in order to determine
+		// SignatureVersions.
+		v.Region = "us-east-1"
+	}
 	cfg.Region = v.Region
 
 	// Zero timeouts mean "wait forever", which is a bad
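The resolver change above does two things: a pinned "s3" endpoint is now signed for whatever region the SDK asks about (SigningRegion: region) rather than always v.Region, and any service other than s3/ec2metadata is explicitly delegated to the SDK's default resolver instead of being reached by fall-through. The follow-up hunk defaults Region to "us-east-1" when only Endpoint is configured, because the SDK still needs a region to choose signature versions. A minimal sketch of the same resolver pattern, using only the pre-1.0 aws-sdk-go-v2 calls that appear in this hunk — the endpoint URL is a hypothetical stand-in for v.Endpoint:

    package main

    import (
        "fmt"

        "github.com/aws/aws-sdk-go-v2/aws"
        "github.com/aws/aws-sdk-go-v2/aws/endpoints"
    )

    func main() {
        // Hypothetical stand-in for the configured v.Endpoint.
        const myEndpoint = "https://keep-s3.example.com"

        defaultResolver := endpoints.NewDefaultResolver()
        myResolver := func(service, region string) (aws.Endpoint, error) {
            if service == "s3" {
                // Pin s3 to the configured URL, but sign for the
                // region the SDK requested.
                return aws.Endpoint{URL: myEndpoint, SigningRegion: region}, nil
            }
            // Everything else falls back to the SDK's own resolver.
            return defaultResolver.ResolveEndpoint(service, region)
        }

        // The driver installs this as
        // cfg.EndpointResolver = aws.EndpointResolverFunc(myResolver).
        ep, err := myResolver("s3", "us-west-2")
        fmt.Println(ep.URL, ep.SigningRegion, err)
    }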
@@ -467,14 +473,9 @@ func (v *S3AWSVolume) Get(ctx context.Context, loc string, buf []byte) (int, err
 	// through 'buf []byte', and we don't want to allocate two buffers for each
 	// read request. Instead, use a version of ReadBlock that accepts 'buf []byte'
 	// as an input.
-	return v.ReadBlock(ctx, loc, buf)
-}
-
-func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, buf []byte) (int, error) {
 	key := v.key(loc)
 	count, err := v.readWorker(ctx, key, buf)
 	if err == nil {
-		v.bucket.stats.TickInBytes(uint64(count))
 		return count, err
 	}
 
@@ -501,7 +502,6 @@ func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, buf []byte) (in
 		err = v.translateError(err)
 		return 0, err
 	}
-	v.bucket.stats.TickInBytes(uint64(count))
 	return count, err
 }
 
@@ -521,12 +521,8 @@ func (v *S3AWSVolume) readWorker(ctx context.Context, key string, buf []byte) (i
 	v.bucket.stats.TickOps("get")
 	v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps)
 	v.bucket.stats.TickErr(err)
-	if err != nil {
-		return 0, v.translateError(err)
-	}
-	buf = awsBuf.Bytes()
-
-	return int(count), err
+	v.bucket.stats.TickInBytes(uint64(count))
+	return int(count), v.translateError(err)
 }
 
 func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) error {
@@ -546,7 +542,7 @@ func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader)
 		var contentMD5 string
 		md5, err := hex.DecodeString(loc)
 		if err != nil {
-			return err
+			return v.translateError(err)
 		}
 		contentMD5 = base64.StdEncoding.EncodeToString(md5)
 		uploadInput.ContentMD5 = &contentMD5
@@ -575,22 +571,18 @@ func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader)
 	v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
 	v.bucket.stats.TickErr(err)
 
-	return err
+	return v.translateError(err)
 }
 
 // Put writes a block.
 func (v *S3AWSVolume) Put(ctx context.Context, loc string, block []byte) error {
 	// Do not use putWithPipe here; we want to pass an io.ReadSeeker to the S3
 	// sdk to avoid memory allocation there. See #17339 for more information.
-	return v.WriteBlock(ctx, loc, bytes.NewReader(block))
-}
-
-// WriteBlock implements BlockWriter.
-func (v *S3AWSVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
 	if v.volume.ReadOnly {
 		return MethodDisabledError
 	}
 
+	rdr := bytes.NewReader(block)
 	r := NewCountingReaderAtSeeker(rdr, v.bucket.stats.TickOutBytes)
 	key := v.key(loc)
 	err := v.writeObject(ctx, key, r)
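With readWorker and writeObject now routing every result through translateError, callers see canonical Go sentinels (os.ErrNotExist, context.Canceled) rather than SDK-specific error types. A hypothetical caller-side sketch, stdlib only — getBlock is an illustrative stand-in, not a keepstore function:

    package main

    import (
        "context"
        "errors"
        "fmt"
        "os"
    )

    // getBlock stands in for a volume call whose error has already been
    // passed through translateError; the value returned here is made up.
    func getBlock() error { return os.ErrNotExist }

    func main() {
        switch err := getBlock(); {
        case errors.Is(err, os.ErrNotExist):
            fmt.Println("block not on this volume (not a hard failure)")
        case errors.Is(err, context.Canceled):
            fmt.Println("client hung up; stop retrying")
        default:
            fmt.Println("backend error:", err)
        }
    }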
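The ContentMD5 path in writeObject relies on the fact that a Keep locator's first 32 characters are the hex MD5 of the block, while S3's Content-MD5 header wants those same 16 bytes base64-encoded. A self-contained sketch of that round trip:

    package main

    import (
        "crypto/md5"
        "encoding/base64"
        "encoding/hex"
        "fmt"
    )

    func main() {
        block := []byte("foo")

        // A Keep locator begins with the hex MD5 of the block data.
        sum := md5.Sum(block)
        loc := hex.EncodeToString(sum[:]) // "acbd18db4cc2f85cedef654fccc4a4d8"

        // S3 wants the same digest base64-encoded in Content-MD5, so the
        // driver hex-decodes the locator and re-encodes it.
        raw, err := hex.DecodeString(loc)
        if err != nil {
            panic(err)
        }
        fmt.Println(base64.StdEncoding.EncodeToString(raw))
    }

Sending Content-MD5 lets the bucket reject a corrupted upload instead of silently storing it.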
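Inlining WriteBlock into Put keeps the concrete *bytes.Reader visible to the upload path: unlike the plain io.Reader that putWithPipe would hand over, it satisfies the seek/read-at interfaces an S3 uploader can use to retry or parallelize parts without copying the block into a second buffer (see #17339). A compile-check sketch of the distinction:

    package main

    import (
        "bytes"
        "fmt"
        "io"
    )

    func main() {
        rdr := bytes.NewReader([]byte("block data"))

        // *bytes.Reader offers random access over the block...
        var _ io.ReadSeeker = rdr
        var _ io.ReaderAt = rdr

        // ...while a pipe's read end is a one-shot stream: an io.Reader,
        // but neither a Seeker nor a ReaderAt, so an uploader would have
        // to buffer it internally before it could retry a part.
        pr, _ := io.Pipe()
        var _ io.Reader = pr

        fmt.Println("reader length:", rdr.Len())
    }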