X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/07baa0ed049746514495d1648c1aef0c40545141..0d85dd75361bfab5e90479aa85fb4782860e636e:/services/keepstore/s3aws_volume.go

diff --git a/services/keepstore/s3aws_volume.go b/services/keepstore/s3aws_volume.go
index f7cff6d33e..18b30f4638 100644
--- a/services/keepstore/s3aws_volume.go
+++ b/services/keepstore/s3aws_volume.go
@@ -33,6 +33,21 @@ import (
 	"github.com/sirupsen/logrus"
 )
 
+func init() {
+	driver["S3"] = newS3AWSVolume
+}
+
+const (
+	s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
+	s3DefaultConnectTimeout = arvados.Duration(time.Minute)
+	maxClockSkew            = 600 * time.Second
+	nearlyRFC1123           = "Mon, 2 Jan 2006 15:04:05 GMT"
+)
+
+var (
+	ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
+)
+
 // S3AWSVolume implements Volume using an S3 bucket.
 type S3AWSVolume struct {
 	arvados.S3VolumeDriverParameters
@@ -58,24 +73,6 @@ type s3AWSbucket struct {
 	mu     sync.Mutex
 }
 
-// chooseS3VolumeDriver distinguishes between the old goamz driver and
-// aws-sdk-go based on the UseAWSS3v2Driver feature flag
-func chooseS3VolumeDriver(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
-	v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
-	// Default value will be overriden if it happens to be defined in the config
-	v.S3VolumeDriverParameters.UseAWSS3v2Driver = true
-	err := json.Unmarshal(volume.DriverParameters, v)
-	if err != nil {
-		return nil, err
-	}
-	if v.UseAWSS3v2Driver {
-		logger.Debugln("Using AWS S3 v2 driver")
-		return newS3AWSVolume(cluster, volume, logger, metrics)
-	}
-	logger.Debugln("Using goamz S3 driver")
-	return newS3Volume(cluster, volume, logger, metrics)
-}
-
 const (
 	PartSize         = 5 * 1024 * 1024
 	ReadConcurrency  = 13
@@ -184,19 +181,25 @@ func (v *S3AWSVolume) check(ec2metadataHostname string) error {
 			if v.Endpoint != "" && service == "s3" {
 				return aws.Endpoint{
 					URL:           v.Endpoint,
-					SigningRegion: v.Region,
+					SigningRegion: region,
 				}, nil
 			} else if service == "ec2metadata" && ec2metadataHostname != "" {
 				return aws.Endpoint{
 					URL: ec2metadataHostname,
 				}, nil
+			} else {
+				return defaultResolver.ResolveEndpoint(service, region)
 			}
-
-			return defaultResolver.ResolveEndpoint(service, region)
 		}
 		cfg.EndpointResolver = aws.EndpointResolverFunc(myCustomResolver)
 	}
-
+	if v.Region == "" {
+		// Endpoint is already specified (otherwise we would
+		// have errored out above), but Region is also
+		// required by the aws sdk, in order to determine
+		// SignatureVersions.
+		v.Region = "us-east-1"
+	}
 	cfg.Region = v.Region
 
 	// Zero timeouts mean "wait forever", which is a bad
@@ -292,10 +295,6 @@ func (v *S3AWSVolume) Compare(ctx context.Context, loc string, expect []byte) er
 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
 // and deletes them from the volume.
 func (v *S3AWSVolume) EmptyTrash() {
-	if v.cluster.Collections.BlobDeleteConcurrency < 1 {
-		return
-	}
-
 	var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
 
 	// Define "ready to delete" as "...when EmptyTrash started".
@@ -847,7 +846,7 @@ func (b *s3AWSbucket) Del(path string) error {
 
 // Trash a Keep block.
 func (v *S3AWSVolume) Trash(loc string) error {
-	if v.volume.ReadOnly {
+	if v.volume.ReadOnly && !v.volume.AllowTrashWhenReadOnly {
 		return MethodDisabledError
 	}
 	if t, err := v.Mtime(loc); err != nil {