X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ee9d1e39b5d469a827be5a719c9c0860914ab2a8..2c6557f613fcf6cdcebb08c321a5d061aeb780c6:/services/keepstore/s3aws_volume.go diff --git a/services/keepstore/s3aws_volume.go b/services/keepstore/s3aws_volume.go index 18b30f4638..2417bb8149 100644 --- a/services/keepstore/s3aws_volume.go +++ b/services/keepstore/s3aws_volume.go @@ -38,10 +38,14 @@ func init() { } const ( - s3DefaultReadTimeout = arvados.Duration(10 * time.Minute) - s3DefaultConnectTimeout = arvados.Duration(time.Minute) - maxClockSkew = 600 * time.Second - nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT" + s3DefaultReadTimeout = arvados.Duration(10 * time.Minute) + s3DefaultConnectTimeout = arvados.Duration(time.Minute) + maxClockSkew = 600 * time.Second + nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT" + s3downloaderPartSize = 5 * 1024 * 1024 + s3downloaderReadConcurrency = 13 + s3uploaderPartSize = 5 * 1024 * 1024 + s3uploaderWriteConcurrency = 5 ) var ( @@ -54,13 +58,14 @@ type S3AWSVolume struct { AuthToken string // populated automatically when IAMRole is used AuthExpiration time.Time // populated automatically when IAMRole is used - cluster *arvados.Cluster - volume arvados.Volume - logger logrus.FieldLogger - metrics *volumeMetricsVecs - bucket *s3AWSbucket - region string - startOnce sync.Once + cluster *arvados.Cluster + volume arvados.Volume + logger logrus.FieldLogger + metrics *volumeMetricsVecs + bufferPool *bufferPool + bucket *s3AWSbucket + region string + startOnce sync.Once } // s3bucket wraps s3.bucket and counts I/O and API usage stats. The @@ -73,11 +78,7 @@ type s3AWSbucket struct { mu sync.Mutex } -const ( - PartSize = 5 * 1024 * 1024 - ReadConcurrency = 13 - WriteConcurrency = 5 -) +const () var s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`) var s3AWSZeroTime time.Time @@ -100,13 +101,18 @@ func (v *S3AWSVolume) key(loc string) string { } } -func newS3AWSVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) { - v := &S3AWSVolume{cluster: cluster, volume: volume, metrics: metrics} - err := json.Unmarshal(volume.DriverParameters, v) +func newS3AWSVolume(params newVolumeParams) (volume, error) { + v := &S3AWSVolume{ + cluster: params.Cluster, + volume: params.ConfigVolume, + metrics: params.MetricsVecs, + bufferPool: params.BufferPool, + } + err := json.Unmarshal(params.ConfigVolume.DriverParameters, v) if err != nil { return nil, err } - v.logger = logger.WithField("Volume", v.String()) + v.logger = params.Logger.WithField("Volume", v.DeviceID()) return v, v.check("") } @@ -225,73 +231,17 @@ func (v *S3AWSVolume) check(ec2metadataHostname string) error { } // Set up prometheus metrics - lbls := prometheus.Labels{"device_id": v.GetDeviceID()} + lbls := prometheus.Labels{"device_id": v.DeviceID()} v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls) return nil } -// String implements fmt.Stringer. -func (v *S3AWSVolume) String() string { - return fmt.Sprintf("s3-bucket:%+q", v.Bucket) -} - -// GetDeviceID returns a globally unique ID for the storage bucket. -func (v *S3AWSVolume) GetDeviceID() string { +// DeviceID returns a globally unique ID for the storage bucket. +func (v *S3AWSVolume) DeviceID() string { return "s3://" + v.Endpoint + "/" + v.Bucket } -// Compare the given data with the stored data. 
-func (v *S3AWSVolume) Compare(ctx context.Context, loc string, expect []byte) error { - key := v.key(loc) - errChan := make(chan error, 1) - go func() { - _, err := v.head("recent/" + key) - errChan <- err - }() - var err error - select { - case <-ctx.Done(): - return ctx.Err() - case err = <-errChan: - } - if err != nil { - // Checking for the key itself here would interfere - // with future GET requests. - // - // On AWS, if X doesn't exist, a HEAD or GET request - // for X causes X's non-existence to be cached. Thus, - // if we test for X, then create X and return a - // signature to our client, the client might still get - // 404 from all keepstores when trying to read it. - // - // To avoid this, we avoid doing HEAD X or GET X until - // we know X has been written. - // - // Note that X might exist even though recent/X - // doesn't: for example, the response to HEAD recent/X - // might itself come from a stale cache. In such - // cases, we will return a false negative and - // PutHandler might needlessly create another replica - // on a different volume. That's not ideal, but it's - // better than passing the eventually-consistent - // problem on to our clients. - return v.translateError(err) - } - - input := &s3.GetObjectInput{ - Bucket: aws.String(v.bucket.bucket), - Key: aws.String(key), - } - - req := v.bucket.svc.GetObjectRequest(input) - result, err := req.Send(ctx) - if err != nil { - return v.translateError(err) - } - return v.translateError(compareReaderWithBuf(ctx, result.Body, expect, loc[:32])) -} - // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime // and deletes them from the volume. func (v *S3AWSVolume) EmptyTrash() { @@ -313,7 +263,7 @@ func (v *S3AWSVolume) EmptyTrash() { recent, err := v.head("recent/" + key) if err != nil && os.IsNotExist(v.translateError(err)) { v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", *trash.Key, "recent/"+key, err) - err = v.Untrash(loc) + err = v.BlockUntrash(loc) if err != nil { v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc) } @@ -334,7 +284,7 @@ func (v *S3AWSVolume) EmptyTrash() { // necessary to avoid starvation. v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc) v.fixRace(key) - v.Touch(loc) + v.BlockTouch(loc) return } _, err := v.head(key) @@ -401,7 +351,7 @@ func (v *S3AWSVolume) EmptyTrash() { if err := trashL.Error(); err != nil { v.logger.WithError(err).Error("EmptyTrash: lister failed") } - v.logger.Infof("EmptyTrash: stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted) + v.logger.Infof("EmptyTrash: stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.DeviceID(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted) } // fixRace(X) is called when "recent/X" exists but "X" doesn't @@ -462,55 +412,60 @@ func (v *S3AWSVolume) head(key string) (result *s3.HeadObjectOutput, err error) return } -// Get a block: copy the block data into buf, and return the number of -// bytes copied. -func (v *S3AWSVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) { - // Do not use getWithPipe here: the BlockReader interface does not pass - // through 'buf []byte', and we don't want to allocate two buffers for each - // read request. Instead, use a version of ReadBlock that accepts 'buf []byte' - // as an input. 
- key := v.key(loc) - count, err := v.readWorker(ctx, key, buf) - if err == nil { - return count, err - } - - err = v.translateError(err) - if !os.IsNotExist(err) { - return 0, err - } - - _, err = v.head("recent/" + key) - err = v.translateError(err) +// BlockRead reads a Keep block that has been stored as a block blob +// in the S3 bucket. +func (v *S3AWSVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) { + key := v.key(hash) + buf, err := v.bufferPool.GetContext(ctx) if err != nil { - // If we can't read recent/X, there's no point in - // trying fixRace. Give up. - return 0, err - } - if !v.fixRace(key) { - err = os.ErrNotExist return 0, err } + defer v.bufferPool.Put(buf) - count, err = v.readWorker(ctx, key, buf) + streamer := newStreamWriterAt(writeTo, 65536, buf) + defer streamer.Close() + err = v.readWorker(ctx, key, streamer) if err != nil { - v.logger.Warnf("reading %s after successful fixRace: %s", loc, err) err = v.translateError(err) - return 0, err + if !os.IsNotExist(err) { + return 0, err + } + if streamer.WroteAt() > 0 { + return 0, errors.New("bug? readWorker returned ErrNotExist after writing to streamer") + } + + _, err = v.head("recent/" + key) + err = v.translateError(err) + if err != nil { + // If we can't read recent/X, there's no point in + // trying fixRace. Give up. + return 0, err + } + if !v.fixRace(key) { + err = os.ErrNotExist + return 0, err + } + + err = v.readWorker(ctx, key, streamer) + if err != nil { + v.logger.Warnf("reading %s after successful fixRace: %s", hash, err) + err = v.translateError(err) + return 0, err + } } - return count, err + err = streamer.Close() + if err != nil { + return 0, v.translateError(err) + } + return streamer.Wrote(), nil } -func (v *S3AWSVolume) readWorker(ctx context.Context, key string, buf []byte) (int, error) { - awsBuf := aws.NewWriteAtBuffer(buf) +func (v *S3AWSVolume) readWorker(ctx context.Context, key string, dst io.WriterAt) error { downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) { - u.PartSize = PartSize - u.Concurrency = ReadConcurrency + u.PartSize = s3downloaderPartSize + u.Concurrency = s3downloaderReadConcurrency }) - - v.logger.Debugf("Partsize: %d; Concurrency: %d\n", downloader.PartSize, downloader.Concurrency) - - count, err := downloader.DownloadWithContext(ctx, awsBuf, &s3.GetObjectInput{ + count, err := downloader.DownloadWithContext(ctx, dst, &s3.GetObjectInput{ Bucket: aws.String(v.bucket.bucket), Key: aws.String(key), }) @@ -518,7 +473,7 @@ func (v *S3AWSVolume) readWorker(ctx context.Context, key string, buf []byte) (i v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps) v.bucket.stats.TickErr(err) v.bucket.stats.TickInBytes(uint64(count)) - return int(count), v.translateError(err) + return v.translateError(err) } func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) error { @@ -547,10 +502,10 @@ func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) // Experimentation indicated that using concurrency 5 yields the best // throughput, better than higher concurrency (10 or 13) by ~5%. // Defining u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(64 * 1024 * 1024) - // is detrimental to througput (minus ~15%). + // is detrimental to throughput (minus ~15%). 
	uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
-		u.PartSize = PartSize
-		u.Concurrency = WriteConcurrency
+		u.PartSize = s3uploaderPartSize
+		u.Concurrency = s3uploaderWriteConcurrency
	})

	// Unlike the goamz S3 driver, we don't need to precompute ContentSHA256:
@@ -571,16 +526,12 @@ func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader)
 }
 
-// Put writes a block.
-func (v *S3AWSVolume) Put(ctx context.Context, loc string, block []byte) error {
+// BlockWrite writes a block.
+func (v *S3AWSVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
 	// Do not use putWithPipe here; we want to pass an io.ReadSeeker to the S3
 	// sdk to avoid memory allocation there. See #17339 for more information.
-	if v.volume.ReadOnly {
-		return MethodDisabledError
-	}
-
-	rdr := bytes.NewReader(block)
-	r := NewCountingReaderAtSeeker(rdr, v.bucket.stats.TickOutBytes)
-	key := v.key(loc)
+	rdr := bytes.NewReader(data)
+	r := newCountingReaderAtSeeker(rdr, v.bucket.stats.TickOutBytes)
+	key := v.key(hash)
 	err := v.writeObject(ctx, key, r)
 	if err != nil {
 		return err
@@ -675,9 +626,9 @@ func (lister *s3awsLister) pop() (k *s3.Object) {
 	return
 }
 
-// IndexTo writes a complete list of locators with the given prefix
-// for which Get() can retrieve data.
-func (v *S3AWSVolume) IndexTo(prefix string, writer io.Writer) error {
+// Index writes a complete list of locators with the given prefix
+// for which BlockRead() can retrieve data.
+func (v *S3AWSVolume) Index(ctx context.Context, prefix string, writer io.Writer) error {
 	prefix = v.key(prefix)
 	// Use a merge sort to find matching sets of X and recent/X.
 	dataL := s3awsLister{
@@ -695,6 +646,9 @@ func (v *S3AWSVolume) IndexTo(prefix string, writer io.Writer) error {
 		Stats:  &v.bucket.stats,
 	}
 	for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
+		if ctx.Err() != nil {
+			return ctx.Err()
+		}
 		if *data.Key >= "g" {
 			// Conveniently, "recent/*" and "trash/*" are
 			// lexically greater than all hex-encoded data
@@ -769,28 +723,14 @@ func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) {
 	return *resp.LastModified, err
 }
 
-// Status returns a *VolumeStatus representing the current in-use
-// storage capacity and a fake available capacity that doesn't make
-// the volume seem full or nearly-full.
-func (v *S3AWSVolume) Status() *VolumeStatus {
-	return &VolumeStatus{
-		DeviceNum: 1,
-		BytesFree: BlockSize * 1000,
-		BytesUsed: 1,
-	}
-}
-
 // InternalStats returns bucket I/O and API call counters.
 func (v *S3AWSVolume) InternalStats() interface{} {
 	return &v.bucket.stats
 }
 
-// Touch sets the timestamp for the given locator to the current time.
-func (v *S3AWSVolume) Touch(loc string) error {
-	if v.volume.ReadOnly {
-		return MethodDisabledError
-	}
-	key := v.key(loc)
+// BlockTouch sets the timestamp for the given locator to the current time.
+func (v *S3AWSVolume) BlockTouch(hash string) error {
+	key := v.key(hash)
 	_, err := v.head(key)
 	err = v.translateError(err)
 	if os.IsNotExist(err) && v.fixRace(key) {
@@ -845,10 +785,7 @@ func (b *s3AWSbucket) Del(path string) error {
 }
 
-// Trash a Keep block.
-func (v *S3AWSVolume) Trash(loc string) error {
-	if v.volume.ReadOnly && !v.volume.AllowTrashWhenReadOnly {
-		return MethodDisabledError
-	}
+// BlockTrash trashes a Keep block.
+func (v *S3AWSVolume) BlockTrash(loc string) error {
 	if t, err := v.Mtime(loc); err != nil {
 		return err
 	} else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
@@ -872,9 +809,9 @@ func (v *S3AWSVolume) Trash(loc string) error {
 	return v.translateError(v.bucket.Del(key))
 }
 
-// Untrash moves block from trash back into store
-func (v *S3AWSVolume) Untrash(loc string) error {
-	key := v.key(loc)
+// BlockUntrash moves a block from the trash back into the data store.
+func (v *S3AWSVolume) BlockUntrash(hash string) error {
+	key := v.key(hash)
 	err := v.safeCopy(key, "trash/"+key)
 	if err != nil {
 		return err
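
A note on the BlockRead plumbing above: s3manager's Downloader requires an io.WriterAt destination because it fetches parts concurrently and may complete them out of order, while BlockRead's caller supplies a plain io.Writer. That is why BlockRead takes a buffer from the bufferPool and wraps the caller's writer in newStreamWriterAt (defined elsewhere in keepstore, not shown in this diff): part writes are staged in the buffer and flushed to the io.Writer in offset order. The stand-alone sketch below illustrates the technique only; the type name and details are invented for this note, and it assumes each staged chunk starts exactly where an earlier one ended, which holds for the downloader's fixed-size parts.

package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

// orderedWriterAt implements io.WriterAt over a fixed staging buffer,
// forwarding bytes to w strictly in offset order.
type orderedWriterAt struct {
	mu      sync.Mutex
	w       io.Writer       // in-order destination (e.g. an HTTP response)
	buf     []byte          // staging area; len(buf) caps the object size
	pieces  map[int64]int64 // staged but unflushed: start offset -> end offset
	flushed int64           // bytes already forwarded to w
}

func newOrderedWriterAt(w io.Writer, buf []byte) *orderedWriterAt {
	return &orderedWriterAt{w: w, buf: buf, pieces: map[int64]int64{}}
}

// WriteAt stages p at offset off, then forwards whatever prefix has
// become contiguous. Safe for concurrent part downloads.
func (o *orderedWriterAt) WriteAt(p []byte, off int64) (int, error) {
	o.mu.Lock()
	defer o.mu.Unlock()
	if off+int64(len(p)) > int64(len(o.buf)) {
		return 0, fmt.Errorf("WriteAt(%d): exceeds %d-byte staging buffer", off, len(o.buf))
	}
	copy(o.buf[off:], p)
	o.pieces[off] = off + int64(len(p))
	for {
		end, ok := o.pieces[o.flushed]
		if !ok {
			break
		}
		if _, err := o.w.Write(o.buf[o.flushed:end]); err != nil {
			return 0, err
		}
		delete(o.pieces, o.flushed)
		o.flushed = end
	}
	return len(p), nil
}

func main() {
	var out bytes.Buffer
	w := newOrderedWriterAt(&out, make([]byte, 32))
	// Simulate two parts arriving out of order, as a concurrent
	// downloader can deliver them.
	w.WriteAt([]byte("world!"), 6)
	w.WriteAt([]byte("hello "), 0)
	fmt.Printf("%q\n", out.String()) // "hello world!"
}

The real helper also counts bytes (the Wrote/WroteAt calls in the diff) so BlockRead can report how much data reached the caller and detect the "ErrNotExist after writing" inconsistency; those details are omitted here.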