)
func init() {
- driver["S3"] = newS3AWSVolume
+ driver["S3"] = news3Volume
}
const (
)
var (
- ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
+ errS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
+ s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
+ s3AWSZeroTime time.Time
)
-// S3AWSVolume implements Volume using an S3 bucket.
-type S3AWSVolume struct {
+// s3Volume implements Volume using an S3 bucket.
+type s3Volume struct {
arvados.S3VolumeDriverParameters
AuthToken string // populated automatically when IAMRole is used
AuthExpiration time.Time // populated automatically when IAMRole is used
mu sync.Mutex
}
-const ()
-
-var s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
-var s3AWSZeroTime time.Time
-
-func (v *S3AWSVolume) isKeepBlock(s string) (string, bool) {
+func (v *s3Volume) isKeepBlock(s string) (string, bool) {
if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
s = s[v.PrefixLength+1:]
}
// Return the key used for a given loc. If PrefixLength==0 then
// key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is
// "abc/abcdef0123", etc.
-func (v *S3AWSVolume) key(loc string) string {
+func (v *s3Volume) key(loc string) string {
if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
return loc[:v.PrefixLength] + "/" + loc
} else {
}
}
-func newS3AWSVolume(params newVolumeParams) (volume, error) {
- v := &S3AWSVolume{
+func news3Volume(params newVolumeParams) (volume, error) {
+ v := &s3Volume{
cluster: params.Cluster,
volume: params.ConfigVolume,
metrics: params.MetricsVecs,
return v, v.check("")
}
-func (v *S3AWSVolume) translateError(err error) error {
+func (v *s3Volume) translateError(err error) error {
if _, ok := err.(*aws.RequestCanceledError); ok {
return context.Canceled
} else if aerr, ok := err.(awserr.Error); ok {
//
// (If something goes wrong during the copy, the error will be
// embedded in the 200 OK response)
-func (v *S3AWSVolume) safeCopy(dst, src string) error {
+func (v *s3Volume) safeCopy(dst, src string) error {
input := &s3.CopyObjectInput{
Bucket: aws.String(v.bucket.bucket),
ContentType: aws.String("application/octet-stream"),
return nil
}
-func (v *S3AWSVolume) check(ec2metadataHostname string) error {
+func (v *s3Volume) check(ec2metadataHostname string) error {
if v.Bucket == "" {
return errors.New("DriverParameters: Bucket must be provided")
}
}
// DeviceID returns a globally unique ID for the storage bucket.
-func (v *S3AWSVolume) DeviceID() string {
+func (v *s3Volume) DeviceID() string {
return "s3://" + v.Endpoint + "/" + v.Bucket
}
// EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
// and deletes them from the volume.
-func (v *S3AWSVolume) EmptyTrash() {
+func (v *s3Volume) EmptyTrash() {
var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
// Define "ready to delete" as "...when EmptyTrash started".
// exist. If the timestamps on "recent/X" and "trash/X" indicate there
// was a race between Put and Trash, fixRace recovers from the race by
// Untrashing the block.
-func (v *S3AWSVolume) fixRace(key string) bool {
+func (v *s3Volume) fixRace(key string) bool {
trash, err := v.head("trash/" + key)
if err != nil {
if !os.IsNotExist(v.translateError(err)) {
return true
}
-func (v *S3AWSVolume) head(key string) (result *s3.HeadObjectOutput, err error) {
+func (v *s3Volume) head(key string) (result *s3.HeadObjectOutput, err error) {
input := &s3.HeadObjectInput{
Bucket: aws.String(v.bucket.bucket),
Key: aws.String(key),
// BlockRead reads a Keep block that has been stored as a block blob
// in the S3 bucket.
-func (v *S3AWSVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *s3Volume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
key := v.key(hash)
buf, err := v.bufferPool.GetContext(ctx)
if err != nil {
return streamer.Wrote(), nil
}
-func (v *S3AWSVolume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {
+func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {
downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
u.PartSize = s3downloaderPartSize
u.Concurrency = s3downloaderReadConcurrency
return v.translateError(err)
}
-func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) error {
+func (v *s3Volume) writeObject(ctx context.Context, key string, r io.Reader) error {
if r == nil {
// r == nil leads to a memory violation in func readFillBuf in
// aws-sdk-go-v2@v0.23.0/service/s3/s3manager/upload.go
}
// Put writes a block.
-func (v *S3AWSVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
+func (v *s3Volume) BlockWrite(ctx context.Context, hash string, data []byte) error {
// Do not use putWithPipe here; we want to pass an io.ReadSeeker to the S3
// sdk to avoid memory allocation there. See #17339 for more information.
rdr := bytes.NewReader(data)
// Index writes a complete list of locators with the given prefix
// for which Get() can retrieve data.
-func (v *S3AWSVolume) Index(ctx context.Context, prefix string, writer io.Writer) error {
+func (v *s3Volume) Index(ctx context.Context, prefix string, writer io.Writer) error {
prefix = v.key(prefix)
// Use a merge sort to find matching sets of X and recent/X.
dataL := s3awsLister{
}
// Mtime returns the stored timestamp for the given locator.
-func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) {
+func (v *s3Volume) Mtime(loc string) (time.Time, error) {
key := v.key(loc)
_, err := v.head(key)
if err != nil {
}
// InternalStats returns bucket I/O and API call counters.
-func (v *S3AWSVolume) InternalStats() interface{} {
+func (v *s3Volume) InternalStats() interface{} {
return &v.bucket.stats
}
// BlockTouch sets the timestamp for the given locator to the current time.
-func (v *S3AWSVolume) BlockTouch(hash string) error {
+func (v *s3Volume) BlockTouch(hash string) error {
key := v.key(hash)
_, err := v.head(key)
err = v.translateError(err)
// checkRaceWindow returns a non-nil error if trash/key is, or might
// be, in the race window (i.e., it's not safe to trash key).
-func (v *S3AWSVolume) checkRaceWindow(key string) error {
+func (v *s3Volume) checkRaceWindow(key string) error {
resp, err := v.head("trash/" + key)
err = v.translateError(err)
if os.IsNotExist(err) {
}
// Trash a Keep block.
-func (v *S3AWSVolume) BlockTrash(loc string) error {
+func (v *s3Volume) BlockTrash(loc string) error {
if t, err := v.Mtime(loc); err != nil {
return err
} else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
key := v.key(loc)
if v.cluster.Collections.BlobTrashLifetime == 0 {
if !v.UnsafeDelete {
- return ErrS3TrashDisabled
+ return errS3TrashDisabled
}
return v.translateError(v.bucket.Del(key))
}
}
// BlockUntrash moves block from trash back into store
-func (v *S3AWSVolume) BlockUntrash(hash string) error {
+func (v *s3Volume) BlockUntrash(hash string) error {
key := v.key(hash)
err := v.safeCopy(key, "trash/"+key)
if err != nil {