10467: Interrupt Compare operation if caller disconnects.
[arvados.git] / services / keepstore / handlers.go
index ba2078fdd79372efd22bd76069e29b19748c86ed..289dce15a06168572f5269d7fed82bdb31a75075 100644 (file)
@@ -192,8 +192,11 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
        bufs.Put(buf)
 
        if err != nil {
-               ke := err.(*KeepError)
-               http.Error(resp, ke.Error(), ke.HTTPCode)
+               code := http.StatusInternalServerError
+               if err, ok := err.(*KeepError); ok {
+                       code = err.HTTPCode
+               }
+               http.Error(resp, err.Error(), code)
                return
        }
 
@@ -572,7 +575,7 @@ func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWr
                size, err := vol.Get(ctx, hash, buf)
                select {
                case <-ctx.Done():
-                       return 0, ctx.Err()
+                       return 0, ErrClientDisconnect
                default:
                }
                if err != nil {
@@ -644,16 +647,21 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
        // If we already have this data, it's intact on disk, and we
        // can update its timestamp, return success. If we have
        // different data with the same hash, return failure.
-       if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+       if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
                return n, err
+       } else if ctx.Err() != nil {
+               return 0, ErrClientDisconnect
        }
 
        // Choose a Keep volume to write to.
        // If this volume fails, try all of the volumes in order.
        if vol := KeepVM.NextWritable(); vol != nil {
-               if err := vol.Put(context.TODO(), hash, block); err == nil {
+               if err := vol.Put(ctx, hash, block); err == nil {
                        return vol.Replication(), nil // success!
                }
+               if ctx.Err() != nil {
+                       return 0, ErrClientDisconnect
+               }
        }
 
        writables := KeepVM.AllWritable()
@@ -665,10 +673,8 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
        allFull := true
        for _, vol := range writables {
                err := vol.Put(ctx, hash, block)
-               select {
-               case <-ctx.Done():
-                       return 0, ctx.Err()
-               default:
+               if ctx.Err() != nil {
+                       return 0, ErrClientDisconnect
                }
                if err == nil {
                        return vol.Replication(), nil // success!
@@ -695,10 +701,13 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
 // the relevant block's modification time in order to protect it from
 // premature garbage collection. Otherwise, it returns a non-nil
 // error.
-func CompareAndTouch(hash string, buf []byte) (int, error) {
+func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
        var bestErr error = NotFoundError
        for _, vol := range KeepVM.AllWritable() {
-               if err := vol.Compare(hash, buf); err == CollisionError {
+               err := vol.Compare(ctx, hash, buf)
+               if ctx.Err() != nil {
+                       return 0, ctx.Err()
+               } else if err == CollisionError {
                        // Stop if we have a block with same hash but
                        // different content. (It will be impossible
                        // to tell which one is wanted if we have