X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5836e576fe0b78c50383cf56e1c4fb4521daeca1..e62dc0ffea0983f6a17ef16b368a0dceb62b98ea:/services/keepstore/s3_volume.go diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go index bdab58927b..fb978fe2ba 100644 --- a/services/keepstore/s3_volume.go +++ b/services/keepstore/s3_volume.go @@ -7,6 +7,7 @@ package main import ( "bytes" "context" + "crypto/sha256" "encoding/base64" "encoding/hex" "flag" @@ -235,6 +236,10 @@ func (v *S3Volume) Start() error { } client := s3.New(auth, region) + if region.EC2Endpoint.Signer == aws.V4Signature { + // Currently affects only eu-central-1 + client.Signature = aws.V4Signature + } client.ConnectTimeout = time.Duration(v.ConnectTimeout) client.ReadTimeout = time.Duration(v.ReadTimeout) v.bucket = &s3bucket{ @@ -398,6 +403,14 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error { return err } opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5) + // In AWS regions that use V4 signatures, we need to + // provide ContentSHA256 up front. Otherwise, the S3 + // library reads the request body (from our buffer) + // into another new buffer in order to compute the + // SHA256 before sending the request -- which would + // mean consuming 128 MiB of memory for the duration + // of a 64 MiB write. + opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block)) } // Send the block data through a pipe, so that (if we need to) @@ -497,16 +510,15 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error { Bucket: v.bucket.Bucket, Prefix: prefix, PageSize: v.IndexPageSize, + Stats: &v.bucket.stats, } recentL := s3Lister{ Bucket: v.bucket.Bucket, Prefix: "recent/" + prefix, PageSize: v.IndexPageSize, + Stats: &v.bucket.stats, } - v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps) - v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps) - for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() { - v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps) + for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() { if data.Key >= "g" { // Conveniently, "recent/*" and "trash/*" are // lexically greater than all hex-encoded data @@ -525,15 +537,13 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error { stamp := data // Advance to the corresponding recent/X marker, if any - for recent != nil { + for recent != nil && recentL.Error() == nil { if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 { recent = recentL.Next() - v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps) continue } else if cmp == 0 { stamp = recent recent = recentL.Next() - v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps) break } else { // recent/X marker is missing: we'll @@ -542,13 +552,16 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error { break } } + if err := recentL.Error(); err != nil { + return err + } t, err := time.Parse(time.RFC3339, stamp.LastModified) if err != nil { return err } fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano()) } - return nil + return dataL.Error() } // Trash a Keep block. @@ -620,8 +633,10 @@ func (v *S3Volume) safeCopy(dst, src string) error { MetadataDirective: "REPLACE", }, v.bucket.Name+"/"+src) err = v.translateError(err) - if err != nil { + if os.IsNotExist(err) { return err + } else if err != nil { + return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Name+"/"+src, err) } if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil { return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err) @@ -867,6 +882,7 @@ func (v *S3Volume) EmptyTrash() { Bucket: v.bucket.Bucket, Prefix: "trash/", PageSize: v.IndexPageSize, + Stats: &v.bucket.stats, } for trash := trashL.First(); trash != nil; trash = trashL.Next() { todo <- trash @@ -884,6 +900,7 @@ type s3Lister struct { Bucket *s3.Bucket Prefix string PageSize int + Stats *s3bucketStats nextMarker string buf []s3.Key err error @@ -912,6 +929,7 @@ func (lister *s3Lister) Error() error { } func (lister *s3Lister) getPage() { + lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps) resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize) lister.nextMarker = "" if err != nil {