s3DefaultConnectTimeout = arvados.Duration(time.Minute)
maxClockSkew = 600 * time.Second
nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
- s3downloaderPartSize = 5 * 1024 * 1024
- s3downloaderReadConcurrency = 13
+ s3downloaderPartSize = 6 * 1024 * 1024
+ s3downloaderReadConcurrency = 11
s3uploaderPartSize = 5 * 1024 * 1024
s3uploaderWriteConcurrency = 5
)
creds := aws.NewChainProvider(
[]aws.CredentialsProvider{
aws.NewStaticCredentialsProvider(v.AccessKeyID, v.SecretAccessKey, v.AuthToken),
- ec2rolecreds.New(ec2metadata.New(cfg)),
+ ec2rolecreds.New(ec2metadata.New(cfg), func(opts *ec2rolecreds.ProviderOptions) {
+ // (from aws-sdk-go-v2 comments)
+ // "allow the credentials to trigger
+ // refreshing prior to the credentials
+ // actually expiring. This is
+ // beneficial so race conditions with
+ // expiring credentials do not cause
+ // request to fail unexpectedly due to
+ // ExpiredTokenException exceptions."
+ //
+ // (from
+ // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html)
+ // "We make new credentials available
+ // at least five minutes before the
+ // expiration of the old credentials."
+ opts.ExpiryWindow = 5 * time.Minute
+ }),
})
cfg.Credentials = creds
// BlockRead reads a Keep block that has been stored as a block blob
// in the S3 bucket.
-func (v *s3Volume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *s3Volume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
key := v.key(hash)
- buf, err := v.bufferPool.GetContext(ctx)
- if err != nil {
- return 0, err
- }
- defer v.bufferPool.Put(buf)
-
- streamer := newStreamWriterAt(writeTo, 65536, buf)
- defer streamer.Close()
- err = v.readWorker(ctx, key, streamer)
+ err := v.readWorker(ctx, key, w)
if err != nil {
err = v.translateError(err)
if !os.IsNotExist(err) {
- return 0, err
- }
- if streamer.WroteAt() > 0 {
- return 0, errors.New("bug? readWorker returned ErrNotExist after writing to streamer")
+ return err
}
_, err = v.head("recent/" + key)
if err != nil {
// If we can't read recent/X, there's no point in
// trying fixRace. Give up.
- return 0, err
+ return err
}
if !v.fixRace(key) {
err = os.ErrNotExist
- return 0, err
+ return err
}
- err = v.readWorker(ctx, key, streamer)
+ err = v.readWorker(ctx, key, w)
if err != nil {
v.logger.Warnf("reading %s after successful fixRace: %s", hash, err)
err = v.translateError(err)
- return 0, err
+ return err
}
}
- err = streamer.Close()
- if err != nil {
- return 0, v.translateError(err)
- }
- return streamer.Wrote(), nil
+ return nil
}
func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {