From 39f6e9f70f683237d9488faac1c549ca19ac9dae Mon Sep 17 00:00:00 2001 From: Tom Clegg <tom@curii.com> Date: Tue, 13 Feb 2024 14:48:09 -0500 Subject: [PATCH] 2960: Fix some needlessly exported identifiers. Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com> --- lib/crunchrun/integration_test.go | 5 +- services/keepstore/azure_blob_volume.go | 38 ++++++------ services/keepstore/azure_blob_volume_test.go | 12 ++-- services/keepstore/keepstore.go | 10 +++- services/keepstore/router.go | 2 +- services/keepstore/router_test.go | 4 +- services/keepstore/s3aws_volume.go | 61 ++++++++++---------- services/keepstore/s3aws_volume_test.go | 50 ++++++++-------- services/keepstore/unix_volume.go | 50 ++++++++-------- services/keepstore/unix_volume_test.go | 14 ++--- services/keepstore/volume.go | 4 -- 11 files changed, 124 insertions(+), 126 deletions(-) diff --git a/lib/crunchrun/integration_test.go b/lib/crunchrun/integration_test.go index d569020824..4f0100b267 100644 --- a/lib/crunchrun/integration_test.go +++ b/lib/crunchrun/integration_test.go @@ -20,7 +20,6 @@ import ( "git.arvados.org/arvados.git/sdk/go/arvadostest" "git.arvados.org/arvados.git/sdk/go/ctxlog" "git.arvados.org/arvados.git/sdk/go/keepclient" - "git.arvados.org/arvados.git/services/keepstore" . "gopkg.in/check.v1" ) @@ -195,7 +194,9 @@ func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) { volume.Replication = 2 cluster.Volumes[uuid] = volume - var v keepstore.UnixVolume + var v struct { + Root string + } err = json.Unmarshal(volume.DriverParameters, &v) c.Assert(err, IsNil) err = os.Mkdir(v.Root, 0777) diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go index bdd669bb46..31660614f3 100644 --- a/services/keepstore/azure_blob_volume.go +++ b/services/keepstore/azure_blob_volume.go @@ -31,7 +31,7 @@ func init() { } func newAzureBlobVolume(params newVolumeParams) (volume, error) { - v := &AzureBlobVolume{ + v := &azureBlobVolume{ RequestTimeout: azureDefaultRequestTimeout, WriteRaceInterval: azureDefaultWriteRaceInterval, WriteRacePollTime: azureDefaultWriteRacePollTime, @@ -79,7 +79,7 @@ func newAzureBlobVolume(params newVolumeParams) (volume, error) { return v, v.check() } -func (v *AzureBlobVolume) check() error { +func (v *azureBlobVolume) check() error { lbls := prometheus.Labels{"device_id": v.DeviceID()} v.container.stats.opsCounters, v.container.stats.errCounters, v.container.stats.ioBytes = v.metrics.getCounterVecsFor(lbls) return nil @@ -93,9 +93,9 @@ const ( azureDefaultWriteRacePollTime = arvados.Duration(time.Second) ) -// An AzureBlobVolume stores and retrieves blocks in an Azure Blob +// An azureBlobVolume stores and retrieves blocks in an Azure Blob // container. -type AzureBlobVolume struct { +type azureBlobVolume struct { StorageAccountName string StorageAccountKey string StorageBaseURL string // "" means default, "core.windows.net" @@ -125,12 +125,12 @@ func (*singleSender) Send(c *storage.Client, req *http.Request) (resp *http.Resp } // DeviceID returns a globally unique ID for the storage container. -func (v *AzureBlobVolume) DeviceID() string { +func (v *azureBlobVolume) DeviceID() string { return "azure://" + v.StorageBaseURL + "/" + v.StorageAccountName + "/" + v.ContainerName } // Return true if expires_at metadata attribute is found on the block -func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) { +func (v *azureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) { metadata, err := v.container.GetBlobMetadata(loc) if err != nil { return false, metadata, v.translateError(err) @@ -147,7 +147,7 @@ func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, err // If the block is younger than azureWriteRaceInterval and is // unexpectedly empty, assume a BlockWrite operation is in progress, // and wait for it to finish writing. -func (v *AzureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) { +func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) { trashed, _, err := v.checkTrashed(hash) if err != nil { return 0, err @@ -201,7 +201,7 @@ func (v *AzureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io return streamer.Wrote(), err } -func (v *AzureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) (int, error) { +func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) (int, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -318,7 +318,7 @@ func (v *AzureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) // BlockWrite stores a block on the volume. If it already exists, its // timestamp is updated. -func (v *AzureBlobVolume) BlockWrite(ctx context.Context, hash string, data []byte) error { +func (v *azureBlobVolume) BlockWrite(ctx context.Context, hash string, data []byte) error { // Send the block data through a pipe, so that (if we need to) // we can close the pipe early and abandon our // CreateBlockBlobFromReader() goroutine, without worrying @@ -359,7 +359,7 @@ func (v *AzureBlobVolume) BlockWrite(ctx context.Context, hash string, data []by } // BlockTouch updates the last-modified property of a block blob. -func (v *AzureBlobVolume) BlockTouch(hash string) error { +func (v *azureBlobVolume) BlockTouch(hash string) error { trashed, metadata, err := v.checkTrashed(hash) if err != nil { return err @@ -373,7 +373,7 @@ func (v *AzureBlobVolume) BlockTouch(hash string) error { } // Mtime returns the last-modified property of a block blob. -func (v *AzureBlobVolume) Mtime(hash string) (time.Time, error) { +func (v *azureBlobVolume) Mtime(hash string) (time.Time, error) { trashed, _, err := v.checkTrashed(hash) if err != nil { return time.Time{}, err @@ -391,7 +391,7 @@ func (v *AzureBlobVolume) Mtime(hash string) (time.Time, error) { // Index writes a list of Keep blocks that are stored in the // container. -func (v *AzureBlobVolume) Index(ctx context.Context, prefix string, writer io.Writer) error { +func (v *azureBlobVolume) Index(ctx context.Context, prefix string, writer io.Writer) error { params := storage.ListBlobsParameters{ Prefix: prefix, Include: &storage.IncludeBlobDataset{Metadata: true}, @@ -432,7 +432,7 @@ func (v *AzureBlobVolume) Index(ctx context.Context, prefix string, writer io.Wr } // call v.container.ListBlobs, retrying if needed. -func (v *AzureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters) (resp storage.BlobListResponse, err error) { +func (v *azureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters) (resp storage.BlobListResponse, err error) { for i := 0; i < v.ListBlobsMaxAttempts; i++ { resp, err = v.container.ListBlobs(params) err = v.translateError(err) @@ -448,7 +448,7 @@ func (v *AzureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters } // Trash a Keep block. -func (v *AzureBlobVolume) BlockTrash(loc string) error { +func (v *azureBlobVolume) BlockTrash(loc string) error { // Ideally we would use If-Unmodified-Since, but that // particular condition seems to be ignored by Azure. Instead, // we get the Etag before checking Mtime, and use If-Match to @@ -481,7 +481,7 @@ func (v *AzureBlobVolume) BlockTrash(loc string) error { // BlockUntrash deletes the expires_at metadata attribute for the // specified block blob. -func (v *AzureBlobVolume) BlockUntrash(hash string) error { +func (v *azureBlobVolume) BlockUntrash(hash string) error { // if expires_at does not exist, return NotFoundError metadata, err := v.container.GetBlobMetadata(hash) if err != nil { @@ -499,7 +499,7 @@ func (v *AzureBlobVolume) BlockUntrash(hash string) error { // If possible, translate an Azure SDK error to a recognizable error // like os.ErrNotExist. -func (v *AzureBlobVolume) translateError(err error) error { +func (v *azureBlobVolume) translateError(err error) error { switch { case err == nil: return err @@ -519,13 +519,13 @@ func (v *AzureBlobVolume) translateError(err error) error { var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`) -func (v *AzureBlobVolume) isKeepBlock(s string) bool { +func (v *azureBlobVolume) isKeepBlock(s string) bool { return keepBlockRegexp.MatchString(s) } // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime // and deletes them from the volume. -func (v *AzureBlobVolume) EmptyTrash() { +func (v *azureBlobVolume) EmptyTrash() { var bytesDeleted, bytesInTrash int64 var blocksDeleted, blocksInTrash int64 @@ -593,7 +593,7 @@ func (v *AzureBlobVolume) EmptyTrash() { } // InternalStats returns bucket I/O and API call counters. -func (v *AzureBlobVolume) InternalStats() interface{} { +func (v *azureBlobVolume) InternalStats() interface{} { return &v.container.stats } diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go index a543dfc245..c629c9dc15 100644 --- a/services/keepstore/azure_blob_volume_test.go +++ b/services/keepstore/azure_blob_volume_test.go @@ -222,7 +222,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { rw.WriteHeader(http.StatusCreated) case r.Method == "PUT" && r.Form.Get("comp") == "metadata": // "Set Metadata Headers" API. We don't bother - // stubbing "Get Metadata Headers": AzureBlobVolume + // stubbing "Get Metadata Headers": azureBlobVolume // sets metadata headers only as a way to bump Etag // and Last-Modified. if !blobExists { @@ -367,7 +367,7 @@ func (d *azStubDialer) Dial(network, address string) (net.Conn, error) { } type testableAzureBlobVolume struct { - *AzureBlobVolume + *azureBlobVolume azHandler *azStubHandler azStub *httptest.Server t TB @@ -397,7 +397,7 @@ func (s *stubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, params newVolum azClient.Sender = &singleSender{} bs := azClient.GetBlobService() - v := &AzureBlobVolume{ + v := &azureBlobVolume{ ContainerName: container, WriteRaceInterval: arvados.Duration(time.Millisecond), WriteRacePollTime: arvados.Duration(time.Nanosecond), @@ -416,7 +416,7 @@ func (s *stubbedAzureBlobSuite) newTestableAzureBlobVolume(t TB, params newVolum } return &testableAzureBlobVolume{ - AzureBlobVolume: v, + azureBlobVolume: v, azHandler: azHandler, azStub: azStub, t: t, @@ -553,8 +553,8 @@ func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *che MetricsVecs: newVolumeMetricsVecs(prometheus.NewRegistry()), BufferPool: newBufferPool(ctxlog.TestLogger(c), 8, prometheus.NewRegistry()), }) - v.AzureBlobVolume.WriteRaceInterval.Set("2s") - v.AzureBlobVolume.WriteRacePollTime.Set("5ms") + v.azureBlobVolume.WriteRaceInterval.Set("2s") + v.azureBlobVolume.WriteRacePollTime.Set("5ms") defer v.Teardown() v.BlockWriteRaw(TestHash, nil) diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index 89afa9089f..62b6d15e56 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -2,6 +2,12 @@ // // SPDX-License-Identifier: AGPL-3.0 +// Package keepstore implements the keepstore service component and +// back-end storage drivers. +// +// It is an internal module, only intended to be imported by +// /cmd/arvados-server and other server-side components in this +// repository. package keepstore import ( @@ -47,7 +53,7 @@ var ( driver = make(map[string]volumeDriver) ) -type IndexOptions struct { +type indexOptions struct { MountUUID string Prefix string WriteTo io.Writer @@ -653,7 +659,7 @@ func (ks *keepstore) Mounts() []*mount { return ks.mountsR } -func (ks *keepstore) Index(ctx context.Context, opts IndexOptions) error { +func (ks *keepstore) Index(ctx context.Context, opts indexOptions) error { mounts := ks.mountsR if opts.MountUUID != "" { mnt, ok := ks.mounts[opts.MountUUID] diff --git a/services/keepstore/router.go b/services/keepstore/router.go index 7ff82aa80f..256bc18c26 100644 --- a/services/keepstore/router.go +++ b/services/keepstore/router.go @@ -155,7 +155,7 @@ func (rtr *router) handleIndex(w http.ResponseWriter, req *http.Request) { prefix = mux.Vars(req)["prefix"] } cw := &countingWriter{writer: w} - err := rtr.keepstore.Index(req.Context(), IndexOptions{ + err := rtr.keepstore.Index(req.Context(), indexOptions{ MountUUID: mux.Vars(req)["uuid"], Prefix: prefix, WriteTo: cw, diff --git a/services/keepstore/router_test.go b/services/keepstore/router_test.go index a729ee0df3..f4bcdd4ae4 100644 --- a/services/keepstore/router_test.go +++ b/services/keepstore/router_test.go @@ -302,8 +302,8 @@ func (s *routerSuite) TestBadRequest(c *C) { "GET /mounts/blocks/123", "GET /trash", "GET /pull", - "GET /debug.json", - "GET /status.json", + "GET /debug.json", // old endpoint, no longer exists + "GET /status.json", // old endpoint, no longer exists "POST /", "POST /aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "POST /trash", diff --git a/services/keepstore/s3aws_volume.go b/services/keepstore/s3aws_volume.go index 2417bb8149..8e93eed12c 100644 --- a/services/keepstore/s3aws_volume.go +++ b/services/keepstore/s3aws_volume.go @@ -34,7 +34,7 @@ import ( ) func init() { - driver["S3"] = newS3AWSVolume + driver["S3"] = news3Volume } const ( @@ -49,11 +49,13 @@ const ( ) var ( - ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false") + errS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false") + s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`) + s3AWSZeroTime time.Time ) -// S3AWSVolume implements Volume using an S3 bucket. -type S3AWSVolume struct { +// s3Volume implements Volume using an S3 bucket. +type s3Volume struct { arvados.S3VolumeDriverParameters AuthToken string // populated automatically when IAMRole is used AuthExpiration time.Time // populated automatically when IAMRole is used @@ -78,12 +80,7 @@ type s3AWSbucket struct { mu sync.Mutex } -const () - -var s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`) -var s3AWSZeroTime time.Time - -func (v *S3AWSVolume) isKeepBlock(s string) (string, bool) { +func (v *s3Volume) isKeepBlock(s string) (string, bool) { if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] { s = s[v.PrefixLength+1:] } @@ -93,7 +90,7 @@ func (v *S3AWSVolume) isKeepBlock(s string) (string, bool) { // Return the key used for a given loc. If PrefixLength==0 then // key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is // "abc/abcdef0123", etc. -func (v *S3AWSVolume) key(loc string) string { +func (v *s3Volume) key(loc string) string { if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 { return loc[:v.PrefixLength] + "/" + loc } else { @@ -101,8 +98,8 @@ func (v *S3AWSVolume) key(loc string) string { } } -func newS3AWSVolume(params newVolumeParams) (volume, error) { - v := &S3AWSVolume{ +func news3Volume(params newVolumeParams) (volume, error) { + v := &s3Volume{ cluster: params.Cluster, volume: params.ConfigVolume, metrics: params.MetricsVecs, @@ -116,7 +113,7 @@ func newS3AWSVolume(params newVolumeParams) (volume, error) { return v, v.check("") } -func (v *S3AWSVolume) translateError(err error) error { +func (v *s3Volume) translateError(err error) error { if _, ok := err.(*aws.RequestCanceledError); ok { return context.Canceled } else if aerr, ok := err.(awserr.Error); ok { @@ -135,7 +132,7 @@ func (v *S3AWSVolume) translateError(err error) error { // // (If something goes wrong during the copy, the error will be // embedded in the 200 OK response) -func (v *S3AWSVolume) safeCopy(dst, src string) error { +func (v *s3Volume) safeCopy(dst, src string) error { input := &s3.CopyObjectInput{ Bucket: aws.String(v.bucket.bucket), ContentType: aws.String("application/octet-stream"), @@ -161,7 +158,7 @@ func (v *S3AWSVolume) safeCopy(dst, src string) error { return nil } -func (v *S3AWSVolume) check(ec2metadataHostname string) error { +func (v *s3Volume) check(ec2metadataHostname string) error { if v.Bucket == "" { return errors.New("DriverParameters: Bucket must be provided") } @@ -238,13 +235,13 @@ func (v *S3AWSVolume) check(ec2metadataHostname string) error { } // DeviceID returns a globally unique ID for the storage bucket. -func (v *S3AWSVolume) DeviceID() string { +func (v *s3Volume) DeviceID() string { return "s3://" + v.Endpoint + "/" + v.Bucket } // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime // and deletes them from the volume. -func (v *S3AWSVolume) EmptyTrash() { +func (v *s3Volume) EmptyTrash() { var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64 // Define "ready to delete" as "...when EmptyTrash started". @@ -358,7 +355,7 @@ func (v *S3AWSVolume) EmptyTrash() { // exist. If the timestamps on "recent/X" and "trash/X" indicate there // was a race between Put and Trash, fixRace recovers from the race by // Untrashing the block. -func (v *S3AWSVolume) fixRace(key string) bool { +func (v *s3Volume) fixRace(key string) bool { trash, err := v.head("trash/" + key) if err != nil { if !os.IsNotExist(v.translateError(err)) { @@ -392,7 +389,7 @@ func (v *S3AWSVolume) fixRace(key string) bool { return true } -func (v *S3AWSVolume) head(key string) (result *s3.HeadObjectOutput, err error) { +func (v *s3Volume) head(key string) (result *s3.HeadObjectOutput, err error) { input := &s3.HeadObjectInput{ Bucket: aws.String(v.bucket.bucket), Key: aws.String(key), @@ -414,7 +411,7 @@ func (v *S3AWSVolume) head(key string) (result *s3.HeadObjectOutput, err error) // BlockRead reads a Keep block that has been stored as a block blob // in the S3 bucket. -func (v *S3AWSVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) { +func (v *s3Volume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) { key := v.key(hash) buf, err := v.bufferPool.GetContext(ctx) if err != nil { @@ -460,7 +457,7 @@ func (v *S3AWSVolume) BlockRead(ctx context.Context, hash string, writeTo io.Wri return streamer.Wrote(), nil } -func (v *S3AWSVolume) readWorker(ctx context.Context, key string, dst io.WriterAt) error { +func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error { downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) { u.PartSize = s3downloaderPartSize u.Concurrency = s3downloaderReadConcurrency @@ -476,7 +473,7 @@ func (v *S3AWSVolume) readWorker(ctx context.Context, key string, dst io.WriterA return v.translateError(err) } -func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) error { +func (v *s3Volume) writeObject(ctx context.Context, key string, r io.Reader) error { if r == nil { // r == nil leads to a memory violation in func readFillBuf in // aws-sdk-go-v2@v0.23.0/service/s3/s3manager/upload.go @@ -526,7 +523,7 @@ func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) } // Put writes a block. -func (v *S3AWSVolume) BlockWrite(ctx context.Context, hash string, data []byte) error { +func (v *s3Volume) BlockWrite(ctx context.Context, hash string, data []byte) error { // Do not use putWithPipe here; we want to pass an io.ReadSeeker to the S3 // sdk to avoid memory allocation there. See #17339 for more information. rdr := bytes.NewReader(data) @@ -628,7 +625,7 @@ func (lister *s3awsLister) pop() (k *s3.Object) { // Index writes a complete list of locators with the given prefix // for which Get() can retrieve data. -func (v *S3AWSVolume) Index(ctx context.Context, prefix string, writer io.Writer) error { +func (v *s3Volume) Index(ctx context.Context, prefix string, writer io.Writer) error { prefix = v.key(prefix) // Use a merge sort to find matching sets of X and recent/X. dataL := s3awsLister{ @@ -695,7 +692,7 @@ func (v *S3AWSVolume) Index(ctx context.Context, prefix string, writer io.Writer } // Mtime returns the stored timestamp for the given locator. -func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) { +func (v *s3Volume) Mtime(loc string) (time.Time, error) { key := v.key(loc) _, err := v.head(key) if err != nil { @@ -724,12 +721,12 @@ func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) { } // InternalStats returns bucket I/O and API call counters. -func (v *S3AWSVolume) InternalStats() interface{} { +func (v *s3Volume) InternalStats() interface{} { return &v.bucket.stats } // BlockTouch sets the timestamp for the given locator to the current time. -func (v *S3AWSVolume) BlockTouch(hash string) error { +func (v *s3Volume) BlockTouch(hash string) error { key := v.key(hash) _, err := v.head(key) err = v.translateError(err) @@ -745,7 +742,7 @@ func (v *S3AWSVolume) BlockTouch(hash string) error { // checkRaceWindow returns a non-nil error if trash/key is, or might // be, in the race window (i.e., it's not safe to trash key). -func (v *S3AWSVolume) checkRaceWindow(key string) error { +func (v *s3Volume) checkRaceWindow(key string) error { resp, err := v.head("trash/" + key) err = v.translateError(err) if os.IsNotExist(err) { @@ -785,7 +782,7 @@ func (b *s3AWSbucket) Del(path string) error { } // Trash a Keep block. -func (v *S3AWSVolume) BlockTrash(loc string) error { +func (v *s3Volume) BlockTrash(loc string) error { if t, err := v.Mtime(loc); err != nil { return err } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() { @@ -794,7 +791,7 @@ func (v *S3AWSVolume) BlockTrash(loc string) error { key := v.key(loc) if v.cluster.Collections.BlobTrashLifetime == 0 { if !v.UnsafeDelete { - return ErrS3TrashDisabled + return errS3TrashDisabled } return v.translateError(v.bucket.Del(key)) } @@ -810,7 +807,7 @@ func (v *S3AWSVolume) BlockTrash(loc string) error { } // BlockUntrash moves block from trash back into store -func (v *S3AWSVolume) BlockUntrash(hash string) error { +func (v *s3Volume) BlockUntrash(hash string) error { key := v.key(hash) err := v.safeCopy(key, "trash/"+key) if err != nil { diff --git a/services/keepstore/s3aws_volume_test.go b/services/keepstore/s3aws_volume_test.go index f05cbee848..d9dcbc52d6 100644 --- a/services/keepstore/s3aws_volume_test.go +++ b/services/keepstore/s3aws_volume_test.go @@ -58,7 +58,7 @@ type StubbedS3AWSSuite struct { s3server *httptest.Server metadata *httptest.Server cluster *arvados.Cluster - volumes []*TestableS3AWSVolume + volumes []*testableS3Volume } func (s *StubbedS3AWSSuite) SetUpTest(c *check.C) { @@ -102,7 +102,8 @@ func (s *StubbedS3AWSSuite) TestIndex(c *check.C) { }, 0) v.IndexPageSize = 3 for i := 0; i < 256; i++ { - v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111}) + err := v.blockWriteWithoutMD5Check(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111}) + c.Assert(err, check.IsNil) } for _, spec := range []struct { prefix string @@ -132,7 +133,7 @@ func (s *StubbedS3AWSSuite) TestSignature(c *check.C) { // The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 v2 signatures are being phased out // as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816 - vol := S3AWSVolume{ + vol := s3Volume{ S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{ AccessKeyID: "xxx", SecretAccessKey: "xxx", @@ -165,7 +166,7 @@ func (s *StubbedS3AWSSuite) TestIAMRoleCredentials(c *check.C) { })) defer s.metadata.Close() - v := &S3AWSVolume{ + v := &s3Volume{ S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{ IAMRole: s.metadata.URL + "/latest/api/token", Endpoint: "http://localhost:12345", @@ -186,7 +187,7 @@ func (s *StubbedS3AWSSuite) TestIAMRoleCredentials(c *check.C) { s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNotFound) })) - deadv := &S3AWSVolume{ + deadv := &s3Volume{ S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{ IAMRole: s.metadata.URL + "/fake-metadata/test-role", Endpoint: "http://localhost:12345", @@ -259,19 +260,19 @@ func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) } func (s *StubbedS3AWSSuite) TestGetContextCancel(c *check.C) { - s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error { + s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error { _, err := v.BlockRead(ctx, fooHash, io.Discard) return err }) } func (s *StubbedS3AWSSuite) TestPutContextCancel(c *check.C) { - s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error { + s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error { return v.BlockWrite(ctx, fooHash, []byte("foo")) }) } -func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3AWSVolume) error) { +func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.Context, *testableS3Volume) error) { handler := &s3AWSBlockingHandler{} s.s3server = httptest.NewServer(handler) defer s.s3server.Close() @@ -534,8 +535,8 @@ func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) { } } -type TestableS3AWSVolume struct { - *S3AWSVolume +type testableS3Volume struct { + *s3Volume server *httptest.Server c *check.C serverClock *s3AWSFakeClock @@ -558,7 +559,7 @@ func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) { } } -func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *TestableS3AWSVolume { +func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume { clock := &s3AWSFakeClock{} // fake s3 @@ -581,8 +582,8 @@ func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, params newVolumeParams iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", "" } - v := &TestableS3AWSVolume{ - S3AWSVolume: &S3AWSVolume{ + v := &testableS3Volume{ + s3Volume: &s3Volume{ S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{ IAMRole: iamRole, AccessKeyID: accessKey, @@ -604,24 +605,23 @@ func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, params newVolumeParams server: srv, serverClock: clock, } - c.Assert(v.S3AWSVolume.check(""), check.IsNil) + c.Assert(v.s3Volume.check(""), check.IsNil) // Our test S3 server uses the older 'Path Style' - v.S3AWSVolume.bucket.svc.ForcePathStyle = true + v.s3Volume.bucket.svc.ForcePathStyle = true // Create the testbucket input := &s3.CreateBucketInput{ Bucket: aws.String(S3AWSTestBucketName), } - req := v.S3AWSVolume.bucket.svc.CreateBucketRequest(input) + req := v.s3Volume.bucket.svc.CreateBucketRequest(input) _, err := req.Send(context.Background()) c.Assert(err, check.IsNil) // We couldn't set RaceWindow until now because check() // rejects negative values. - v.S3AWSVolume.RaceWindow = arvados.Duration(raceWindow) + v.s3Volume.RaceWindow = arvados.Duration(raceWindow) return v } -// PutRaw skips the ContentMD5 test -func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) { +func (v *testableS3Volume) blockWriteWithoutMD5Check(loc string, block []byte) error { key := v.key(loc) r := newCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes) @@ -636,7 +636,7 @@ func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) { Body: r, }) if err != nil { - v.logger.Printf("PutRaw: %s: %+v", key, err) + return err } empty := bytes.NewReader([]byte{}) @@ -645,15 +645,13 @@ func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) { Key: aws.String("recent/" + key), Body: empty, }) - if err != nil { - v.logger.Printf("PutRaw: recent/%s: %+v", key, err) - } + return err } // TouchWithDate turns back the clock while doing a Touch(). We assume // there are no other operations happening on the same s3test server // while we do this. -func (v *TestableS3AWSVolume) TouchWithDate(loc string, lastPut time.Time) { +func (v *testableS3Volume) TouchWithDate(loc string, lastPut time.Time) { v.serverClock.now = &lastPut uploader := s3manager.NewUploaderWithClient(v.bucket.svc) @@ -670,10 +668,10 @@ func (v *TestableS3AWSVolume) TouchWithDate(loc string, lastPut time.Time) { v.serverClock.now = nil } -func (v *TestableS3AWSVolume) Teardown() { +func (v *testableS3Volume) Teardown() { v.server.Close() } -func (v *TestableS3AWSVolume) ReadWriteOperationLabelValues() (r, w string) { +func (v *testableS3Volume) ReadWriteOperationLabelValues() (r, w string) { return "get", "put" } diff --git a/services/keepstore/unix_volume.go b/services/keepstore/unix_volume.go index 98edfae14d..f01ad97553 100644 --- a/services/keepstore/unix_volume.go +++ b/services/keepstore/unix_volume.go @@ -32,7 +32,7 @@ func init() { } func newUnixVolume(params newVolumeParams) (volume, error) { - v := &UnixVolume{ + v := &unixVolume{ uuid: params.UUID, cluster: params.Cluster, volume: params.ConfigVolume, @@ -47,7 +47,7 @@ func newUnixVolume(params newVolumeParams) (volume, error) { return v, v.check() } -func (v *UnixVolume) check() error { +func (v *unixVolume) check() error { if v.Root == "" { return errors.New("DriverParameters.Root was not provided") } @@ -66,8 +66,8 @@ func (v *UnixVolume) check() error { return err } -// A UnixVolume stores and retrieves blocks in a local directory. -type UnixVolume struct { +// A unixVolume stores and retrieves blocks in a local directory. +type unixVolume struct { Root string // path to the volume's root directory Serialize bool @@ -89,7 +89,7 @@ type UnixVolume struct { // filesystem root to storage directory, joined by "/". For example, // the device ID for a local directory "/mnt/xvda1/keep" might be // "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep". -func (v *UnixVolume) DeviceID() string { +func (v *unixVolume) DeviceID() string { giveup := func(f string, args ...interface{}) string { v.logger.Infof(f+"; using hostname:path for volume %s", append(args, v.uuid)...) host, _ := os.Hostname() @@ -163,7 +163,7 @@ func (v *UnixVolume) DeviceID() string { } // BlockTouch sets the timestamp for the given locator to the current time -func (v *UnixVolume) BlockTouch(hash string) error { +func (v *unixVolume) BlockTouch(hash string) error { p := v.blockPath(hash) f, err := v.os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644) if err != nil { @@ -187,7 +187,7 @@ func (v *UnixVolume) BlockTouch(hash string) error { } // Mtime returns the stored timestamp for the given locator. -func (v *UnixVolume) Mtime(loc string) (time.Time, error) { +func (v *unixVolume) Mtime(loc string) (time.Time, error) { p := v.blockPath(loc) fi, err := v.os.Stat(p) if err != nil { @@ -198,7 +198,7 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) { // Lock the locker (if one is in use), open the file for reading, and // call the given function if and when the file is ready to read. -func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error { +func (v *unixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error { if err := v.lock(ctx); err != nil { return err } @@ -212,7 +212,7 @@ func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader } // stat is os.Stat() with some extra sanity checks. -func (v *UnixVolume) stat(path string) (os.FileInfo, error) { +func (v *unixVolume) stat(path string) (os.FileInfo, error) { stat, err := v.os.Stat(path) if err == nil { if stat.Size() < 0 { @@ -225,7 +225,7 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) { } // BlockRead reads a block from the volume. -func (v *UnixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) { +func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) { path := v.blockPath(hash) stat, err := v.stat(path) if err != nil { @@ -244,7 +244,7 @@ func (v *UnixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (i // BlockWrite stores a block on the volume. If it already exists, its // timestamp is updated. -func (v *UnixVolume) BlockWrite(ctx context.Context, hash string, data []byte) error { +func (v *unixVolume) BlockWrite(ctx context.Context, hash string, data []byte) error { if v.isFull() { return errFull } @@ -293,7 +293,7 @@ func (v *UnixVolume) BlockWrite(ctx context.Context, hash string, data []byte) e var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`) var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`) -func (v *UnixVolume) Index(ctx context.Context, prefix string, w io.Writer) error { +func (v *unixVolume) Index(ctx context.Context, prefix string, w io.Writer) error { rootdir, err := v.os.Open(v.Root) if err != nil { return err @@ -374,7 +374,7 @@ func (v *UnixVolume) Index(ctx context.Context, prefix string, w io.Writer) erro // BlobTrashLifetime == 0, the block is deleted; otherwise, the block // is renamed as path/{loc}.trash.{deadline}, where deadline = now + // BlobTrashLifetime. -func (v *UnixVolume) BlockTrash(loc string) error { +func (v *unixVolume) BlockTrash(loc string) error { // Touch() must be called before calling Write() on a block. Touch() // also uses lockfile(). This avoids a race condition between Write() // and Trash() because either (a) the file will be trashed and Touch() @@ -417,7 +417,7 @@ func (v *UnixVolume) BlockTrash(loc string) error { // BlockUntrash moves block from trash back into store // Look for path/{loc}.trash.{deadline} in storage, // and rename the first such file as path/{loc} -func (v *UnixVolume) BlockUntrash(hash string) error { +func (v *unixVolume) BlockUntrash(hash string) error { v.os.stats.TickOps("readdir") v.os.stats.Tick(&v.os.stats.ReaddirOps) files, err := ioutil.ReadDir(v.blockDir(hash)) @@ -450,19 +450,19 @@ func (v *UnixVolume) BlockUntrash(hash string) error { // blockDir returns the fully qualified directory name for the directory // where loc is (or would be) stored on this volume. -func (v *UnixVolume) blockDir(loc string) string { +func (v *unixVolume) blockDir(loc string) string { return filepath.Join(v.Root, loc[0:3]) } // blockPath returns the fully qualified pathname for the path to loc // on this volume. -func (v *UnixVolume) blockPath(loc string) string { +func (v *unixVolume) blockPath(loc string) string { return filepath.Join(v.blockDir(loc), loc) } // isFull returns true if the free space on the volume is less than // MinFreeKilobytes. -func (v *UnixVolume) isFull() (isFull bool) { +func (v *unixVolume) isFull() (isFull bool) { fullSymlink := v.Root + "/full" // Check if the volume has been marked as full in the last hour. @@ -492,7 +492,7 @@ func (v *UnixVolume) isFull() (isFull bool) { // FreeDiskSpace returns the number of unused 1k blocks available on // the volume. -func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) { +func (v *unixVolume) FreeDiskSpace() (free uint64, err error) { var fs syscall.Statfs_t err = syscall.Statfs(v.Root, &fs) if err == nil { @@ -504,14 +504,14 @@ func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) { } // InternalStats returns I/O and filesystem ops counters. -func (v *UnixVolume) InternalStats() interface{} { +func (v *unixVolume) InternalStats() interface{} { return &v.os.stats } // lock acquires the serialize lock, if one is in use. If ctx is done // before the lock is acquired, lock returns ctx.Err() instead of // acquiring the lock. -func (v *UnixVolume) lock(ctx context.Context) error { +func (v *unixVolume) lock(ctx context.Context) error { if v.locker == nil { return nil } @@ -535,7 +535,7 @@ func (v *UnixVolume) lock(ctx context.Context) error { } // unlock releases the serialize lock, if one is in use. -func (v *UnixVolume) unlock() { +func (v *unixVolume) unlock() { if v.locker == nil { return } @@ -543,7 +543,7 @@ func (v *UnixVolume) unlock() { } // lockfile and unlockfile use flock(2) to manage kernel file locks. -func (v *UnixVolume) lockfile(f *os.File) error { +func (v *unixVolume) lockfile(f *os.File) error { v.os.stats.TickOps("flock") v.os.stats.Tick(&v.os.stats.FlockOps) err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX) @@ -551,7 +551,7 @@ func (v *UnixVolume) lockfile(f *os.File) error { return err } -func (v *UnixVolume) unlockfile(f *os.File) error { +func (v *unixVolume) unlockfile(f *os.File) error { err := syscall.Flock(int(f.Fd()), syscall.LOCK_UN) v.os.stats.TickErr(err) return err @@ -559,7 +559,7 @@ func (v *UnixVolume) unlockfile(f *os.File) error { // Where appropriate, translate a more specific filesystem error to an // error recognized by handlers, like os.ErrNotExist. -func (v *UnixVolume) translateError(err error) error { +func (v *unixVolume) translateError(err error) error { switch err.(type) { case *os.PathError: // stat() returns a PathError if the parent directory @@ -574,7 +574,7 @@ var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`) // EmptyTrash walks hierarchy looking for {hash}.trash.* // and deletes those with deadline < now. -func (v *UnixVolume) EmptyTrash() { +func (v *unixVolume) EmptyTrash() { var bytesDeleted, bytesInTrash int64 var blocksDeleted, blocksInTrash int64 diff --git a/services/keepstore/unix_volume_test.go b/services/keepstore/unix_volume_test.go index a8dc4e809a..de8d3c42d8 100644 --- a/services/keepstore/unix_volume_test.go +++ b/services/keepstore/unix_volume_test.go @@ -23,7 +23,7 @@ import ( ) type testableUnixVolume struct { - UnixVolume + unixVolume t TB } @@ -77,7 +77,7 @@ func (s *unixVolumeSuite) newTestableUnixVolume(c *check.C, params newVolumePara locker = &sync.Mutex{} } v := &testableUnixVolume{ - UnixVolume: UnixVolume{ + unixVolume: unixVolume{ Root: d, locker: locker, uuid: params.UUID, @@ -313,7 +313,7 @@ func (s *unixVolumeSuite) TestStats(c *check.C) { return string(buf) } - c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*UnixVolume)check() calls Stat() once + c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*unixVolume)check() calls Stat() once c.Check(stats(), check.Matches, `.*"Errors":0,.*`) _, err := vol.BlockRead(context.Background(), fooHash, io.Discard) @@ -353,14 +353,14 @@ func (s *unixVolumeSuite) TestStats(c *check.C) { func (s *unixVolumeSuite) TestSkipUnusedDirs(c *check.C) { vol := s.newTestableUnixVolume(c, s.params, false) - err := os.Mkdir(vol.UnixVolume.Root+"/aaa", 0777) + err := os.Mkdir(vol.unixVolume.Root+"/aaa", 0777) c.Assert(err, check.IsNil) - err = os.Mkdir(vol.UnixVolume.Root+"/.aaa", 0777) // EmptyTrash should not look here + err = os.Mkdir(vol.unixVolume.Root+"/.aaa", 0777) // EmptyTrash should not look here c.Assert(err, check.IsNil) - deleteme := vol.UnixVolume.Root + "/aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1" + deleteme := vol.unixVolume.Root + "/aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1" err = ioutil.WriteFile(deleteme, []byte{1, 2, 3}, 0777) c.Assert(err, check.IsNil) - skipme := vol.UnixVolume.Root + "/.aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1" + skipme := vol.unixVolume.Root + "/.aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1" err = ioutil.WriteFile(skipme, []byte{1, 2, 3}, 0777) c.Assert(err, check.IsNil) vol.EmptyTrash() diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go index 41a0eba86f..a0b6fda7d3 100644 --- a/services/keepstore/volume.go +++ b/services/keepstore/volume.go @@ -48,7 +48,3 @@ type ioStats struct { InBytes uint64 OutBytes uint64 } - -type InternalStatser interface { - InternalStats() interface{} -} -- 2.30.2