9068: Move buffer allocation from volumes to GetBlockHandler.
[arvados.git] / services / keepstore / handler_test.go
index 33d585ae1e69f18868f7eb57278637c7f855761b..7c17424ba568227790469e4e32867f33fea8ff4e 100644 (file)
@@ -561,7 +561,8 @@ func TestDeleteHandler(t *testing.T) {
                        expectedDc, responseDc)
        }
        // Confirm the block has been deleted
-       _, err := vols[0].Get(TestHash)
+       buf := make([]byte, BlockSize)
+       _, err := vols[0].Get(TestHash, buf)
        var blockDeleted = os.IsNotExist(err)
        if !blockDeleted {
                t.Error("superuserExistingBlockReq: block not deleted")
@@ -585,7 +586,7 @@ func TestDeleteHandler(t *testing.T) {
                        expectedDc, responseDc)
        }
        // Confirm the block has NOT been deleted.
-       _, err = vols[0].Get(TestHash)
+       _, err = vols[0].Get(TestHash, buf)
        if err != nil {
                t.Errorf("testing delete on new block: %s\n", err)
        }
@@ -913,6 +914,65 @@ func TestPutHandlerNoBufferleak(t *testing.T) {
        }
 }
 
+type notifyingResponseRecorder struct { // httptest.ResponseRecorder plus a CloseNotify channel, for simulating client disconnects in tests
+       *httptest.ResponseRecorder
+       closer chan bool // returned as-is by CloseNotify; a value sent here stands for "client went away"
+}
+
+func (r *notifyingResponseRecorder) CloseNotify() <-chan bool { // CloseNotify exposes r.closer; this method is what makes the type satisfy http.CloseNotifier
+       return r.closer
+}
+
+func TestGetHandlerClientDisconnect(t *testing.T) { // expects a GET from an already-disconnected client to yield 503 without any volume recording a GET
+       defer func(was bool) { // put the package-level enforcePermissions flag back when the test ends
+               enforcePermissions = was
+       }(enforcePermissions)
+       enforcePermissions = false
+
+       defer func(orig *bufferPool) { // put the package-level buffer pool back when the test ends
+               bufs = orig
+       }(bufs)
+       bufs = newBufferPool(1, BlockSize) // presumably a pool capped at a single BlockSize buffer -- confirm against newBufferPool
+       defer bufs.Put(bufs.Get(BlockSize)) // Get is evaluated right now, only Put is deferred: the test holds a buffer until it returns
+
+       KeepVM = MakeTestVolumeManager(2) // NOTE(review): assumed to create two mock volumes (see the *MockVolume assertion below) -- confirm
+       defer KeepVM.Close()
+
+       if err := KeepVM.AllWritable()[0].Put(TestHash, TestBlock); err != nil { // store the block on the first writable volume so the request names an existing block
+               t.Error(err)
+       }
+
+       resp := &notifyingResponseRecorder{
+               ResponseRecorder: httptest.NewRecorder(),
+               closer:           make(chan bool, 1), // capacity 1 so the send below does not block
+       }
+       if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok { // sanity check: the recorder must be detectable as a CloseNotifier through the ResponseWriter interface
+               t.Fatal("notifyingResponseRecorder is broken")
+       }
+       // If anyone asks, the client has disconnected.
+       resp.closer <- true
+
+       ok := make(chan struct{}) // receives one value once ServeHTTP has returned
+       go func() { // run the request in a goroutine so the select below can time it out
+               req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil) // request path is "/<hash>+<size>"
+               (&LoggingRESTRouter{MakeRESTRouter()}).ServeHTTP(resp, req)
+               ok <- struct{}{}
+       }()
+
+       select { // fail if the handler has not returned within 20 seconds
+       case <-time.After(20 * time.Second):
+               t.Fatal("request took >20s, close notifier must be broken")
+       case <-ok:
+       }
+
+       ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder) // the recorded response should be 503
+       for i, v := range KeepVM.AllWritable() { // no writable volume may have a non-zero "GET" entry in its called map
+               if calls := v.(*MockVolume).called["GET"]; calls != 0 {
+                       t.Errorf("volume %d got %d calls, expected 0", i, calls)
+               }
+       }
+}
+
 // Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
 // leak.
 func TestGetHandlerNoBufferleak(t *testing.T) {