21720: added canwrite & canmanage to user.test
[arvados.git] / services / keepstore / s3_volume.go
index bd79d49e167fd77f8e768185189efd9cf620fc2c..2e2e97a974efa2ddbb7b5e60f67160da85181980 100644 (file)
@@ -42,8 +42,8 @@ const (
        s3DefaultConnectTimeout     = arvados.Duration(time.Minute)
        maxClockSkew                = 600 * time.Second
        nearlyRFC1123               = "Mon, 2 Jan 2006 15:04:05 GMT"
-       s3downloaderPartSize        = 5 * 1024 * 1024
-       s3downloaderReadConcurrency = 13
+       s3downloaderPartSize        = 6 * 1024 * 1024
+       s3downloaderReadConcurrency = 11
        s3uploaderPartSize          = 5 * 1024 * 1024
        s3uploaderWriteConcurrency  = 5
 )
@@ -217,7 +217,23 @@ func (v *s3Volume) check(ec2metadataHostname string) error {
        creds := aws.NewChainProvider(
                []aws.CredentialsProvider{
                        aws.NewStaticCredentialsProvider(v.AccessKeyID, v.SecretAccessKey, v.AuthToken),
-                       ec2rolecreds.New(ec2metadata.New(cfg)),
+                       ec2rolecreds.New(ec2metadata.New(cfg), func(opts *ec2rolecreds.ProviderOptions) {
+                               // (from aws-sdk-go-v2 comments)
+                               // "allow the credentials to trigger
+                               // refreshing prior to the credentials
+                               // actually expiring. This is
+                               // beneficial so race conditions with
+                               // expiring credentials do not cause
+                               // request to fail unexpectedly due to
+                               // ExpiredTokenException exceptions."
+                               //
+                               // (from
+                               // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html)
+                               // "We make new credentials available
+                               // at least five minutes before the
+                               // expiration of the old credentials."
+                               opts.ExpiryWindow = 5 * time.Minute
+                       }),
                })
 
        cfg.Credentials = creds
@@ -411,24 +427,13 @@ func (v *s3Volume) head(key string) (result *s3.HeadObjectOutput, err error) {
 
 // BlockRead reads a Keep block that has been stored as a block blob
 // in the S3 bucket.
-func (v *s3Volume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *s3Volume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
        key := v.key(hash)
-       buf, err := v.bufferPool.GetContext(ctx)
-       if err != nil {
-               return 0, err
-       }
-       defer v.bufferPool.Put(buf)
-
-       streamer := newStreamWriterAt(writeTo, 65536, buf)
-       defer streamer.Close()
-       err = v.readWorker(ctx, key, streamer)
+       err := v.readWorker(ctx, key, w)
        if err != nil {
                err = v.translateError(err)
                if !os.IsNotExist(err) {
-                       return 0, err
-               }
-               if streamer.WroteAt() > 0 {
-                       return 0, errors.New("bug? readWorker returned ErrNotExist after writing to streamer")
+                       return err
                }
 
                _, err = v.head("recent/" + key)
@@ -436,25 +441,21 @@ func (v *s3Volume) BlockRead(ctx context.Context, hash string, writeTo io.Writer
                if err != nil {
                        // If we can't read recent/X, there's no point in
                        // trying fixRace. Give up.
-                       return 0, err
+                       return err
                }
                if !v.fixRace(key) {
                        err = os.ErrNotExist
-                       return 0, err
+                       return err
                }
 
-               err = v.readWorker(ctx, key, streamer)
+               err = v.readWorker(ctx, key, w)
                if err != nil {
                        v.logger.Warnf("reading %s after successful fixRace: %s", hash, err)
                        err = v.translateError(err)
-                       return 0, err
+                       return err
                }
        }
-       err = streamer.Close()
-       if err != nil {
-               return 0, v.translateError(err)
-       }
-       return streamer.Wrote(), nil
+       return nil
 }
 
 func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {