X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e1902b8a0490aa6b7ffc544c1609d4d57a5110ce..6fb784416db6651b33b921a0684c2f8de84410fc:/services/keepstore/s3_volume.go

diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index caed35b670..17923f807d 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -1,11 +1,14 @@
 package main
 
 import (
+	"bytes"
+	"context"
 	"encoding/base64"
 	"encoding/hex"
 	"flag"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"log"
 	"net/http"
 	"os"
@@ -19,6 +22,11 @@ import (
 	"github.com/AdRoll/goamz/s3"
 )
 
+const (
+	s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
+	s3DefaultConnectTimeout = arvados.Duration(time.Minute)
+)
+
 var (
 	// ErrS3TrashDisabled is returned by Trash if that operation
 	// is impossible with the current config.
@@ -134,6 +142,8 @@ type S3Volume struct {
 	LocationConstraint bool
 	IndexPageSize      int
 	S3Replication      int
+	ConnectTimeout     arvados.Duration
+	ReadTimeout        arvados.Duration
 	RaceWindow         arvados.Duration
 	ReadOnly           bool
 	UnsafeDelete       bool
@@ -147,24 +157,28 @@ type S3Volume struct {
 func (*S3Volume) Examples() []Volume {
 	return []Volume{
 		&S3Volume{
-			AccessKeyFile: "/etc/aws_s3_access_key.txt",
-			SecretKeyFile: "/etc/aws_s3_secret_key.txt",
-			Endpoint:      "",
-			Region:        "us-east-1",
-			Bucket:        "example-bucket-name",
-			IndexPageSize: 1000,
-			S3Replication: 2,
-			RaceWindow:    arvados.Duration(24 * time.Hour),
+			AccessKeyFile:  "/etc/aws_s3_access_key.txt",
+			SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
+			Endpoint:       "",
+			Region:         "us-east-1",
+			Bucket:         "example-bucket-name",
+			IndexPageSize:  1000,
+			S3Replication:  2,
+			RaceWindow:     arvados.Duration(24 * time.Hour),
+			ConnectTimeout: arvados.Duration(time.Minute),
+			ReadTimeout:    arvados.Duration(5 * time.Minute),
 		},
 		&S3Volume{
-			AccessKeyFile: "/etc/gce_s3_access_key.txt",
-			SecretKeyFile: "/etc/gce_s3_secret_key.txt",
-			Endpoint:      "https://storage.googleapis.com",
-			Region:        "",
-			Bucket:        "example-bucket-name",
-			IndexPageSize: 1000,
-			S3Replication: 2,
-			RaceWindow:    arvados.Duration(24 * time.Hour),
+			AccessKeyFile:  "/etc/gce_s3_access_key.txt",
+			SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
+			Endpoint:       "https://storage.googleapis.com",
+			Region:         "",
+			Bucket:         "example-bucket-name",
+			IndexPageSize:  1000,
+			S3Replication:  2,
+			RaceWindow:     arvados.Duration(24 * time.Hour),
+			ConnectTimeout: arvados.Duration(time.Minute),
+			ReadTimeout:    arvados.Duration(5 * time.Minute),
 		},
 	}
 }
@@ -203,13 +217,47 @@ func (v *S3Volume) Start() error {
 	if err != nil {
 		return err
 	}
+
+	// Zero timeouts mean "wait forever", which is a bad
+	// default. Default to long timeouts instead.
+	if v.ConnectTimeout == 0 {
+		v.ConnectTimeout = s3DefaultConnectTimeout
+	}
+	if v.ReadTimeout == 0 {
+		v.ReadTimeout = s3DefaultReadTimeout
+	}
+
+	client := s3.New(auth, region)
+	client.ConnectTimeout = time.Duration(v.ConnectTimeout)
+	client.ReadTimeout = time.Duration(v.ReadTimeout)
 	v.bucket = &s3.Bucket{
-		S3:   s3.New(auth, region),
+		S3:   client,
 		Name: v.Bucket,
 	}
 	return nil
 }
 
+func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
+	ready := make(chan bool)
+	go func() {
+		rdr, err = v.getReader(loc)
+		close(ready)
+	}()
+	select {
+	case <-ready:
+		return
+	case <-ctx.Done():
+		theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
+		go func() {
+			<-ready
+			if err == nil {
+				rdr.Close()
+			}
+		}()
+		return nil, ctx.Err()
+	}
+}
+
 // getReader wraps (Bucket)GetReader.
 //
 // In situations where (Bucket)GetReader would fail because the block
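
The getReaderWithContext hunk above shows the general pattern this commit uses to bolt context cancellation onto a blocking call: run the call in a goroutine, select on ctx.Done(), and leave a cleanup goroutine behind to close whatever the abandoned call eventually returns. Below is a minimal, self-contained sketch of that pattern, not Arvados code: slowOpen() is a hypothetical stand-in for (Bucket)GetReader, and logging is omitted.

package main

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"strings"
	"time"
)

// slowOpen simulates a backend call that takes a while to produce a reader.
func slowOpen() (io.ReadCloser, error) {
	time.Sleep(2 * time.Second)
	return ioutil.NopCloser(strings.NewReader("block data")), nil
}

// openWithContext returns ctx.Err() if ctx is cancelled before slowOpen
// finishes. The abandoned goroutine keeps running; a second goroutine
// waits for it and closes the reader it eventually produces, so no
// response body is leaked.
func openWithContext(ctx context.Context) (io.ReadCloser, error) {
	var rdr io.ReadCloser
	var err error
	ready := make(chan bool)
	go func() {
		rdr, err = slowOpen()
		close(ready)
	}()
	select {
	case <-ready:
		return rdr, err
	case <-ctx.Done():
		go func() {
			<-ready
			if err == nil {
				rdr.Close()
			}
		}()
		return nil, ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	_, err := openWithContext(ctx)
	fmt.Println(err) // prints "context deadline exceeded"
}
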
@@ -242,50 +290,106 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
 
 // Get a block: copy the block data into buf, and return the number of
 // bytes copied.
-func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
-	rdr, err := v.getReader(loc)
+func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
+	rdr, err := v.getReaderWithContext(ctx, loc)
 	if err != nil {
 		return 0, err
 	}
-	defer rdr.Close()
-	n, err := io.ReadFull(rdr, buf)
-	switch err {
-	case nil, io.EOF, io.ErrUnexpectedEOF:
-		return n, nil
-	default:
-		return 0, v.translateError(err)
+
+	var n int
+	ready := make(chan bool)
+	go func() {
+		defer close(ready)
+
+		defer rdr.Close()
+		n, err = io.ReadFull(rdr, buf)
+
+		switch err {
+		case nil, io.EOF, io.ErrUnexpectedEOF:
+			err = nil
+		default:
+			err = v.translateError(err)
+		}
+	}()
+	select {
+	case <-ctx.Done():
+		theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
+		rdr.Close()
+		// Must wait for ReadFull to return, to ensure it
+		// doesn't write to buf after we return.
+		theConfig.debugLogf("s3: waiting for ReadFull() to fail")
+		<-ready
+		return 0, ctx.Err()
+	case <-ready:
+		return n, err
 	}
 }
 
 // Compare the given data with the stored data.
-func (v *S3Volume) Compare(loc string, expect []byte) error {
-	rdr, err := v.getReader(loc)
+func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
+	rdr, err := v.getReaderWithContext(ctx, loc)
 	if err != nil {
 		return err
 	}
 	defer rdr.Close()
-	return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
+	return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
 }
 
 // Put writes a block.
-func (v *S3Volume) Put(loc string, block []byte) error {
+func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
 	if v.ReadOnly {
 		return MethodDisabledError
 	}
 	var opts s3.Options
-	if len(block) > 0 {
+	size := len(block)
+	if size > 0 {
 		md5, err := hex.DecodeString(loc)
 		if err != nil {
 			return err
 		}
 		opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
 	}
-	err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
-	if err != nil {
+
+	// Send the block data through a pipe, so that (if we need to)
+	// we can close the pipe early and abandon our PutReader()
+	// goroutine, without worrying about PutReader() accessing our
+	// block buffer after we release it.
+	bufr, bufw := io.Pipe()
+	go func() {
+		io.Copy(bufw, bytes.NewReader(block))
+		bufw.Close()
+	}()
+
+	var err error
+	ready := make(chan bool)
+	go func() {
+		defer func() {
+			if ctx.Err() != nil {
+				theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
+			}
+		}()
+		defer close(ready)
+		err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
+		if err != nil {
+			return
+		}
+		err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+	}()
+	select {
+	case <-ctx.Done():
+		theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
+		// Our pipe might be stuck in Write(), waiting for
+		// io.Copy() to read. If so, un-stick it. This means
+		// PutReader will get corrupt data, but that's OK: the
+		// size and MD5 won't match, so the write will fail.
+		go io.Copy(ioutil.Discard, bufr)
+		// CloseWithError() will return once pending I/O is done.
+		bufw.CloseWithError(ctx.Err())
+		theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
+		return ctx.Err()
+	case <-ready:
 		return v.translateError(err)
 	}
-	err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
-	return v.translateError(err)
 }
 
 // Touch sets the timestamp for the given locator to the current time.
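
Get (in the last hunk above) cannot interrupt a blocked io.ReadFull directly, so it closes the reader out from under it, then waits for the reader goroutine to finish before returning, guaranteeing nothing writes to buf after the caller gets it back. A standalone sketch of that technique, using an io.Pipe in place of the S3 response body; readAllWithContext is a hypothetical name, not keepstore code.

package main

import (
	"context"
	"fmt"
	"io"
	"time"
)

// readAllWithContext fills buf from rdr, but abandons the read (by
// closing rdr) when ctx is cancelled first.
func readAllWithContext(ctx context.Context, rdr io.ReadCloser, buf []byte) (int, error) {
	var n int
	var err error
	ready := make(chan bool)
	go func() {
		defer close(ready)
		defer rdr.Close()
		n, err = io.ReadFull(rdr, buf)
	}()
	select {
	case <-ctx.Done():
		rdr.Close() // forces the pending Read to fail
		<-ready     // ReadFull must return before buf may be reused
		return 0, ctx.Err()
	case <-ready:
		return n, err
	}
}

func main() {
	pr, pw := io.Pipe() // a reader that never delivers enough data
	go pw.Write([]byte("partial"))
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	buf := make([]byte, 64)
	n, err := readAllWithContext(ctx, pr, buf)
	fmt.Println(n, err) // prints "0 context deadline exceeded"
}
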
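Put (above) feeds the block through an io.Pipe so the PutReader goroutine can be abandoned on cancellation without touching the caller's buffer again: the read end is drained into ioutil.Discard so the feeder goroutine cannot stay stuck in Write(), and CloseWithError() propagates ctx.Err() to both ends of the pipe. A sketch of the same abandonment, assuming a hypothetical slowUpload() in place of (Bucket)PutReader.

package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"time"
)

// slowUpload stands in for (Bucket)PutReader: it consumes r slowly.
func slowUpload(r io.Reader) error {
	buf := make([]byte, 1)
	for {
		if _, err := r.Read(buf); err != nil {
			return err
		}
		time.Sleep(10 * time.Millisecond)
	}
}

func put(ctx context.Context, block []byte) error {
	bufr, bufw := io.Pipe()
	go func() {
		// Feeder: the only goroutine that reads block. It stops
		// promptly once the pipe is closed.
		io.Copy(bufw, bytes.NewReader(block))
		bufw.Close()
	}()

	var err error
	ready := make(chan bool)
	go func() {
		defer close(ready)
		err = slowUpload(bufr)
	}()
	select {
	case <-ctx.Done():
		// Un-stick the feeder if it is blocked in Write(), then
		// propagate the cancellation to both ends of the pipe.
		go io.Copy(ioutil.Discard, bufr)
		bufw.CloseWithError(ctx.Err())
		return ctx.Err()
	case <-ready:
		return err
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	fmt.Println(put(ctx, make([]byte, 1<<20))) // prints "context deadline exceeded"
}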