2960: Fix some needlessly exported identifiers.
[arvados.git] / services / keepstore / s3aws_volume.go
index 2417bb81496f918f1114814c3acd7fb5c25e1227..8e93eed12c839175ba050b18e5772161ad82bd47 100644 (file)
@@ -34,7 +34,7 @@ import (
 )
 
 func init() {
-       driver["S3"] = newS3AWSVolume
+       driver["S3"] = news3Volume
 }
 
 const (
@@ -49,11 +49,13 @@ const (
 )
 
 var (
-       ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
+       errS3TrashDisabled   = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
+       s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
+       s3AWSZeroTime        time.Time
 )
 
-// S3AWSVolume implements Volume using an S3 bucket.
-type S3AWSVolume struct {
+// s3Volume implements Volume using an S3 bucket.
+type s3Volume struct {
        arvados.S3VolumeDriverParameters
        AuthToken      string    // populated automatically when IAMRole is used
        AuthExpiration time.Time // populated automatically when IAMRole is used
@@ -78,12 +80,7 @@ type s3AWSbucket struct {
        mu     sync.Mutex
 }
 
-const ()
-
-var s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
-var s3AWSZeroTime time.Time
-
-func (v *S3AWSVolume) isKeepBlock(s string) (string, bool) {
+func (v *s3Volume) isKeepBlock(s string) (string, bool) {
        if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
                s = s[v.PrefixLength+1:]
        }
@@ -93,7 +90,7 @@ func (v *S3AWSVolume) isKeepBlock(s string) (string, bool) {
 // Return the key used for a given loc. If PrefixLength==0 then
 // key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is
 // "abc/abcdef0123", etc.
-func (v *S3AWSVolume) key(loc string) string {
+func (v *s3Volume) key(loc string) string {
        if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
                return loc[:v.PrefixLength] + "/" + loc
        } else {
@@ -101,8 +98,8 @@ func (v *S3AWSVolume) key(loc string) string {
        }
 }
 
-func newS3AWSVolume(params newVolumeParams) (volume, error) {
-       v := &S3AWSVolume{
+func news3Volume(params newVolumeParams) (volume, error) {
+       v := &s3Volume{
                cluster:    params.Cluster,
                volume:     params.ConfigVolume,
                metrics:    params.MetricsVecs,
@@ -116,7 +113,7 @@ func newS3AWSVolume(params newVolumeParams) (volume, error) {
        return v, v.check("")
 }
 
-func (v *S3AWSVolume) translateError(err error) error {
+func (v *s3Volume) translateError(err error) error {
        if _, ok := err.(*aws.RequestCanceledError); ok {
                return context.Canceled
        } else if aerr, ok := err.(awserr.Error); ok {
@@ -135,7 +132,7 @@ func (v *S3AWSVolume) translateError(err error) error {
 //
 // (If something goes wrong during the copy, the error will be
 // embedded in the 200 OK response)
-func (v *S3AWSVolume) safeCopy(dst, src string) error {
+func (v *s3Volume) safeCopy(dst, src string) error {
        input := &s3.CopyObjectInput{
                Bucket:      aws.String(v.bucket.bucket),
                ContentType: aws.String("application/octet-stream"),
@@ -161,7 +158,7 @@ func (v *S3AWSVolume) safeCopy(dst, src string) error {
        return nil
 }
 
-func (v *S3AWSVolume) check(ec2metadataHostname string) error {
+func (v *s3Volume) check(ec2metadataHostname string) error {
        if v.Bucket == "" {
                return errors.New("DriverParameters: Bucket must be provided")
        }
@@ -238,13 +235,13 @@ func (v *S3AWSVolume) check(ec2metadataHostname string) error {
 }
 
 // DeviceID returns a globally unique ID for the storage bucket.
-func (v *S3AWSVolume) DeviceID() string {
+func (v *s3Volume) DeviceID() string {
        return "s3://" + v.Endpoint + "/" + v.Bucket
 }
 
 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
 // and deletes them from the volume.
-func (v *S3AWSVolume) EmptyTrash() {
+func (v *s3Volume) EmptyTrash() {
        var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
 
        // Define "ready to delete" as "...when EmptyTrash started".
@@ -358,7 +355,7 @@ func (v *S3AWSVolume) EmptyTrash() {
 // exist. If the timestamps on "recent/X" and "trash/X" indicate there
 // was a race between Put and Trash, fixRace recovers from the race by
 // Untrashing the block.
-func (v *S3AWSVolume) fixRace(key string) bool {
+func (v *s3Volume) fixRace(key string) bool {
        trash, err := v.head("trash/" + key)
        if err != nil {
                if !os.IsNotExist(v.translateError(err)) {
@@ -392,7 +389,7 @@ func (v *S3AWSVolume) fixRace(key string) bool {
        return true
 }
 
-func (v *S3AWSVolume) head(key string) (result *s3.HeadObjectOutput, err error) {
+func (v *s3Volume) head(key string) (result *s3.HeadObjectOutput, err error) {
        input := &s3.HeadObjectInput{
                Bucket: aws.String(v.bucket.bucket),
                Key:    aws.String(key),
@@ -414,7 +411,7 @@ func (v *S3AWSVolume) head(key string) (result *s3.HeadObjectOutput, err error)
 
 // BlockRead reads a Keep block that has been stored as a block blob
 // in the S3 bucket.
-func (v *S3AWSVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *s3Volume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
        key := v.key(hash)
        buf, err := v.bufferPool.GetContext(ctx)
        if err != nil {
@@ -460,7 +457,7 @@ func (v *S3AWSVolume) BlockRead(ctx context.Context, hash string, writeTo io.Wri
        return streamer.Wrote(), nil
 }
 
-func (v *S3AWSVolume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {
+func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {
        downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
                u.PartSize = s3downloaderPartSize
                u.Concurrency = s3downloaderReadConcurrency
@@ -476,7 +473,7 @@ func (v *S3AWSVolume) readWorker(ctx context.Context, key string, dst io.WriterA
        return v.translateError(err)
 }
 
-func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) error {
+func (v *s3Volume) writeObject(ctx context.Context, key string, r io.Reader) error {
        if r == nil {
                // r == nil leads to a memory violation in func readFillBuf in
                // aws-sdk-go-v2@v0.23.0/service/s3/s3manager/upload.go
@@ -526,7 +523,7 @@ func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader)
 }
 
 // Put writes a block.
-func (v *S3AWSVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
+func (v *s3Volume) BlockWrite(ctx context.Context, hash string, data []byte) error {
        // Do not use putWithPipe here; we want to pass an io.ReadSeeker to the S3
        // sdk to avoid memory allocation there. See #17339 for more information.
        rdr := bytes.NewReader(data)
@@ -628,7 +625,7 @@ func (lister *s3awsLister) pop() (k *s3.Object) {
 
 // Index writes a complete list of locators with the given prefix
 // for which Get() can retrieve data.
-func (v *S3AWSVolume) Index(ctx context.Context, prefix string, writer io.Writer) error {
+func (v *s3Volume) Index(ctx context.Context, prefix string, writer io.Writer) error {
        prefix = v.key(prefix)
        // Use a merge sort to find matching sets of X and recent/X.
        dataL := s3awsLister{
@@ -695,7 +692,7 @@ func (v *S3AWSVolume) Index(ctx context.Context, prefix string, writer io.Writer
 }
 
 // Mtime returns the stored timestamp for the given locator.
-func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) {
+func (v *s3Volume) Mtime(loc string) (time.Time, error) {
        key := v.key(loc)
        _, err := v.head(key)
        if err != nil {
@@ -724,12 +721,12 @@ func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) {
 }
 
 // InternalStats returns bucket I/O and API call counters.
-func (v *S3AWSVolume) InternalStats() interface{} {
+func (v *s3Volume) InternalStats() interface{} {
        return &v.bucket.stats
 }
 
 // BlockTouch sets the timestamp for the given locator to the current time.
-func (v *S3AWSVolume) BlockTouch(hash string) error {
+func (v *s3Volume) BlockTouch(hash string) error {
        key := v.key(hash)
        _, err := v.head(key)
        err = v.translateError(err)
@@ -745,7 +742,7 @@ func (v *S3AWSVolume) BlockTouch(hash string) error {
 
 // checkRaceWindow returns a non-nil error if trash/key is, or might
 // be, in the race window (i.e., it's not safe to trash key).
-func (v *S3AWSVolume) checkRaceWindow(key string) error {
+func (v *s3Volume) checkRaceWindow(key string) error {
        resp, err := v.head("trash/" + key)
        err = v.translateError(err)
        if os.IsNotExist(err) {
@@ -785,7 +782,7 @@ func (b *s3AWSbucket) Del(path string) error {
 }
 
 // Trash a Keep block.
-func (v *S3AWSVolume) BlockTrash(loc string) error {
+func (v *s3Volume) BlockTrash(loc string) error {
        if t, err := v.Mtime(loc); err != nil {
                return err
        } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
@@ -794,7 +791,7 @@ func (v *S3AWSVolume) BlockTrash(loc string) error {
        key := v.key(loc)
        if v.cluster.Collections.BlobTrashLifetime == 0 {
                if !v.UnsafeDelete {
-                       return ErrS3TrashDisabled
+                       return errS3TrashDisabled
                }
                return v.translateError(v.bucket.Del(key))
        }
@@ -810,7 +807,7 @@ func (v *S3AWSVolume) BlockTrash(loc string) error {
 }
 
 // BlockUntrash moves block from trash back into store
-func (v *S3AWSVolume) BlockUntrash(hash string) error {
+func (v *s3Volume) BlockUntrash(hash string) error {
        key := v.key(hash)
        err := v.safeCopy(key, "trash/"+key)
        if err != nil {