From: Ward Vandewege Date: Fri, 3 Dec 2021 20:12:38 +0000 (-0500) Subject: 17339: remove unnecessary memory allocation when reading from Keep with X-Git-Tag: 2.4.0~142^2~3 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/5ce5bf966dfabbc0beb7330d4c976a23fde3fd83?ds=sidebyside;hp=-c 17339: remove unnecessary memory allocation when reading from Keep with the S3 v2 driver. Also fix a few incorrect calls to log.Error() Arvados-DCO-1.1-Signed-off-by: Ward Vandewege --- 5ce5bf966dfabbc0beb7330d4c976a23fde3fd83 diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go index 52683d9716..63a23687ec 100644 --- a/services/keepstore/handlers.go +++ b/services/keepstore/handlers.go @@ -709,7 +709,7 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b if filehash != hash { // TODO: Try harder to tell a sysadmin about // this. - log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol) + log.Errorf("checksum mismatch for block %s (actual %s), size %d on %s", hash, filehash, size, vol) errorToCaller = DiskHashError continue } @@ -976,7 +976,7 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, // to tell which one is wanted if we have // both, so there's no point writing it even // on a different volume.) - log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume) + log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume) return CollisionError } else if os.IsNotExist(err) { // Block does not exist. This is the only diff --git a/services/keepstore/s3aws_volume.go b/services/keepstore/s3aws_volume.go index e6b4d64530..4064809d5d 100644 --- a/services/keepstore/s3aws_volume.go +++ b/services/keepstore/s3aws_volume.go @@ -463,52 +463,24 @@ func (v *S3AWSVolume) head(key string) (result *s3.HeadObjectOutput, err error) // Get a block: copy the block data into buf, and return the number of // bytes copied. func (v *S3AWSVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) { - return getWithPipe(ctx, loc, buf, v) + // Do not use getWithPipe here: the BlockReader interface does not pass + // through 'buf []byte', and we don't want to allocate two buffers for each + // read request. Instead, use a version of ReadBlock that accepts 'buf []byte' + // as an input. + return v.ReadBlock(ctx, loc, buf) } -func (v *S3AWSVolume) readWorker(ctx context.Context, key string) (rdr io.ReadCloser, err error) { - buf := make([]byte, 0, 67108864) - awsBuf := aws.NewWriteAtBuffer(buf) - - downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) { - u.PartSize = PartSize - u.Concurrency = ReadConcurrency - }) - - v.logger.Debugf("Partsize: %d; Concurrency: %d\n", downloader.PartSize, downloader.Concurrency) - - _, err = downloader.DownloadWithContext(ctx, awsBuf, &s3.GetObjectInput{ - Bucket: aws.String(v.bucket.bucket), - Key: aws.String(key), - }) - v.bucket.stats.TickOps("get") - v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps) - v.bucket.stats.TickErr(err) - if err != nil { - return nil, v.translateError(err) - } - buf = awsBuf.Bytes() - - rdr = NewCountingReader(bytes.NewReader(buf), v.bucket.stats.TickInBytes) - return -} - -// ReadBlock implements BlockReader. -func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) error { +func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, buf []byte) (int, error) { key := v.key(loc) - rdr, err := v.readWorker(ctx, key) - + count, err := v.readWorker(ctx, key, buf) if err == nil { - _, err2 := io.Copy(w, rdr) - if err2 != nil { - return err2 - } - return err + v.bucket.stats.TickInBytes(uint64(count)) + return count, err } err = v.translateError(err) if !os.IsNotExist(err) { - return err + return 0, err } _, err = v.head("recent/" + key) @@ -516,23 +488,45 @@ func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) er if err != nil { // If we can't read recent/X, there's no point in // trying fixRace. Give up. - return err + return 0, err } if !v.fixRace(key) { err = os.ErrNotExist - return err + return 0, err } - rdr, err = v.readWorker(ctx, key) + count, err = v.readWorker(ctx, key, buf) if err != nil { v.logger.Warnf("reading %s after successful fixRace: %s", loc, err) err = v.translateError(err) - return err + return 0, err } + v.bucket.stats.TickInBytes(uint64(count)) + return count, err +} - _, err = io.Copy(w, rdr) +func (v *S3AWSVolume) readWorker(ctx context.Context, key string, buf []byte) (int, error) { + awsBuf := aws.NewWriteAtBuffer(buf) + downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) { + u.PartSize = PartSize + u.Concurrency = ReadConcurrency + }) - return err + v.logger.Debugf("Partsize: %d; Concurrency: %d\n", downloader.PartSize, downloader.Concurrency) + + count, err := downloader.DownloadWithContext(ctx, awsBuf, &s3.GetObjectInput{ + Bucket: aws.String(v.bucket.bucket), + Key: aws.String(key), + }) + v.bucket.stats.TickOps("get") + v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps) + v.bucket.stats.TickErr(err) + if err != nil { + return 0, v.translateError(err) + } + buf = awsBuf.Bytes() + + return int(count), err } func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) error {