From: Ward Vandewege Date: Mon, 6 Dec 2021 22:01:26 +0000 (-0500) Subject: 17339: implement review feedback. X-Git-Tag: 2.4.0~142^2~1 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/24e333637a1ecf1fd01ad501d62778a24985948f?ds=sidebyside;hp=-c 17339: implement review feedback. Arvados-DCO-1.1-Signed-off-by: Ward Vandewege --- 24e333637a1ecf1fd01ad501d62778a24985948f diff --git a/services/keepstore/count.go b/services/keepstore/count.go index 2dc7e04b43..700ca19dec 100644 --- a/services/keepstore/count.go +++ b/services/keepstore/count.go @@ -22,13 +22,8 @@ func NewCountingReader(r io.Reader, f func(uint64)) io.ReadCloser { } } -func NewCountingReaderAtSeeker(r interface{}, f func(uint64)) *countingReaderAtSeeker { - return &countingReaderAtSeeker{readerAtSeeker: readerAtSeeker{ - readSeeker: r.(io.ReadSeeker), - readerAt: r.(io.ReaderAt), - }, - counter: f, - } +func NewCountingReaderAtSeeker(r readerAtSeeker, f func(uint64)) *countingReaderAtSeeker { + return &countingReaderAtSeeker{readerAtSeeker: r, counter: f} } type countingReadWriter struct { @@ -56,9 +51,9 @@ func (crw *countingReadWriter) Close() error { return nil } -type readerAtSeeker struct { - readSeeker io.ReadSeeker - readerAt io.ReaderAt +type readerAtSeeker interface { + io.ReadSeeker + io.ReaderAt } type countingReaderAtSeeker struct { @@ -67,18 +62,13 @@ type countingReaderAtSeeker struct { } func (crw *countingReaderAtSeeker) Read(buf []byte) (int, error) { - n, err := crw.readSeeker.Read(buf) + n, err := crw.readerAtSeeker.Read(buf) crw.counter(uint64(n)) return n, err } func (crw *countingReaderAtSeeker) ReadAt(buf []byte, off int64) (int, error) { - n, err := crw.readerAt.ReadAt(buf, off) + n, err := crw.readerAtSeeker.ReadAt(buf, off) crw.counter(uint64(n)) return n, err } - -func (crw *countingReaderAtSeeker) Seek(offset int64, whence int) (int64, error) { - n, err := crw.readSeeker.Seek(offset, whence) - return n, err -} diff --git a/services/keepstore/s3aws_volume.go b/services/keepstore/s3aws_volume.go index a7801ae262..6205da5beb 100644 --- a/services/keepstore/s3aws_volume.go +++ b/services/keepstore/s3aws_volume.go @@ -112,18 +112,14 @@ func newS3AWSVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logr } func (v *S3AWSVolume) translateError(err error) error { - if aerr, ok := err.(awserr.Error); ok { - switch aerr.Code() { - case "NotFound": + if _, ok := err.(*aws.RequestCanceledError); ok { + return context.Canceled + } else if aerr, ok := err.(awserr.Error); ok { + if aerr.Code() == "NotFound" { return os.ErrNotExist - case "NoSuchKey": + } else if aerr.Code() == "NoSuchKey" { return os.ErrNotExist } - } else { - switch err.(type) { - case *aws.RequestCanceledError: - return context.Canceled - } } return err } @@ -472,14 +468,9 @@ func (v *S3AWSVolume) Get(ctx context.Context, loc string, buf []byte) (int, err // through 'buf []byte', and we don't want to allocate two buffers for each // read request. Instead, use a version of ReadBlock that accepts 'buf []byte' // as an input. - return v.ReadBlock(ctx, loc, buf) -} - -func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, buf []byte) (int, error) { key := v.key(loc) count, err := v.readWorker(ctx, key, buf) if err == nil { - v.bucket.stats.TickInBytes(uint64(count)) return count, err } @@ -506,7 +497,6 @@ func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, buf []byte) (in err = v.translateError(err) return 0, err } - v.bucket.stats.TickInBytes(uint64(count)) return count, err } @@ -526,12 +516,8 @@ func (v *S3AWSVolume) readWorker(ctx context.Context, key string, buf []byte) (i v.bucket.stats.TickOps("get") v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps) v.bucket.stats.TickErr(err) - if err != nil { - return 0, v.translateError(err) - } - buf = awsBuf.Bytes() - - return int(count), err + v.bucket.stats.TickInBytes(uint64(count)) + return int(count), v.translateError(err) } func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) error { @@ -551,7 +537,7 @@ func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) var contentMD5 string md5, err := hex.DecodeString(loc) if err != nil { - return err + return v.translateError(err) } contentMD5 = base64.StdEncoding.EncodeToString(md5) uploadInput.ContentMD5 = &contentMD5 @@ -580,22 +566,18 @@ func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps) v.bucket.stats.TickErr(err) - return err + return v.translateError(err) } // Put writes a block. func (v *S3AWSVolume) Put(ctx context.Context, loc string, block []byte) error { // Do not use putWithPipe here; we want to pass an io.ReadSeeker to the S3 // sdk to avoid memory allocation there. See #17339 for more information. - return v.translateError(v.WriteBlock(ctx, loc, bytes.NewReader(block))) -} - -// WriteBlock implements BlockWriter. -func (v *S3AWSVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error { if v.volume.ReadOnly { return MethodDisabledError } + rdr := bytes.NewReader(block) r := NewCountingReaderAtSeeker(rdr, v.bucket.stats.TickOutBytes) key := v.key(loc) err := v.writeObject(ctx, key, r)