Log if client hangs up while waiting for unix volume Serialize lock.
[arvados.git] / services / keepstore / handler_test.go
index dc9bcb117f0508e748a97ff3cb2a736aa5c00178..ad907ef10138f213e3831223d867fd3c114736d9 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 // Tests for Keep HTTP handlers:
 //
 //     GetBlockHandler
@@ -11,6 +15,7 @@ package main
 
 import (
        "bytes"
+       "context"
        "encoding/json"
        "fmt"
        "net/http"
@@ -22,8 +27,14 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
+var testCluster = &arvados.Cluster{
+       ClusterID: "zzzzz",
+}
+
 // A RequestTester represents the parameters for an HTTP request to
 // be issued on behalf of a unit test.
 type RequestTester struct {
@@ -39,6 +50,7 @@ type RequestTester struct {
 //   - permissions on, authenticated request, unsigned locator
 //   - permissions on, unauthenticated request, signed locator
 //   - permissions on, authenticated request, expired locator
+//   - permissions on, authenticated request, signed locator, transient error from backend
 //
 func TestGetHandler(t *testing.T) {
        defer teardown()
@@ -48,7 +60,7 @@ func TestGetHandler(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllWritable()
-       if err := vols[0].Put(TestHash, TestBlock); err != nil {
+       if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
                t.Error(err)
        }
 
@@ -141,6 +153,23 @@ func TestGetHandler(t *testing.T) {
        ExpectStatusCode(t,
                "Authenticated request, expired locator",
                ExpiredError.HTTPCode, response)
+
+       // Authenticated request, signed locator
+       // => 503 Server busy (transient error)
+
+       // Set up the block owning volume to respond with errors
+       vols[0].(*MockVolume).Bad = true
+       vols[0].(*MockVolume).BadVolumeError = VolumeBusyError
+       response = IssueRequest(&RequestTester{
+               method:   "GET",
+               uri:      signedLocator,
+               apiToken: knownToken,
+       })
+       // A transient error from one volume while the other doesn't find the block
+       // should make the service return a 503 so that clients can retry.
+       ExpectStatusCode(t,
+               "Volume backend busy",
+               503, response)
 }
 
 // Test PutBlockHandler on the following situations:
@@ -288,10 +317,10 @@ func TestIndexHandler(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllWritable()
-       vols[0].Put(TestHash, TestBlock)
-       vols[1].Put(TestHash2, TestBlock2)
-       vols[0].Put(TestHash+".meta", []byte("metadata"))
-       vols[1].Put(TestHash2+".meta", []byte("metadata"))
+       vols[0].Put(context.Background(), TestHash, TestBlock)
+       vols[1].Put(context.Background(), TestHash2, TestBlock2)
+       vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata"))
+       vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata"))
 
        theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
@@ -477,7 +506,7 @@ func TestDeleteHandler(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllWritable()
-       vols[0].Put(TestHash, TestBlock)
+       vols[0].Put(context.Background(), TestHash, TestBlock)
 
        // Explicitly set the BlobSignatureTTL to 0 for these
        // tests, to ensure the MockVolume deletes the blocks
@@ -564,7 +593,7 @@ func TestDeleteHandler(t *testing.T) {
        }
        // Confirm the block has been deleted
        buf := make([]byte, BlockSize)
-       _, err := vols[0].Get(TestHash, buf)
+       _, err := vols[0].Get(context.Background(), TestHash, buf)
        var blockDeleted = os.IsNotExist(err)
        if !blockDeleted {
                t.Error("superuserExistingBlockReq: block not deleted")
@@ -572,7 +601,7 @@ func TestDeleteHandler(t *testing.T) {
 
        // A DELETE request on a block newer than BlobSignatureTTL
        // should return success but leave the block on the volume.
-       vols[0].Put(TestHash, TestBlock)
+       vols[0].Put(context.Background(), TestHash, TestBlock)
        theConfig.BlobSignatureTTL = arvados.Duration(time.Hour)
 
        response = IssueRequest(superuserExistingBlockReq)
@@ -588,7 +617,7 @@ func TestDeleteHandler(t *testing.T) {
                        expectedDc, responseDc)
        }
        // Confirm the block has NOT been deleted.
-       _, err = vols[0].Get(TestHash, buf)
+       _, err = vols[0].Get(context.Background(), TestHash, buf)
        if err != nil {
                t.Errorf("testing delete on new block: %s\n", err)
        }
@@ -817,7 +846,19 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
        if rt.apiToken != "" {
                req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
        }
-       loggingRouter := MakeRESTRouter()
+       loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
+       loggingRouter.ServeHTTP(response, req)
+       return response
+}
+
+func IssueHealthCheckRequest(rt *RequestTester) *httptest.ResponseRecorder {
+       response := httptest.NewRecorder()
+       body := bytes.NewReader(rt.requestBody)
+       req, _ := http.NewRequest(rt.method, rt.uri, body)
+       if rt.apiToken != "" {
+               req.Header.Set("Authorization", "Bearer "+rt.apiToken)
+       }
+       loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
        loggingRouter.ServeHTTP(response, req)
        return response
 }
@@ -940,7 +981,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
        KeepVM = MakeTestVolumeManager(2)
        defer KeepVM.Close()
 
-       if err := KeepVM.AllWritable()[0].Put(TestHash, TestBlock); err != nil {
+       if err := KeepVM.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil {
                t.Error(err)
        }
 
@@ -957,7 +998,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
        ok := make(chan struct{})
        go func() {
                req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
-               (&LoggingRESTRouter{MakeRESTRouter()}).ServeHTTP(resp, req)
+               MakeRESTRouter(testCluster, prometheus.NewRegistry()).ServeHTTP(resp, req)
                ok <- struct{}{}
        }()
 
@@ -985,7 +1026,7 @@ func TestGetHandlerNoBufferLeak(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllWritable()
-       if err := vols[0].Put(TestHash, TestBlock); err != nil {
+       if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
                t.Error(err)
        }
 
@@ -1040,7 +1081,7 @@ func TestUntrashHandler(t *testing.T) {
        KeepVM = MakeTestVolumeManager(2)
        defer KeepVM.Close()
        vols := KeepVM.AllWritable()
-       vols[0].Put(TestHash, TestBlock)
+       vols[0].Put(context.Background(), TestHash, TestBlock)
 
        theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
@@ -1089,7 +1130,7 @@ func TestUntrashHandler(t *testing.T) {
        response = IssueRequest(datamanagerWrongMethodReq)
        ExpectStatusCode(t,
                "Only PUT method is supported for untrash",
-               http.StatusBadRequest,
+               http.StatusMethodNotAllowed,
                response)
 
        // datamanagerReq => StatusOK
@@ -1135,3 +1176,21 @@ func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) {
                http.StatusNotFound,
                response)
 }
+
+func TestHealthCheckPing(t *testing.T) {
+       theConfig.ManagementToken = arvadostest.ManagementToken
+       pingReq := &RequestTester{
+               method:   "GET",
+               uri:      "/_health/ping",
+               apiToken: arvadostest.ManagementToken,
+       }
+       response := IssueHealthCheckRequest(pingReq)
+       ExpectStatusCode(t,
+               "",
+               http.StatusOK,
+               response)
+       want := `{"health":"OK"}`
+       if !strings.Contains(response.Body.String(), want) {
+               t.Errorf("expected response to include %s: got %s", want, response.Body.String())
+       }
+}