closes #10290
[arvados.git] / services / keepstore / handlers.go
index 210e2b4039fa828e5ef8eb57c9eb480ecc844d43..289dce15a06168572f5269d7fed82bdb31a75075 100644 (file)
@@ -72,7 +72,8 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
 
 // GetBlockHandler is a HandleFunc to address Get block requests.
 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
-       ctx := contextForResponse(context.TODO(), resp)
+       ctx, cancel := contextForResponse(context.TODO(), resp)
+       defer cancel()
 
        if theConfig.RequireSignatures {
                locator := req.URL.Path[1:] // strip leading slash
@@ -111,20 +112,20 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
        resp.Write(buf[:size])
 }
 
-// Return a new context that gets cancelled by resp's
-// CloseNotifier. If resp does not implement http.CloseNotifier,
-// return parent.
-func contextForResponse(parent context.Context, resp http.ResponseWriter) context.Context {
-       cn, ok := resp.(http.CloseNotifier)
-       if !ok {
-               return parent
-       }
+// Return a new context that gets cancelled by resp's CloseNotifier.
+func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
        ctx, cancel := context.WithCancel(parent)
-       go func(c <-chan bool) {
-               <-c
-               cancel()
-       }(cn.CloseNotify())
-       return ctx
+       if cn, ok := resp.(http.CloseNotifier); ok {
+               go func(c <-chan bool) {
+                       select {
+                       case <-c:
+                               theConfig.debugLogf("cancel context")
+                               cancel()
+                       case <-ctx.Done():
+                       }
+               }(cn.CloseNotify())
+       }
+       return ctx, cancel
 }
 
 // Get a buffer from the pool -- but give up and return a non-nil
@@ -150,7 +151,8 @@ func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([
 
 // PutBlockHandler is a HandleFunc to address Put block requests.
 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
-       ctx := contextForResponse(context.TODO(), resp)
+       ctx, cancel := contextForResponse(context.TODO(), resp)
+       defer cancel()
 
        hash := mux.Vars(req)["hash"]
 
@@ -190,8 +192,11 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
        bufs.Put(buf)
 
        if err != nil {
-               ke := err.(*KeepError)
-               http.Error(resp, ke.Error(), ke.HTTPCode)
+               code := http.StatusInternalServerError
+               if err, ok := err.(*KeepError); ok {
+                       code = err.HTTPCode
+               }
+               http.Error(resp, err.Error(), code)
                return
        }
 
@@ -570,7 +575,7 @@ func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWr
                size, err := vol.Get(ctx, hash, buf)
                select {
                case <-ctx.Done():
-                       return 0, ctx.Err()
+                       return 0, ErrClientDisconnect
                default:
                }
                if err != nil {
@@ -642,16 +647,21 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
        // If we already have this data, it's intact on disk, and we
        // can update its timestamp, return success. If we have
        // different data with the same hash, return failure.
-       if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+       if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
                return n, err
+       } else if ctx.Err() != nil {
+               return 0, ErrClientDisconnect
        }
 
        // Choose a Keep volume to write to.
        // If this volume fails, try all of the volumes in order.
        if vol := KeepVM.NextWritable(); vol != nil {
-               if err := vol.Put(context.TODO(), hash, block); err == nil {
+               if err := vol.Put(ctx, hash, block); err == nil {
                        return vol.Replication(), nil // success!
                }
+               if ctx.Err() != nil {
+                       return 0, ErrClientDisconnect
+               }
        }
 
        writables := KeepVM.AllWritable()
@@ -663,10 +673,8 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
        allFull := true
        for _, vol := range writables {
                err := vol.Put(ctx, hash, block)
-               select {
-               case <-ctx.Done():
-                       return 0, ctx.Err()
-               default:
+               if ctx.Err() != nil {
+                       return 0, ErrClientDisconnect
                }
                if err == nil {
                        return vol.Replication(), nil // success!
@@ -693,10 +701,13 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
 // the relevant block's modification time in order to protect it from
 // premature garbage collection. Otherwise, it returns a non-nil
 // error.
-func CompareAndTouch(hash string, buf []byte) (int, error) {
+func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
        var bestErr error = NotFoundError
        for _, vol := range KeepVM.AllWritable() {
-               if err := vol.Compare(hash, buf); err == CollisionError {
+               err := vol.Compare(ctx, hash, buf)
+               if ctx.Err() != nil {
+                       return 0, ctx.Err()
+               } else if err == CollisionError {
                        // Stop if we have a block with same hash but
                        // different content. (It will be impossible
                        // to tell which one is wanted if we have