Merge branch 'master' into 10293-cwl-cr-output
[arvados.git] / services / keepstore / handlers.go
index 69807d9e62117ee62981b9430924affde9a5a355..289dce15a06168572f5269d7fed82bdb31a75075 100644 (file)
@@ -72,7 +72,8 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
 
 // GetBlockHandler is a HandleFunc to address Get block requests.
 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
-       ctx := contextForResponse(context.TODO(), resp)
+       ctx, cancel := contextForResponse(context.TODO(), resp)
+       defer cancel()
 
        if theConfig.RequireSignatures {
                locator := req.URL.Path[1:] // strip leading slash
@@ -89,7 +90,7 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
        // isn't here, we can return 404 now instead of waiting for a
        // buffer.
 
-       buf, err := getBufferForResponseWriter(resp, bufs, BlockSize)
+       buf, err := getBufferWithContext(ctx, bufs, BlockSize)
        if err != nil {
                http.Error(resp, err.Error(), http.StatusServiceUnavailable)
                return
@@ -111,40 +112,33 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
        resp.Write(buf[:size])
 }
 
-// Return a new context that gets cancelled by resp's
-// CloseNotifier. If resp does not implement http.CloseNotifier,
-// return parent.
-func contextForResponse(parent context.Context, resp http.ResponseWriter) context.Context {
-       cn, ok := resp.(http.CloseNotifier)
-       if !ok {
-               return parent
-       }
+// Return a new context that gets cancelled by resp's CloseNotifier.
+func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
        ctx, cancel := context.WithCancel(parent)
-       go func(c <-chan bool) {
-               <-c
-               cancel()
-       }(cn.CloseNotify())
-       return ctx
+       if cn, ok := resp.(http.CloseNotifier); ok {
+               go func(c <-chan bool) {
+                       select {
+                       case <-c:
+                               theConfig.debugLogf("cancel context")
+                               cancel()
+                       case <-ctx.Done():
+                       }
+               }(cn.CloseNotify())
+       }
+       return ctx, cancel
 }
 
 // Get a buffer from the pool -- but give up and return a non-nil
-// error if resp implements http.CloseNotifier and tells us that the
-// client has disconnected before we get a buffer.
-func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufSize int) ([]byte, error) {
-       var closeNotifier <-chan bool
-       if resp, ok := resp.(http.CloseNotifier); ok {
-               closeNotifier = resp.CloseNotify()
-       }
-       var buf []byte
+// error if ctx ends before we get a buffer.
+func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
        bufReady := make(chan []byte)
        go func() {
                bufReady <- bufs.Get(bufSize)
-               close(bufReady)
        }()
        select {
-       case buf = <-bufReady:
+       case buf := <-bufReady:
                return buf, nil
-       case <-closeNotifier:
+       case <-ctx.Done():
                go func() {
                        // Even if closeNotifier happened first, we
                        // need to keep waiting for our buf so we can
@@ -157,7 +151,8 @@ func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufS
 
 // PutBlockHandler is a HandleFunc to address Put block requests.
 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
-       ctx := contextForResponse(context.TODO(), resp)
+       ctx, cancel := contextForResponse(context.TODO(), resp)
+       defer cancel()
 
        hash := mux.Vars(req)["hash"]
 
@@ -180,7 +175,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       buf, err := getBufferForResponseWriter(resp, bufs, int(req.ContentLength))
+       buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
        if err != nil {
                http.Error(resp, err.Error(), http.StatusServiceUnavailable)
                return
@@ -197,8 +192,11 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
        bufs.Put(buf)
 
        if err != nil {
-               ke := err.(*KeepError)
-               http.Error(resp, ke.Error(), ke.HTTPCode)
+               code := http.StatusInternalServerError
+               if err, ok := err.(*KeepError); ok {
+                       code = err.HTTPCode
+               }
+               http.Error(resp, err.Error(), code)
                return
        }
 
@@ -577,7 +575,7 @@ func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWr
                size, err := vol.Get(ctx, hash, buf)
                select {
                case <-ctx.Done():
-                       return 0, ctx.Err()
+                       return 0, ErrClientDisconnect
                default:
                }
                if err != nil {
@@ -649,16 +647,21 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
        // If we already have this data, it's intact on disk, and we
        // can update its timestamp, return success. If we have
        // different data with the same hash, return failure.
-       if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+       if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
                return n, err
+       } else if ctx.Err() != nil {
+               return 0, ErrClientDisconnect
        }
 
        // Choose a Keep volume to write to.
        // If this volume fails, try all of the volumes in order.
        if vol := KeepVM.NextWritable(); vol != nil {
-               if err := vol.Put(context.TODO(), hash, block); err == nil {
+               if err := vol.Put(ctx, hash, block); err == nil {
                        return vol.Replication(), nil // success!
                }
+               if ctx.Err() != nil {
+                       return 0, ErrClientDisconnect
+               }
        }
 
        writables := KeepVM.AllWritable()
@@ -670,10 +673,8 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
        allFull := true
        for _, vol := range writables {
                err := vol.Put(ctx, hash, block)
-               select {
-               case <-ctx.Done():
-                       return 0, ctx.Err()
-               default:
+               if ctx.Err() != nil {
+                       return 0, ErrClientDisconnect
                }
                if err == nil {
                        return vol.Replication(), nil // success!
@@ -700,10 +701,13 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
 // the relevant block's modification time in order to protect it from
 // premature garbage collection. Otherwise, it returns a non-nil
 // error.
-func CompareAndTouch(hash string, buf []byte) (int, error) {
+func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
        var bestErr error = NotFoundError
        for _, vol := range KeepVM.AllWritable() {
-               if err := vol.Compare(hash, buf); err == CollisionError {
+               err := vol.Compare(ctx, hash, buf)
+               if ctx.Err() != nil {
+                       return 0, ctx.Err()
+               } else if err == CollisionError {
                        // Stop if we have a block with same hash but
                        // different content. (It will be impossible
                        // to tell which one is wanted if we have