10467: Use context instead of http.CloseNotifier to interrupt buffer waits.
[arvados.git] / services / keepstore / s3_volume.go
index 6339cf8e2866abcc22906e3b0d91f483bd222caa..7ef590f4a685e5d83a4a69c3b1f85be9ebfdbab1 100644 (file)
@@ -1,12 +1,14 @@
 package main
 
 import (
+       "bytes"
        "context"
        "encoding/base64"
        "encoding/hex"
        "flag"
        "fmt"
        "io"
+       "io/ioutil"
        "log"
        "net/http"
        "os"
@@ -320,24 +322,64 @@ func (v *S3Volume) Compare(loc string, expect []byte) error {
 }
 
 // Put writes a block.
-func (v *S3Volume) Put(loc string, block []byte) error {
+func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
        if v.ReadOnly {
                return MethodDisabledError
        }
        var opts s3.Options
-       if len(block) > 0 {
+       size := len(block)
+       if size > 0 {
                md5, err := hex.DecodeString(loc)
                if err != nil {
                        return err
                }
                opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
        }
-       err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
-       if err != nil {
-               return v.translateError(err)
+
+       // Send the block data through a pipe, so that (if we need to)
+       // we can close the pipe early and abandon our PutReader()
+       // goroutine, without worrying about PutReader() accessing our
+       // block buffer after we release it.
+       bufr, bufw := io.Pipe()
+       go func() {
+               io.Copy(bufw, bytes.NewReader(block))
+               bufw.Close()
+       }()
+
+       var err error
+       ready := make(chan bool)
+       go func() {
+               defer func() {
+                       select {
+                       case <-ctx.Done():
+                               theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
+                       default:
+                       }
+               }()
+               defer close(ready)
+               err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
+               if err != nil {
+                       err = v.translateError(err)
+                       return
+               }
+               err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+               err = v.translateError(err)
+       }()
+       select {
+       case <-ctx.Done():
+               theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
+               // Our pipe might be stuck in Write(), waiting for
+               // io.Copy() to read. If so, un-stick it. This means
+               // PutReader will get corrupt data, but that's OK: the
+               // size and MD5 won't match, so the write will fail.
+               go io.Copy(ioutil.Discard, bufr)
+               // CloseWithError() will return once pending I/O is done.
+               bufw.CloseWithError(ctx.Err())
+               theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
+               return ctx.Err()
+       case <-ready:
+               return err
        }
-       err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
-       return v.translateError(err)
 }
 
 // Touch sets the timestamp for the given locator to the current time.