2960: Fix some needlessly exported identifiers.
authorTom Clegg <tom@curii.com>
Tue, 13 Feb 2024 19:48:09 +0000 (14:48 -0500)
committerTom Clegg <tom@curii.com>
Tue, 13 Feb 2024 20:40:02 +0000 (15:40 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/crunchrun/integration_test.go
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/keepstore.go
services/keepstore/router.go
services/keepstore/router_test.go
services/keepstore/s3aws_volume.go
services/keepstore/s3aws_volume_test.go
services/keepstore/unix_volume.go
services/keepstore/unix_volume_test.go
services/keepstore/volume.go

index d569020824c22373d5098e0afd4c14d6156dd773..4f0100b2677f956b1af9dadcbd5b6082a1be0ab0 100644 (file)
@@ -20,7 +20,6 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
-       "git.arvados.org/arvados.git/services/keepstore"
        . "gopkg.in/check.v1"
 )
 
@@ -195,7 +194,9 @@ func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
                        volume.Replication = 2
                        cluster.Volumes[uuid] = volume
 
-                       var v keepstore.UnixVolume
+                       var v struct {
+                               Root string
+                       }
                        err = json.Unmarshal(volume.DriverParameters, &v)
                        c.Assert(err, IsNil)
                        err = os.Mkdir(v.Root, 0777)
