8555: Log statistics in EmptyTrash.
[arvados.git] / services / keepstore / handler_test.go
index ba923cad768abc6fe7d906bc82bc67b0bef6276d..7c17424ba568227790469e4e32867f33fea8ff4e 100644 (file)
@@ -561,7 +561,8 @@ func TestDeleteHandler(t *testing.T) {
                        expectedDc, responseDc)
        }
        // Confirm the block has been deleted
-       _, err := vols[0].Get(TestHash)
+       buf := make([]byte, BlockSize)
+       _, err := vols[0].Get(TestHash, buf)
        var blockDeleted = os.IsNotExist(err)
        if !blockDeleted {
                t.Error("superuserExistingBlockReq: block not deleted")
@@ -585,7 +586,7 @@ func TestDeleteHandler(t *testing.T) {
                        expectedDc, responseDc)
        }
        // Confirm the block has NOT been deleted.
-       _, err = vols[0].Get(TestHash)
+       _, err = vols[0].Get(TestHash, buf)
        if err != nil {
                t.Errorf("testing delete on new block: %s\n", err)
        }
@@ -814,7 +815,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
        if rt.apiToken != "" {
                req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
        }
-       loggingRouter := MakeLoggingRESTRouter()
+       loggingRouter := MakeRESTRouter()
        loggingRouter.ServeHTTP(response, req)
        return response
 }
@@ -913,6 +914,65 @@ func TestPutHandlerNoBufferleak(t *testing.T) {
        }
 }
 
+type notifyingResponseRecorder struct { // an httptest.ResponseRecorder plus a channel served by CloseNotify
+       *httptest.ResponseRecorder
+       closer chan bool // handed out, unchanged, by CloseNotify
+}
+
+func (r *notifyingResponseRecorder) CloseNotify() <-chan bool { // CloseNotify exposes r.closer as a receive-only channel
+       return r.closer
+}
+
+func TestGetHandlerClientDisconnect(t *testing.T) { // a GET whose close channel is already signalled must yield 503 and no volume "GET" calls
+       defer func(was bool) { // restore the previous enforcePermissions value on exit
+               enforcePermissions = was
+       }(enforcePermissions)
+       enforcePermissions = false
+
+       defer func(orig *bufferPool) { // restore the previous buffer pool on exit
+               bufs = orig
+       }(bufs)
+       bufs = newBufferPool(1, BlockSize)
+       defer bufs.Put(bufs.Get(BlockSize)) // Get runs right here (defer evaluates its arguments immediately); only Put waits for exit. Presumably this empties the pool for the test body -- confirm against bufferPool.
+
+       KeepVM = MakeTestVolumeManager(2)
+       defer KeepVM.Close()
+
+       if err := KeepVM.AllWritable()[0].Put(TestHash, TestBlock); err != nil { // store the test block on the first writable volume
+               t.Error(err)
+       }
+
+       resp := &notifyingResponseRecorder{
+               ResponseRecorder: httptest.NewRecorder(),
+               closer:           make(chan bool, 1),
+       }
+       if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok { // sanity check: the stub must satisfy http.CloseNotifier
+               t.Fatal("notifyingResponseRecorder is broken")
+       }
+       // If anyone asks, the client has disconnected. The channel has capacity 1, so this send does not block.
+       resp.closer <- true
+
+       ok := make(chan struct{})
+       go func() { // serve the request in the background so the select below can time it out
+               req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil) // the NewRequest error is discarded
+               (&LoggingRESTRouter{MakeRESTRouter()}).ServeHTTP(resp, req)
+               ok <- struct{}{} // ServeHTTP has returned
+       }()
+
+       select { // wait for the handler to return, giving up after 20 seconds
+       case <-time.After(20 * time.Second):
+               t.Fatal("request took >20s, close notifier must be broken")
+       case <-ok:
+       }
+
+       ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder)
+       for i, v := range KeepVM.AllWritable() {
+               if calls := v.(*MockVolume).called["GET"]; calls != 0 { // no volume may have recorded a "GET" call
+                       t.Errorf("volume %d got %d calls, expected 0", i, calls)
+               }
+       }
+}
+
 // Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
 // leak.
 func TestGetHandlerNoBufferleak(t *testing.T) {
@@ -954,3 +1014,122 @@ func TestGetHandlerNoBufferleak(t *testing.T) {
        case <-ok:
        }
 }
