10467: Abort S3 and release buffer if caller disconnects during S3 PUT request.
[arvados.git] / services / keepstore / handlers.go
index ac2d71228f823a9ab64c24d82e2c4fe6091648bf..5dc68df4aa1e3c5491de769664dc65b53b2e8ffe 100644 (file)
@@ -157,6 +157,8 @@ func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufS
 
 // PutBlockHandler is a HandleFunc to address Put block requests.
 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
+       ctx := contextForResponse(context.TODO(), resp)
+
        hash := mux.Vars(req)["hash"]
 
        // Detect as many error conditions as possible before reading
@@ -191,7 +193,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       replication, err := PutBlock(buf, hash)
+       replication, err := PutBlock(ctx, buf, hash)
        bufs.Put(buf)
 
        if err != nil {
@@ -611,7 +613,7 @@ func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWr
 
 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
 //
-// PutBlock(block, hash)
+// PutBlock(ctx, block, hash)
 //   Stores the BLOCK (identified by the content id HASH) in Keep.
 //
 //   The MD5 checksum of the block must be identical to the content id HASH.
@@ -636,7 +638,7 @@ func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWr
 //          all writes failed). The text of the error message should
 //          provide as much detail as possible.
 //
-func PutBlock(block []byte, hash string) (int, error) {
+func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
        // Check that BLOCK's checksum matches HASH.
        blockhash := fmt.Sprintf("%x", md5.Sum(block))
        if blockhash != hash {
@@ -654,7 +656,7 @@ func PutBlock(block []byte, hash string) (int, error) {
        // Choose a Keep volume to write to.
        // If this volume fails, try all of the volumes in order.
        if vol := KeepVM.NextWritable(); vol != nil {
-               if err := vol.Put(hash, block); err == nil {
+               if err := vol.Put(context.TODO(), hash, block); err == nil {
                        return vol.Replication(), nil // success!
                }
        }
@@ -667,7 +669,12 @@ func PutBlock(block []byte, hash string) (int, error) {
 
        allFull := true
        for _, vol := range writables {
-               err := vol.Put(hash, block)
+               err := vol.Put(ctx, hash, block)
+               select {
+               case <-ctx.Done():
+                       return 0, ctx.Err()
+               default:
+               }
                if err == nil {
                        return vol.Replication(), nil // success!
                }