X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/472d5d0daa87835ab04c02a6298fdc338d1a0446..74df5a58360fe6bcb273480d3ddec507a53b6b2b:/services/keepstore/s3_volume.go diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go index 02d9dcf91b..fb978fe2ba 100644 --- a/services/keepstore/s3_volume.go +++ b/services/keepstore/s3_volume.go @@ -7,6 +7,7 @@ package main import ( "bytes" "context" + "crypto/sha256" "encoding/base64" "encoding/hex" "flag" @@ -402,6 +403,14 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error { return err } opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5) + // In AWS regions that use V4 signatures, we need to + // provide ContentSHA256 up front. Otherwise, the S3 + // library reads the request body (from our buffer) + // into another new buffer in order to compute the + // SHA256 before sending the request -- which would + // mean consuming 128 MiB of memory for the duration + // of a 64 MiB write. + opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block)) } // Send the block data through a pipe, so that (if we need to) @@ -509,8 +518,7 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error { PageSize: v.IndexPageSize, Stats: &v.bucket.stats, } - for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() { - v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps) + for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() { if data.Key >= "g" { // Conveniently, "recent/*" and "trash/*" are // lexically greater than all hex-encoded data @@ -529,7 +537,7 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error { stamp := data // Advance to the corresponding recent/X marker, if any - for recent != nil { + for recent != nil && recentL.Error() == nil { if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 { recent = recentL.Next() continue @@ -544,13 +552,16 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error { break } } + if err := recentL.Error(); err != nil { + return err + } t, err := time.Parse(time.RFC3339, stamp.LastModified) if err != nil { return err } fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano()) } - return nil + return dataL.Error() } // Trash a Keep block.