From 464c1562415ac7b5b5503f41b20c3183610dc899 Mon Sep 17 00:00:00 2001 From: Ward Vandewege Date: Thu, 2 Dec 2021 15:59:18 -0500 Subject: [PATCH] 17339: remove unnecessary memory allocation when writing to Keep with the S3 v2 driver. Arvados-DCO-1.1-Signed-off-by: Ward Vandewege --- services/keepstore/count.go | 36 ++++++++++++++++++++++++++++++ services/keepstore/s3aws_volume.go | 6 +++-- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/services/keepstore/count.go b/services/keepstore/count.go index 272b5017cb..2dc7e04b43 100644 --- a/services/keepstore/count.go +++ b/services/keepstore/count.go @@ -22,6 +22,15 @@ func NewCountingReader(r io.Reader, f func(uint64)) io.ReadCloser { } } +func NewCountingReaderAtSeeker(r interface{}, f func(uint64)) *countingReaderAtSeeker { + return &countingReaderAtSeeker{readerAtSeeker: readerAtSeeker{ + readSeeker: r.(io.ReadSeeker), + readerAt: r.(io.ReaderAt), + }, + counter: f, + } +} + type countingReadWriter struct { reader io.Reader writer io.Writer @@ -46,3 +55,30 @@ func (crw *countingReadWriter) Close() error { } return nil } + +type readerAtSeeker struct { + readSeeker io.ReadSeeker + readerAt io.ReaderAt +} + +type countingReaderAtSeeker struct { + readerAtSeeker + counter func(uint64) +} + +func (crw *countingReaderAtSeeker) Read(buf []byte) (int, error) { + n, err := crw.readSeeker.Read(buf) + crw.counter(uint64(n)) + return n, err +} + +func (crw *countingReaderAtSeeker) ReadAt(buf []byte, off int64) (int, error) { + n, err := crw.readerAt.ReadAt(buf, off) + crw.counter(uint64(n)) + return n, err +} + +func (crw *countingReaderAtSeeker) Seek(offset int64, whence int) (int64, error) { + n, err := crw.readSeeker.Seek(offset, whence) + return n, err +} diff --git a/services/keepstore/s3aws_volume.go b/services/keepstore/s3aws_volume.go index cb0b73cb8b..e6b4d64530 100644 --- a/services/keepstore/s3aws_volume.go +++ b/services/keepstore/s3aws_volume.go @@ -586,7 +586,9 @@ func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) // Put writes a block. func (v *S3AWSVolume) Put(ctx context.Context, loc string, block []byte) error { - return putWithPipe(ctx, loc, block, v) + // Do not use putWithPipe here; we want to pass an io.ReadSeeker to the S3 + // sdk to avoid memory allocation there. See #17339 for more information. + return v.WriteBlock(ctx, loc, bytes.NewReader(block)) } // WriteBlock implements BlockWriter. @@ -595,7 +597,7 @@ func (v *S3AWSVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) return MethodDisabledError } - r := NewCountingReader(rdr, v.bucket.stats.TickOutBytes) + r := NewCountingReaderAtSeeker(rdr, v.bucket.stats.TickOutBytes) key := v.key(loc) err := v.writeObject(ctx, key, r) if err != nil { -- 2.30.2