index bdd669bb4649e2471f0a36035a7f96dfcf6ccd8a..31660614f3c8fd213e7859ae9634e798ac83754b 100644 (file)
@@ -31,7 +31,7 @@ func init() {
 }
 
 func newAzureBlobVolume(params newVolumeParams) (volume, error) {
-       v := &AzureBlobVolume{
+       v := &azureBlobVolume{
                RequestTimeout:    azureDefaultRequestTimeout,
                WriteRaceInterval: azureDefaultWriteRaceInterval,
                WriteRacePollTime: azureDefaultWriteRacePollTime,
@@ -79,7 +79,7 @@ func newAzureBlobVolume(params newVolumeParams) (volume, error) {
        return v, v.check()
 }
 
-func (v *AzureBlobVolume) check() error {
+func (v *azureBlobVolume) check() error {
        lbls := prometheus.Labels{"device_id": v.DeviceID()}
        v.container.stats.opsCounters, v.container.stats.errCounters, v.container.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
        return nil
@@ -93,9 +93,9 @@ const (
        azureDefaultWriteRacePollTime    = arvados.Duration(time.Second)
 )
 
-// An AzureBlobVolume stores and retrieves blocks in an Azure Blob
+// An azureBlobVolume stores and retrieves blocks in an Azure Blob
 // container.
-type AzureBlobVolume struct {
+type azureBlobVolume struct {
        StorageAccountName   string
        StorageAccountKey    string
        StorageBaseURL       string // "" means default, "core.windows.net"
@@ -125,12 +125,12 @@ func (*singleSender) Send(c *storage.Client, req *http.Request) (resp *http.Resp
 }
 
 // DeviceID returns a globally unique ID for the storage container.
-func (v *AzureBlobVolume) DeviceID() string {
+func (v *azureBlobVolume) DeviceID() string {
        return "azure://" + v.StorageBaseURL + "/" + v.StorageAccountName + "/" + v.ContainerName
 }
 
 // Return true if expires_at metadata attribute is found on the block
-func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
+func (v *azureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
        metadata, err := v.container.GetBlobMetadata(loc)
        if err != nil {
                return false, metadata, v.translateError(err)
@@ -147,7 +147,7 @@ func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, err
 // If the block is younger than azureWriteRaceInterval and is
 // unexpectedly empty, assume a BlockWrite operation is in progress,
 // and wait for it to finish writing.
-func (v *AzureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
        trashed, _, err := v.checkTrashed(hash)
        if err != nil {
                return 0, err
@@ -201,7 +201,7 @@ func (v *AzureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io
        return streamer.Wrote(), err
 }
 
-func (v *AzureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) (int, error) {
+func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) (int, error) {
        ctx, cancel := context.WithCancel(ctx)
        defer cancel()
 
@@ -318,7 +318,7 @@ func (v *AzureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt)
 
 // BlockWrite stores a block on the volume. If it already exists, its
 // timestamp is updated.
-func (v *AzureBlobVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
+func (v *azureBlobVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
        // Send the block data through a pipe, so that (if we need to)
        // we can close the pipe early and abandon our
        // CreateBlockBlobFromReader() goroutine, without worrying
@@ -359,7 +359,7 @@ func (v *AzureBlobVolume) BlockWrite(ctx context.Context, hash string, data []by
 }
 
 // BlockTouch updates the last-modified property of a block blob.
-func (v *AzureBlobVolume) BlockTouch(hash string) error {
+func (v *azureBlobVolume) BlockTouch(hash string) error {
        trashed, metadata, err := v.checkTrashed(hash)
        if err != nil {
                return err
@@ -373,7 +373,7 @@ func (v *AzureBlobVolume) BlockTouch(hash string) error {
 }
 
 // Mtime returns the last-modified property of a block blob.
-func (v *AzureBlobVolume) Mtime(hash string) (time.Time, error) {
+func (v *azureBlobVolume) Mtime(hash string) (time.Time, error) {
        trashed, _, err := v.checkTrashed(hash)
        if err != nil {
                return time.Time{}, err
@@ -391,7 +391,7 @@ func (v *AzureBlobVolume) Mtime(hash string) (time.Time, error) {
 
 // Index writes a list of Keep blocks that are stored in the
 // container.
-func (v *AzureBlobVolume) Index(ctx context.Context, prefix string, writer io.Writer) error {
+func (v *azureBlobVolume) Index(ctx context.Context, prefix string, writer io.Writer) error {
        params := storage.ListBlobsParameters{
                Prefix:  prefix,
                Include: &storage.IncludeBlobDataset{Metadata: true},
@@ -432,7 +432,7 @@ func (v *AzureBlobVolume) Index(ctx context.Context, prefix string, writer io.Wr
 }
 
 // call v.container.ListBlobs, retrying if needed.
-func (v *AzureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters) (resp storage.BlobListResponse, err error) {
+func (v *azureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters) (resp storage.BlobListResponse, err error) {
        for i := 0; i < v.ListBlobsMaxAttempts; i++ {
                resp, err = v.container.ListBlobs(params)
                err = v.translateError(err)
@@ -448,7 +448,7 @@ func (v *AzureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters
 }
 
 // Trash a Keep block.
-func (v *AzureBlobVolume) BlockTrash(loc string) error {
+func (v *azureBlobVolume) BlockTrash(loc string) error {
        // Ideally we would use If-Unmodified-Since, but that
        // particular condition seems to be ignored by Azure. Instead,
        // we get the Etag before checking Mtime, and use If-Match to
@@ -481,7 +481,7 @@ func (v *AzureBlobVolume) BlockTrash(loc string) error {
 
 // BlockUntrash deletes the expires_at metadata attribute for the
 // specified block blob.
-func (v *AzureBlobVolume) BlockUntrash(hash string) error {
+func (v *azureBlobVolume) BlockUntrash(hash string) error {
        // if expires_at does not exist, return NotFoundError
        metadata, err := v.container.GetBlobMetadata(hash)
        if err != nil {
@@ -499,7 +499,7 @@ func (v *AzureBlobVolume) BlockUntrash(hash string) error {
 
 // If possible, translate an Azure SDK error to a recognizable error
 // like os.ErrNotExist.
-func (v *AzureBlobVolume) translateError(err error) error {
+func (v *azureBlobVolume) translateError(err error) error {
        switch {
        case err == nil:
                return err
@@ -519,13 +519,13 @@ func (v *AzureBlobVolume) translateError(err error) error {
 
 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
-func (v *AzureBlobVolume) isKeepBlock(s string) bool {
+func (v *azureBlobVolume) isKeepBlock(s string) bool {
        return keepBlockRegexp.MatchString(s)
 }
 
 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
 // and deletes them from the volume.
-func (v *AzureBlobVolume) EmptyTrash() {
+func (v *azureBlobVolume) EmptyTrash() {
        var bytesDeleted, bytesInTrash int64
        var blocksDeleted, blocksInTrash int64
 
@@ -593,7 +593,7 @@ func (v *AzureBlobVolume) EmptyTrash() {
 }
 
 // InternalStats returns bucket I/O and API call counters.
-func (v *AzureBlobVolume) InternalStats() interface{} {
+func (v *azureBlobVolume) InternalStats() interface{} {
        return &v.container.stats
 }
 
index a543dfc245174d26f6d8a66fcfa47b6d54215d0e..c629c9dc156367ab636b918d7a3671694a3e511f 100644 (file)
@@ -222,7 +222,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
                rw.WriteHeader(http.StatusCreated)
        case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
                // "Set Metadata Headers" API. We don't bother
-               // stubbing "Get Metadata Headers": AzureBlobVolume
+               // stubbing "Get Metadata Headers": azureBlobVolume
                // sets metadata headers only as a way to bump Etag
                // and Last-Modified.
                if !blobExists {
@@ -367,7 +367,7 @@ func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
 }
 
 type testableAzureBlobVolume struct {
-       *AzureBlobVolume
+       *azureBlobVolume
        azHandler *azStubHandler
        azStub    *httptest.Server
        t         TB
@@ -397,7 +397,7 @@ func (s *stubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, params newVolum
        azClient.Sender = &singleSender{}
 
        bs := azClient.GetBlobService()
-       v := &AzureBlobVolume{
+       v := &azureBlobVolume{
                ContainerName:        container,
                WriteRaceInterval:    arvados.Duration(time.Millisecond),
                WriteRacePollTime:    arvados.Duration(time.Nanosecond),
@@ -416,7 +416,7 @@ func (s *stubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, params newVolum
        }
 
        return &testableAzureBlobVolume{
-               AzureBlobVolume: v,
+               azureBlobVolume: v,
                azHandler:       azHandler,
                azStub:          azStub,
                t:               t,
@@ -553,8 +553,8 @@ func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *che
                MetricsVecs:  newVolumeMetricsVecs(prometheus.NewRegistry()),
                BufferPool:   newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()),
        })
-       v.AzureBlobVolume.WriteRaceInterval.Set("2s")
-       v.AzureBlobVolume.WriteRacePollTime.Set("5ms")
+       v.azureBlobVolume.WriteRaceInterval.Set("2s")
+       v.azureBlobVolume.WriteRacePollTime.Set("5ms")
        defer v.Teardown()
 
        v.BlockWriteRaw(TestHash, nil)
index 89afa9089f973b968493c275fa13d0aaa4db8d45..62b6d15e565cfe6e60671055f073a6f9c11de864 100644 (file)
@@ -2,6 +2,12 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
+// Package keepstore implements the keepstore service component and
+// back-end storage drivers.
+//
+// It is an internal module, only intended to be imported by
+// /cmd/arvados-server and other server-side components in this
+// repository.
 package keepstore
 
 import (
@@ -47,7 +53,7 @@ var (
        driver               = make(map[string]volumeDriver)
 )
 
-type IndexOptions struct {
+type indexOptions struct {
        MountUUID string
        Prefix    string
        WriteTo   io.Writer
@@ -653,7 +659,7 @@ func (ks *keepstore) Mounts() []*mount {
        return ks.mountsR
 }
 
-func (ks *keepstore) Index(ctx context.Context, opts IndexOptions) error {
+func (ks *keepstore) Index(ctx context.Context, opts indexOptions) error {
        mounts := ks.mountsR
        if opts.MountUUID != "" {
                mnt, ok := ks.mounts[opts.MountUUID]
index 7ff82aa80f407a9e1f6e7571815209e4347e87d4..256bc18c2698c80e5d0ff7d9fe1eb73d8f4c3747 100644 (file)
@@ -155,7 +155,7 @@ func (rtr *router) handleIndex(w http.ResponseWriter, req *http.Request) {
                prefix = mux.Vars(req)["prefix"]
        }
        cw := &countingWriter{writer: w}
-       err := rtr.keepstore.Index(req.Context(), IndexOptions{
+       err := rtr.keepstore.Index(req.Context(), indexOptions{
                MountUUID: mux.Vars(req)["uuid"],
                Prefix:    prefix,
                WriteTo:   cw,
index a729ee0df3c76d04342cc0abbe7bc54b07faa338..f4bcdd4ae4df19ba8baccb87e7775282957b7fcb 100644 (file)
@@ -302,8 +302,8 @@ func (s *routerSuite) TestBadRequest(c *C) {
                "GET /mounts/blocks/123",
                "GET /trash",
                "GET /pull",
-               "GET /debug.json",
-               "GET /status.json",
+               "GET /debug.json",  // old endpoint, no longer exists
+               "GET /status.json", // old endpoint, no longer exists
                "POST /",
                "POST /aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
                "POST /trash",
index 2417bb81496f918f1114814c3acd7fb5c25e1227..8e93eed12c839175ba050b18e5772161ad82bd47 100644 (file)
@@ -34,7 +34,7 @@ import (
 )
 
 func init() {
-       driver["S3"] = newS3AWSVolume
+       driver["S3"] = news3Volume
 }
 
 const (
@@ -49,11 +49,13 @@ const (
 )
 
 var (
-       ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
+       errS3TrashDisabled   = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
+       s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
+       s3AWSZeroTime        time.Time
 )
 
-// S3AWSVolume implements Volume using an S3 bucket.
-type S3AWSVolume struct {
+// s3Volume implements Volume using an S3 bucket.
+type s3Volume struct {
        arvados.S3VolumeDriverParameters
        AuthToken      string    // populated automatically when IAMRole is used
        AuthExpiration time.Time // populated automatically when IAMRole is used
@@ -78,12 +80,7 @@ type s3AWSbucket struct {
        mu     sync.Mutex
 }
 
-const ()
-
-var s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
-var s3AWSZeroTime time.Time
-
-func (v *S3AWSVolume) isKeepBlock(s string) (string, bool) {
+func (v *s3Volume) isKeepBlock(s string) (string, bool) {
        if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
                s = s[v.PrefixLength+1:]
        }
@@ -93,7 +90,7 @@ func (v *S3AWSVolume) isKeepBlock(s string) (string, bool) {
 // Return the key used for a given loc. If PrefixLength==0 then
 // key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is
 // "abc/abcdef0123", etc.
-func (v *S3AWSVolume) key(loc string) string {
+func (v *s3Volume) key(loc string) string {
        if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
                return loc[:v.PrefixLength] + "/" + loc
        } else {
@@ -101,8 +98,8 @@ func (v *S3AWSVolume) key(loc string) string {
        }
 }
 
-func newS3AWSVolume(params newVolumeParams) (volume, error) {
-       v := &S3AWSVolume{
+func news3Volume(params newVolumeParams) (volume, error) {
+       v := &s3Volume{
                cluster:    params.Cluster,
                volume:     params.ConfigVolume,
                metrics:    params.MetricsVecs,
@@ -116,7 +113,7 @@ func newS3AWSVolume(params newVolumeParams) (volume, error) {
        return v, v.check("")
 }
 
-func (v *S3AWSVolume) translateError(err error) error {
+func (v *s3Volume) translateError(err error) error {
        if _, ok := err.(*aws.RequestCanceledError); ok {
                return context.Canceled
        } else if aerr, ok := err.(awserr.Error); ok {
@@ -135,7 +132,7 @@ func (v *S3AWSVolume) translateError(err error) error {
 //
 // (If something goes wrong during the copy, the error will be
 // embedded in the 200 OK response)
-func (v *S3AWSVolume) safeCopy(dst, src string) error {
+func (v *s3Volume) safeCopy(dst, src string) error {
        input := &s3.CopyObjectInput{
                Bucket:      aws.String(v.bucket.bucket),
                ContentType: aws.String("application/octet-stream"),
@@ -161,7 +158,7 @@ func (v *S3AWSVolume) safeCopy(dst, src string) error {
        return nil
 }
 
-func (v *S3AWSVolume) check(ec2metadataHostname string) error {
+func (v *s3Volume) check(ec2metadataHostname string) error {
        if v.Bucket == "" {
                return errors.New("DriverParameters: Bucket must be provided")
        }
@@ -238,13 +235,13 @@ func (v *S3AWSVolume) check(ec2metadataHostname string) error {
 }
 
 // DeviceID returns a globally unique ID for the storage bucket.
-func (v *S3AWSVolume) DeviceID() string {
+func (v *s3Volume) DeviceID() string {
        return "s3://" + v.Endpoint + "/" + v.Bucket
 }
 
 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
 // and deletes them from the volume.
-func (v *S3AWSVolume) EmptyTrash() {
+func (v *s3Volume) EmptyTrash() {
        var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
 
        // Define "ready to delete" as "...when EmptyTrash started".
@@ -358,7 +355,7 @@ func (v *S3AWSVolume) EmptyTrash() {
 // exist. If the timestamps on "recent/X" and "trash/X" indicate there
 // was a race between Put and Trash, fixRace recovers from the race by
 // Untrashing the block.
-func (v *S3AWSVolume) fixRace(key string) bool {
+func (v *s3Volume) fixRace(key string) bool {
        trash, err := v.head("trash/" + key)
        if err != nil {
                if !os.IsNotExist(v.translateError(err)) {
@@ -392,7 +389,7 @@ func (v *S3AWSVolume) fixRace(key string) bool {
        return true
 }
 
-func (v *S3AWSVolume) head(key string) (result *s3.HeadObjectOutput, err error) {
+func (v *s3Volume) head(key string) (result *s3.HeadObjectOutput, err error) {
        input := &s3.HeadObjectInput{
                Bucket: aws.String(v.bucket.bucket),
                Key:    aws.String(key),
@@ -414,7 +411,7 @@ func (v *S3AWSVolume) head(key string) (result *s3.HeadObjectOutput, err error)
 
 // BlockRead reads a Keep block that has been stored as a block blob
 // in the S3 bucket.
-func (v *S3AWSVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *s3Volume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
        key := v.key(hash)
        buf, err := v.bufferPool.GetContext(ctx)
        if err != nil {
@@ -460,7 +457,7 @@ func (v *S3AWSVolume) BlockRead(ctx context.Context, hash string, writeTo io.Wri
        return streamer.Wrote(), nil
 }
 
-func (v *S3AWSVolume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {
+func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {
        downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
                u.PartSize = s3downloaderPartSize
                u.Concurrency = s3downloaderReadConcurrency
@@ -476,7 +473,7 @@ func (v *S3AWSVolume) readWorker(ctx context.Context, key string, dst io.WriterA
        return v.translateError(err)
 }
 
-func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) error {
+func (v *s3Volume) writeObject(ctx context.Context, key string, r io.Reader) error {
        if r == nil {
                // r == nil leads to a memory violation in func readFillBuf in
                // aws-sdk-go-v2@v0.23.0/service/s3/s3manager/upload.go
@@ -526,7 +523,7 @@ func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader)
 }
 
 // Put writes a block.
-func (v *S3AWSVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
+func (v *s3Volume) BlockWrite(ctx context.Context, hash string, data []byte) error {
        // Do not use putWithPipe here; we want to pass an io.ReadSeeker to the S3
        // sdk to avoid memory allocation there. See #17339 for more information.
        rdr := bytes.NewReader(data)
@@ -628,7 +625,7 @@ func (lister *s3awsLister) pop() (k *s3.Object) {
 
 // Index writes a complete list of locators with the given prefix
 // for which Get() can retrieve data.
-func (v *S3AWSVolume) Index(ctx context.Context, prefix string, writer io.Writer) error {
+func (v *s3Volume) Index(ctx context.Context, prefix string, writer io.Writer) error {
        prefix = v.key(prefix)
        // Use a merge sort to find matching sets of X and recent/X.
        dataL := s3awsLister{
@@ -695,7 +692,7 @@ func (v *S3AWSVolume) Index(ctx context.Context, prefix string, writer io.Writer
 }
 
 // Mtime returns the stored timestamp for the given locator.
-func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) {
+func (v *s3Volume) Mtime(loc string) (time.Time, error) {
        key := v.key(loc)
        _, err := v.head(key)
        if err != nil {
@@ -724,12 +721,12 @@ func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) {
 }
 
 // InternalStats returns bucket I/O and API call counters.
-func (v *S3AWSVolume) InternalStats() interface{} {
+func (v *s3Volume) InternalStats() interface{} {
        return &v.bucket.stats
 }
 
 // BlockTouch sets the timestamp for the given locator to the current time.
-func (v *S3AWSVolume) BlockTouch(hash string) error {
+func (v *s3Volume) BlockTouch(hash string) error {
        key := v.key(hash)
        _, err := v.head(key)
        err = v.translateError(err)
@@ -745,7 +742,7 @@ func (v *S3AWSVolume) BlockTouch(hash string) error {
 
 // checkRaceWindow returns a non-nil error if trash/key is, or might
 // be, in the race window (i.e., it's not safe to trash key).
-func (v *S3AWSVolume) checkRaceWindow(key string) error {
+func (v *s3Volume) checkRaceWindow(key string) error {
        resp, err := v.head("trash/" + key)
        err = v.translateError(err)
        if os.IsNotExist(err) {
@@ -785,7 +782,7 @@ func (b *s3AWSbucket) Del(path string) error {
 }
 
 // Trash a Keep block.
-func (v *S3AWSVolume) BlockTrash(loc string) error {
+func (v *s3Volume) BlockTrash(loc string) error {
        if t, err := v.Mtime(loc); err != nil {
                return err
        } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
@@ -794,7 +791,7 @@ func (v *S3AWSVolume) BlockTrash(loc string) error {
        key := v.key(loc)
        if v.cluster.Collections.BlobTrashLifetime == 0 {
                if !v.UnsafeDelete {
-                       return ErrS3TrashDisabled
+                       return errS3TrashDisabled
                }
                return v.translateError(v.bucket.Del(key))
        }
@@ -810,7 +807,7 @@ func (v *S3AWSVolume) BlockTrash(loc string) error {
 }
 
 // BlockUntrash moves block from trash back into store
-func (v *S3AWSVolume) BlockUntrash(hash string) error {
+func (v *s3Volume) BlockUntrash(hash string) error {
        key := v.key(hash)
        err := v.safeCopy(key, "trash/"+key)
        if err != nil {
index f05cbee848742a436073230eac00b767930dd9b5..d9dcbc52d686daaced43d728fa50ef5ea8fb4dbb 100644 (file)
@@ -58,7 +58,7 @@ type StubbedS3AWSSuite struct {
        s3server *httptest.Server
        metadata *httptest.Server
        cluster  *arvados.Cluster
-       volumes  []*TestableS3AWSVolume
+       volumes  []*testableS3Volume
 }
 
 func (s *StubbedS3AWSSuite) SetUpTest(c *check.C) {
@@ -102,7 +102,8 @@ func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
        }, 0)
        v.IndexPageSize = 3
        for i := 0; i < 256; i++ {
-               v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
+               err := v.blockWriteWithoutMD5Check(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
+               c.Assert(err, check.IsNil)
        }
        for _, spec := range []struct {
                prefix      string
@@ -132,7 +133,7 @@ func (s *StubbedS3AWSSuite) TestSignature(c *check.C) {
 
        // The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 v2 signatures are being phased out
        // as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
-       vol := S3AWSVolume{
+       vol := s3Volume{
                S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
                        AccessKeyID:     "xxx",
                        SecretAccessKey: "xxx",
@@ -165,7 +166,7 @@ func (s *StubbedS3AWSSuite) TestIAMRoleCredentials(c *check.C) {
        }))
        defer s.metadata.Close()
 
-       v := &S3AWSVolume{
+       v := &s3Volume{
                S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
                        IAMRole:  s.metadata.URL + "/latest/api/token",
                        Endpoint: "http://localhost:12345",
@@ -186,7 +187,7 @@ func (s *StubbedS3AWSSuite) TestIAMRoleCredentials(c *check.C) {
        s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                w.WriteHeader(http.StatusNotFound)
        }))
-       deadv := &S3AWSVolume{
+       deadv := &s3Volume{
                S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
                        IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
                        Endpoint: "http://localhost:12345",
@@ -259,19 +260,19 @@ func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
 }
 
 func (s *StubbedS3AWSSuite) TestGetContextCancel(c *check.C) {
-       s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
+       s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
                _, err := v.BlockRead(ctx, fooHash, io.Discard)
                return err
        })
 }
 
 func (s *StubbedS3AWSSuite) TestPutContextCancel(c *check.C) {
-       s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
+       s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
                return v.BlockWrite(ctx, fooHash, []byte("foo"))
        })
 }
 
-func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3AWSVolume) error) {
+func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.Context, *testableS3Volume) error) {
        handler := &s3AWSBlockingHandler{}
        s.s3server = httptest.NewServer(handler)
        defer s.s3server.Close()
@@ -534,8 +535,8 @@ func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
        }
 }
 
-type TestableS3AWSVolume struct {
-       *S3AWSVolume
+type testableS3Volume struct {
+       *s3Volume
        server      *httptest.Server
        c           *check.C
        serverClock *s3AWSFakeClock
@@ -558,7 +559,7 @@ func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) {
        }
 }
 
-func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *TestableS3AWSVolume {
+func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {
 
        clock := &s3AWSFakeClock{}
        // fake s3
@@ -581,8 +582,8 @@ func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, params newVolumeParams
                iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
        }
 
-       v := &TestableS3AWSVolume{
-               S3AWSVolume: &S3AWSVolume{
+       v := &testableS3Volume{
+               s3Volume: &s3Volume{
                        S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
                                IAMRole:            iamRole,
                                AccessKeyID:        accessKey,
@@ -604,24 +605,23 @@ func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, params newVolumeParams
                server:      srv,
                serverClock: clock,
        }
-       c.Assert(v.S3AWSVolume.check(""), check.IsNil)
+       c.Assert(v.s3Volume.check(""), check.IsNil)
        // Our test S3 server uses the older 'Path Style'
-       v.S3AWSVolume.bucket.svc.ForcePathStyle = true
+       v.s3Volume.bucket.svc.ForcePathStyle = true
        // Create the testbucket
        input := &s3.CreateBucketInput{
                Bucket: aws.String(S3AWSTestBucketName),
        }
-       req := v.S3AWSVolume.bucket.svc.CreateBucketRequest(input)
+       req := v.s3Volume.bucket.svc.CreateBucketRequest(input)
        _, err := req.Send(context.Background())
        c.Assert(err, check.IsNil)
        // We couldn't set RaceWindow until now because check()
        // rejects negative values.
-       v.S3AWSVolume.RaceWindow = arvados.Duration(raceWindow)
+       v.s3Volume.RaceWindow = arvados.Duration(raceWindow)
        return v
 }
 
-// PutRaw skips the ContentMD5 test
-func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
+func (v *testableS3Volume) blockWriteWithoutMD5Check(loc string, block []byte) error {
        key := v.key(loc)
        r := newCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
 
@@ -636,7 +636,7 @@ func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
                Body:   r,
        })
        if err != nil {
-               v.logger.Printf("PutRaw: %s: %+v", key, err)
+               return err
        }
 
        empty := bytes.NewReader([]byte{})
@@ -645,15 +645,13 @@ func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
                Key:    aws.String("recent/" + key),
                Body:   empty,
        })
-       if err != nil {
-               v.logger.Printf("PutRaw: recent/%s: %+v", key, err)
-       }
+       return err
 }
 
 // TouchWithDate turns back the clock while doing a Touch(). We assume
 // there are no other operations happening on the same s3test server
 // while we do this.
-func (v *TestableS3AWSVolume) TouchWithDate(loc string, lastPut time.Time) {
+func (v *testableS3Volume) TouchWithDate(loc string, lastPut time.Time) {
        v.serverClock.now = &lastPut
 
        uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
@@ -670,10 +668,10 @@ func (v *TestableS3AWSVolume) TouchWithDate(loc string, lastPut time.Time) {
        v.serverClock.now = nil
 }
 
-func (v *TestableS3AWSVolume) Teardown() {
+func (v *testableS3Volume) Teardown() {
        v.server.Close()
 }
 
-func (v *TestableS3AWSVolume) ReadWriteOperationLabelValues() (r, w string) {
+func (v *testableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
        return "get", "put"
 }
index 98edfae14d7e602d79e055a77d698dc8a6b466d2..f01ad97553fed07991bb2a667fbc65b8105ea8ca 100644 (file)
@@ -32,7 +32,7 @@ func init() {
 }
 
 func newUnixVolume(params newVolumeParams) (volume, error) {
-       v := &UnixVolume{
+       v := &unixVolume{
                uuid:    params.UUID,
                cluster: params.Cluster,
                volume:  params.ConfigVolume,
@@ -47,7 +47,7 @@ func newUnixVolume(params newVolumeParams) (volume, error) {
        return v, v.check()
 }
 
-func (v *UnixVolume) check() error {
+func (v *unixVolume) check() error {
        if v.Root == "" {
                return errors.New("DriverParameters.Root was not provided")
        }
@@ -66,8 +66,8 @@ func (v *UnixVolume) check() error {
        return err
 }
 
-// A UnixVolume stores and retrieves blocks in a local directory.
-type UnixVolume struct {
+// A unixVolume stores and retrieves blocks in a local directory.
+type unixVolume struct {
        Root      string // path to the volume's root directory
        Serialize bool
 
@@ -89,7 +89,7 @@ type UnixVolume struct {
 // filesystem root to storage directory, joined by "/". For example,
 // the device ID for a local directory "/mnt/xvda1/keep" might be
 // "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep".
-func (v *UnixVolume) DeviceID() string {
+func (v *unixVolume) DeviceID() string {
        giveup := func(f string, args ...interface{}) string {
                v.logger.Infof(f+"; using hostname:path for volume %s", append(args, v.uuid)...)
                host, _ := os.Hostname()
@@ -163,7 +163,7 @@ func (v *UnixVolume) DeviceID() string {
 }
 
 // BlockTouch sets the timestamp for the given locator to the current time
-func (v *UnixVolume) BlockTouch(hash string) error {
+func (v *unixVolume) BlockTouch(hash string) error {
        p := v.blockPath(hash)
        f, err := v.os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
        if err != nil {
@@ -187,7 +187,7 @@ func (v *UnixVolume) BlockTouch(hash string) error {
 }
 
 // Mtime returns the stored timestamp for the given locator.
-func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
+func (v *unixVolume) Mtime(loc string) (time.Time, error) {
        p := v.blockPath(loc)
        fi, err := v.os.Stat(p)
        if err != nil {
@@ -198,7 +198,7 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
 
 // Lock the locker (if one is in use), open the file for reading, and
 // call the given function if and when the file is ready to read.
-func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
+func (v *unixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
        if err := v.lock(ctx); err != nil {
                return err
        }
@@ -212,7 +212,7 @@ func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader
 }
 
 // stat is os.Stat() with some extra sanity checks.
-func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
+func (v *unixVolume) stat(path string) (os.FileInfo, error) {
        stat, err := v.os.Stat(path)
        if err == nil {
                if stat.Size() < 0 {
@@ -225,7 +225,7 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
 }
 
 // BlockRead reads a block from the volume.
-func (v *UnixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) {
+func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) {
        path := v.blockPath(hash)
        stat, err := v.stat(path)
        if err != nil {
@@ -244,7 +244,7 @@ func (v *UnixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (i
 
 // BlockWrite stores a block on the volume. If it already exists, its
 // timestamp is updated.
-func (v *UnixVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
+func (v *unixVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
        if v.isFull() {
                return errFull
        }
@@ -293,7 +293,7 @@ func (v *UnixVolume) BlockWrite(ctx context.Context, hash string, data []byte) e
 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
 var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
-func (v *UnixVolume) Index(ctx context.Context, prefix string, w io.Writer) error {
+func (v *unixVolume) Index(ctx context.Context, prefix string, w io.Writer) error {
        rootdir, err := v.os.Open(v.Root)
        if err != nil {
                return err
@@ -374,7 +374,7 @@ func (v *UnixVolume) Index(ctx context.Context, prefix string, w io.Writer) erro
 // BlobTrashLifetime == 0, the block is deleted; otherwise, the block
 // is renamed as path/{loc}.trash.{deadline}, where deadline = now +
 // BlobTrashLifetime.
-func (v *UnixVolume) BlockTrash(loc string) error {
+func (v *unixVolume) BlockTrash(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
        // and Trash() because either (a) the file will be trashed and Touch()
@@ -417,7 +417,7 @@ func (v *UnixVolume) BlockTrash(loc string) error {
 // BlockUntrash moves block from trash back into store
 // Look for path/{loc}.trash.{deadline} in storage,
 // and rename the first such file as path/{loc}
-func (v *UnixVolume) BlockUntrash(hash string) error {
+func (v *unixVolume) BlockUntrash(hash string) error {
        v.os.stats.TickOps("readdir")
        v.os.stats.Tick(&v.os.stats.ReaddirOps)
        files, err := ioutil.ReadDir(v.blockDir(hash))
@@ -450,19 +450,19 @@ func (v *UnixVolume) BlockUntrash(hash string) error {
 
 // blockDir returns the fully qualified directory name for the directory
 // where loc is (or would be) stored on this volume.
-func (v *UnixVolume) blockDir(loc string) string {
+func (v *unixVolume) blockDir(loc string) string {
        return filepath.Join(v.Root, loc[0:3])
 }
 
 // blockPath returns the fully qualified pathname for the path to loc
 // on this volume.
-func (v *UnixVolume) blockPath(loc string) string {
+func (v *unixVolume) blockPath(loc string) string {
        return filepath.Join(v.blockDir(loc), loc)
 }
 
 // isFull returns true if the free space on the volume is less than
 // MinFreeKilobytes.
-func (v *UnixVolume) isFull() (isFull bool) {
+func (v *unixVolume) isFull() (isFull bool) {
        fullSymlink := v.Root + "/full"
 
        // Check if the volume has been marked as full in the last hour.
@@ -492,7 +492,7 @@ func (v *UnixVolume) isFull() (isFull bool) {
 
 // FreeDiskSpace returns the number of unused 1k blocks available on
 // the volume.
-func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
+func (v *unixVolume) FreeDiskSpace() (free uint64, err error) {
        var fs syscall.Statfs_t
        err = syscall.Statfs(v.Root, &fs)
        if err == nil {
@@ -504,14 +504,14 @@ func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
 }
 
 // InternalStats returns I/O and filesystem ops counters.
-func (v *UnixVolume) InternalStats() interface{} {
+func (v *unixVolume) InternalStats() interface{} {
        return &v.os.stats
 }
 
 // lock acquires the serialize lock, if one is in use. If ctx is done
 // before the lock is acquired, lock returns ctx.Err() instead of
 // acquiring the lock.
-func (v *UnixVolume) lock(ctx context.Context) error {
+func (v *unixVolume) lock(ctx context.Context) error {
        if v.locker == nil {
                return nil
        }
@@ -535,7 +535,7 @@ func (v *UnixVolume) lock(ctx context.Context) error {
 }
 
 // unlock releases the serialize lock, if one is in use.
-func (v *UnixVolume) unlock() {
+func (v *unixVolume) unlock() {
        if v.locker == nil {
                return
        }
@@ -543,7 +543,7 @@ func (v *UnixVolume) unlock() {
 }
 
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
-func (v *UnixVolume) lockfile(f *os.File) error {
+func (v *unixVolume) lockfile(f *os.File) error {
        v.os.stats.TickOps("flock")
        v.os.stats.Tick(&v.os.stats.FlockOps)
        err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
@@ -551,7 +551,7 @@ func (v *UnixVolume) lockfile(f *os.File) error {
        return err
 }
 
-func (v *UnixVolume) unlockfile(f *os.File) error {
+func (v *unixVolume) unlockfile(f *os.File) error {
        err := syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
        v.os.stats.TickErr(err)
        return err
@@ -559,7 +559,7 @@ func (v *UnixVolume) unlockfile(f *os.File) error {
 
 // Where appropriate, translate a more specific filesystem error to an
 // error recognized by handlers, like os.ErrNotExist.
-func (v *UnixVolume) translateError(err error) error {
+func (v *unixVolume) translateError(err error) error {
        switch err.(type) {
        case *os.PathError:
                // stat() returns a PathError if the parent directory
@@ -574,7 +574,7 @@ var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
 
 // EmptyTrash walks hierarchy looking for {hash}.trash.*
 // and deletes those with deadline < now.
-func (v *UnixVolume) EmptyTrash() {
+func (v *unixVolume) EmptyTrash() {
        var bytesDeleted, bytesInTrash int64
        var blocksDeleted, blocksInTrash int64
 
index a8dc4e809a863c92bb1459967cc165115bfb4d6b..de8d3c42d8299a7c2a5557f9a69192d74ce39283 100644 (file)
@@ -23,7 +23,7 @@ import (
 )
 
 type testableUnixVolume struct {
-       UnixVolume
+       unixVolume
        t TB
 }
 
@@ -77,7 +77,7 @@ func (s *unixVolumeSuite) newTestableUnixVolume(c *check.C, params newVolumePara
                locker = &sync.Mutex{}
        }
        v := &testableUnixVolume{
-               UnixVolume: UnixVolume{
+               unixVolume: unixVolume{
                        Root:    d,
                        locker:  locker,
                        uuid:    params.UUID,
@@ -313,7 +313,7 @@ func (s *unixVolumeSuite) TestStats(c *check.C) {
                return string(buf)
        }
 
-       c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*UnixVolume)check() calls Stat() once
+       c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*unixVolume)check() calls Stat() once
        c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
 
        _, err := vol.BlockRead(context.Background(), fooHash, io.Discard)
@@ -353,14 +353,14 @@ func (s *unixVolumeSuite) TestStats(c *check.C) {
 func (s *unixVolumeSuite) TestSkipUnusedDirs(c *check.C) {
        vol := s.newTestableUnixVolume(c, s.params, false)
 
-       err := os.Mkdir(vol.UnixVolume.Root+"/aaa", 0777)
+       err := os.Mkdir(vol.unixVolume.Root+"/aaa", 0777)
        c.Assert(err, check.IsNil)
-       err = os.Mkdir(vol.UnixVolume.Root+"/.aaa", 0777) // EmptyTrash should not look here
+       err = os.Mkdir(vol.unixVolume.Root+"/.aaa", 0777) // EmptyTrash should not look here
        c.Assert(err, check.IsNil)
-       deleteme := vol.UnixVolume.Root + "/aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
+       deleteme := vol.unixVolume.Root + "/aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
        err = ioutil.WriteFile(deleteme, []byte{1, 2, 3}, 0777)
        c.Assert(err, check.IsNil)
-       skipme := vol.UnixVolume.Root + "/.aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
+       skipme := vol.unixVolume.Root + "/.aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
        err = ioutil.WriteFile(skipme, []byte{1, 2, 3}, 0777)
        c.Assert(err, check.IsNil)
        vol.EmptyTrash()
index 41a0eba86f5c281b265e0a605ab4f34e7ff20a5d..a0b6fda7d3390155e35821196892406b00b933a2 100644 (file)
@@ -48,7 +48,3 @@ type ioStats struct {
        InBytes    uint64
        OutBytes   uint64
 }
-
-type InternalStatser interface {
-       InternalStats() interface{}
-}