+
+func TestPutReplicationHeader(t *testing.T) { // a PUT response must carry the header X-Keep-Replicas-Stored: 1
+       defer teardown()
+
+       KeepVM = MakeTestVolumeManager(2)
+       defer KeepVM.Close()
+
+       resp := IssueRequest(&RequestTester{ // PUT the test block under its hash; no apiToken is set
+               method:      "PUT",
+               uri:         "/" + TestHash,
+               requestBody: TestBlock,
+       })
+       if r := resp.Header().Get("X-Keep-Replicas-Stored"); r != "1" { // the header value is compared as a string
+               t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
+       }
+}
+
+func TestUntrashHandler(t *testing.T) { // drives /untrash/<locator> with various tokens, locators and methods and checks each status
+       defer teardown()
+
+       // Set up Keep volumes and store the test block on the first one
+       KeepVM = MakeTestVolumeManager(2)
+       defer KeepVM.Close()
+       vols := KeepVM.AllWritable()
+       vols[0].Put(TestHash, TestBlock) // NOTE(review): the result of Put is not checked here
+
+       dataManagerToken = "DATA MANAGER TOKEN" // package-level variable; reused as apiToken in the requests below
+
+       // unauthenticatedReq (no apiToken) => UnauthorizedError
+       unauthenticatedReq := &RequestTester{
+               method: "PUT",
+               uri:    "/untrash/" + TestHash,
+       }
+       response := IssueRequest(unauthenticatedReq)
+       ExpectStatusCode(t,
+               "Unauthenticated request",
+               UnauthorizedError.HTTPCode,
+               response)
+
+       // notDataManagerReq (knownToken instead of dataManagerToken) => UnauthorizedError
+       notDataManagerReq := &RequestTester{
+               method:   "PUT",
+               uri:      "/untrash/" + TestHash,
+               apiToken: knownToken,
+       }
+
+       response = IssueRequest(notDataManagerReq)
+       ExpectStatusCode(t,
+               "Non-datamanager token",
+               UnauthorizedError.HTTPCode,
+               response)
+
+       // datamanagerWithBadHashReq (locator is not a hash) => StatusBadRequest
+       datamanagerWithBadHashReq := &RequestTester{
+               method:   "PUT",
+               uri:      "/untrash/thisisnotalocator",
+               apiToken: dataManagerToken,
+       }
+       response = IssueRequest(datamanagerWithBadHashReq)
+       ExpectStatusCode(t,
+               "Bad locator in untrash request",
+               http.StatusBadRequest,
+               response)
+
+       // datamanagerWrongMethodReq (GET instead of PUT) => StatusBadRequest
+       datamanagerWrongMethodReq := &RequestTester{
+               method:   "GET",
+               uri:      "/untrash/" + TestHash,
+               apiToken: dataManagerToken,
+       }
+       response = IssueRequest(datamanagerWrongMethodReq)
+       ExpectStatusCode(t,
+               "Only PUT method is supported for untrash",
+               http.StatusBadRequest,
+               response)
+
+       // datamanagerReq (PUT with dataManagerToken) => StatusOK, and the body must equal the expected string below
+       datamanagerReq := &RequestTester{
+               method:   "PUT",
+               uri:      "/untrash/" + TestHash,
+               apiToken: dataManagerToken,
+       }
+       response = IssueRequest(datamanagerReq)
+       ExpectStatusCode(t,
+               "",
+               http.StatusOK,
+               response)
+       expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
+       if response.Body.String() != expected { // exact string comparison against the whole response body
+               t.Errorf(
+                       "Untrash response mismatched: expected %s, got:\n%s",
+                       expected, response.Body.String())
+       }
+}
+
+func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) { // an untrash request against only read-only volumes must return 404
+       defer teardown()
+
+       // Set up readonly Keep volumes
+       vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
+       vols[0].Readonly = true
+       vols[1].Readonly = true
+       KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
+       defer KeepVM.Close()
+
+       dataManagerToken = "DATA MANAGER TOKEN" // package-level variable; reused as apiToken in the request below
+
+       // datamanagerReq => StatusNotFound (both volumes were marked Readonly above)
+       datamanagerReq := &RequestTester{
+               method:   "PUT",
+               uri:      "/untrash/" + TestHash,
+               apiToken: dataManagerToken,
+       }
+       response := IssueRequest(datamanagerReq)
+       ExpectStatusCode(t,
+               "No writable volumes",
+               http.StatusNotFound,
+               response)
+}