Merge branch '8784-dir-listings'
[arvados.git] / services / keepstore / handler_test.go
index ba923cad768abc6fe7d906bc82bc67b0bef6276d..751a4a77e3c8ba66431ca30d1d58f3cde2208f5d 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 // Tests for Keep HTTP handlers:
 //
 //     GetBlockHandler
@@ -11,6 +15,7 @@ package main
 
 import (
        "bytes"
+       "context"
        "encoding/json"
        "fmt"
        "net/http"
@@ -20,6 +25,8 @@ import (
        "strings"
        "testing"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
 // A RequestTester represents the parameters for an HTTP request to
@@ -46,19 +53,19 @@ func TestGetHandler(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllWritable()
-       if err := vols[0].Put(TestHash, TestBlock); err != nil {
+       if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
                t.Error(err)
        }
 
        // Create locators for testing.
        // Turn on permission settings so we can generate signed locators.
-       enforcePermissions = true
-       PermissionSecret = []byte(knownKey)
-       blobSignatureTTL = 300 * time.Second
+       theConfig.RequireSignatures = true
+       theConfig.blobSigningKey = []byte(knownKey)
+       theConfig.BlobSignatureTTL.Set("5m")
 
        var (
                unsignedLocator  = "/" + TestHash
-               validTimestamp   = time.Now().Add(blobSignatureTTL)
+               validTimestamp   = time.Now().Add(theConfig.BlobSignatureTTL.Duration())
                expiredTimestamp = time.Now().Add(-time.Hour)
                signedLocator    = "/" + SignLocator(TestHash, knownToken, validTimestamp)
                expiredLocator   = "/" + SignLocator(TestHash, knownToken, expiredTimestamp)
@@ -66,7 +73,7 @@ func TestGetHandler(t *testing.T) {
 
        // -----------------
        // Test unauthenticated request with permissions off.
-       enforcePermissions = false
+       theConfig.RequireSignatures = false
 
        // Unauthenticated request, unsigned locator
        // => OK
@@ -90,7 +97,7 @@ func TestGetHandler(t *testing.T) {
 
        // ----------------
        // Permissions: on.
-       enforcePermissions = true
+       theConfig.RequireSignatures = true
 
        // Authenticated request, signed locator
        // => OK
@@ -175,8 +182,8 @@ func TestPutHandler(t *testing.T) {
        // ------------------
        // With a server key.
 
-       PermissionSecret = []byte(knownKey)
-       blobSignatureTTL = 300 * time.Second
+       theConfig.blobSigningKey = []byte(knownKey)
+       theConfig.BlobSignatureTTL.Set("5m")
 
        // When a permission key is available, the locator returned
        // from an authenticated PUT request will be signed.
@@ -220,7 +227,7 @@ func TestPutHandler(t *testing.T) {
 
 func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
        defer teardown()
-       dataManagerToken = "fake-data-manager-token"
+       theConfig.systemAuthToken = "fake-data-manager-token"
        vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
        vols[0].Readonly = true
        KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
@@ -232,15 +239,15 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
                        requestBody: TestBlock,
                })
        defer func(orig bool) {
-               neverDelete = orig
-       }(neverDelete)
-       neverDelete = false
+               theConfig.EnableDelete = orig
+       }(theConfig.EnableDelete)
+       theConfig.EnableDelete = true
        IssueRequest(
                &RequestTester{
                        method:      "DELETE",
                        uri:         "/" + TestHash,
                        requestBody: TestBlock,
-                       apiToken:    dataManagerToken,
+                       apiToken:    theConfig.systemAuthToken,
                })
        type expect struct {
                volnum    int
@@ -274,7 +281,7 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
 //   - authenticated   /index/prefix request | superuser
 //
 // The only /index requests that should succeed are those issued by the
-// superuser. They should pass regardless of the value of enforcePermissions.
+// superuser. They should pass regardless of the value of RequireSignatures.
 //
 func TestIndexHandler(t *testing.T) {
        defer teardown()
@@ -286,12 +293,12 @@ func TestIndexHandler(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllWritable()
-       vols[0].Put(TestHash, TestBlock)
-       vols[1].Put(TestHash2, TestBlock2)
-       vols[0].Put(TestHash+".meta", []byte("metadata"))
-       vols[1].Put(TestHash2+".meta", []byte("metadata"))
+       vols[0].Put(context.Background(), TestHash, TestBlock)
+       vols[1].Put(context.Background(), TestHash2, TestBlock2)
+       vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata"))
+       vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata"))
 
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
        unauthenticatedReq := &RequestTester{
                method: "GET",
@@ -305,7 +312,7 @@ func TestIndexHandler(t *testing.T) {
        superuserReq := &RequestTester{
                method:   "GET",
                uri:      "/index",
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
        unauthPrefixReq := &RequestTester{
                method: "GET",
@@ -319,32 +326,32 @@ func TestIndexHandler(t *testing.T) {
        superuserPrefixReq := &RequestTester{
                method:   "GET",
                uri:      "/index/" + TestHash[0:3],
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
        superuserNoSuchPrefixReq := &RequestTester{
                method:   "GET",
                uri:      "/index/abcd",
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
        superuserInvalidPrefixReq := &RequestTester{
                method:   "GET",
                uri:      "/index/xyz",
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
 
        // -------------------------------------------------------------
        // Only the superuser should be allowed to issue /index requests.
 
        // ---------------------------
-       // enforcePermissions enabled
+       // RequireSignatures enabled
        // This setting should not affect tests passing.
-       enforcePermissions = true
+       theConfig.RequireSignatures = true
 
        // unauthenticated /index request
        // => UnauthorizedError
        response := IssueRequest(unauthenticatedReq)
        ExpectStatusCode(t,
-               "enforcePermissions on, unauthenticated request",
+               "RequireSignatures on, unauthenticated request",
                UnauthorizedError.HTTPCode,
                response)
 
@@ -381,9 +388,9 @@ func TestIndexHandler(t *testing.T) {
                response)
 
        // ----------------------------
-       // enforcePermissions disabled
+       // RequireSignatures disabled
        // Valid Request should still pass.
-       enforcePermissions = false
+       theConfig.RequireSignatures = false
 
        // superuser /index request
        // => OK
@@ -475,17 +482,17 @@ func TestDeleteHandler(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllWritable()
-       vols[0].Put(TestHash, TestBlock)
+       vols[0].Put(context.Background(), TestHash, TestBlock)
 
-       // Explicitly set the blobSignatureTTL to 0 for these
+       // Explicitly set the BlobSignatureTTL to 0 for these
        // tests, to ensure the MockVolume deletes the blocks
        // even though they have just been created.
-       blobSignatureTTL = time.Duration(0)
+       theConfig.BlobSignatureTTL = arvados.Duration(0)
 
        var userToken = "NOT DATA MANAGER TOKEN"
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
-       neverDelete = false
+       theConfig.EnableDelete = true
 
        unauthReq := &RequestTester{
                method: "DELETE",
@@ -501,13 +508,13 @@ func TestDeleteHandler(t *testing.T) {
        superuserExistingBlockReq := &RequestTester{
                method:   "DELETE",
                uri:      "/" + TestHash,
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
 
        superuserNonexistentBlockReq := &RequestTester{
                method:   "DELETE",
                uri:      "/" + TestHash2,
-               apiToken: dataManagerToken,
+               apiToken: theConfig.systemAuthToken,
        }
 
        // Unauthenticated request returns PermissionError.
@@ -538,14 +545,14 @@ func TestDeleteHandler(t *testing.T) {
                http.StatusNotFound,
                response)
 
-       // Authenticated admin request for existing block while neverDelete is set.
-       neverDelete = true
+       // Authenticated admin request for existing block while EnableDelete is false.
+       theConfig.EnableDelete = false
        response = IssueRequest(superuserExistingBlockReq)
        ExpectStatusCode(t,
                "authenticated request, existing block, method disabled",
                MethodDisabledError.HTTPCode,
                response)
-       neverDelete = false
+       theConfig.EnableDelete = true
 
        // Authenticated admin request for existing block.
        response = IssueRequest(superuserExistingBlockReq)
@@ -561,16 +568,17 @@ func TestDeleteHandler(t *testing.T) {
                        expectedDc, responseDc)
        }
        // Confirm the block has been deleted
-       _, err := vols[0].Get(TestHash)
+       buf := make([]byte, BlockSize)
+       _, err := vols[0].Get(context.Background(), TestHash, buf)
        var blockDeleted = os.IsNotExist(err)
        if !blockDeleted {
                t.Error("superuserExistingBlockReq: block not deleted")
        }
 
-       // A DELETE request on a block newer than blobSignatureTTL
+       // A DELETE request on a block newer than BlobSignatureTTL
        // should return success but leave the block on the volume.
-       vols[0].Put(TestHash, TestBlock)
-       blobSignatureTTL = time.Hour
+       vols[0].Put(context.Background(), TestHash, TestBlock)
+       theConfig.BlobSignatureTTL = arvados.Duration(time.Hour)
 
        response = IssueRequest(superuserExistingBlockReq)
        ExpectStatusCode(t,
@@ -585,7 +593,7 @@ func TestDeleteHandler(t *testing.T) {
                        expectedDc, responseDc)
        }
        // Confirm the block has NOT been deleted.
-       _, err = vols[0].Get(TestHash)
+       _, err = vols[0].Get(context.Background(), TestHash, buf)
        if err != nil {
                t.Errorf("testing delete on new block: %s\n", err)
        }
@@ -622,7 +630,7 @@ func TestPullHandler(t *testing.T) {
        defer teardown()
 
        var userToken = "USER TOKEN"
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
        pullq = NewWorkQueue()
 
@@ -667,13 +675,13 @@ func TestPullHandler(t *testing.T) {
                },
                {
                        "Valid pull request from the data manager",
-                       RequestTester{"/pull", dataManagerToken, "PUT", goodJSON},
+                       RequestTester{"/pull", theConfig.systemAuthToken, "PUT", goodJSON},
                        http.StatusOK,
                        "Received 3 pull requests\n",
                },
                {
                        "Invalid pull request from the data manager",
-                       RequestTester{"/pull", dataManagerToken, "PUT", badJSON},
+                       RequestTester{"/pull", theConfig.systemAuthToken, "PUT", badJSON},
                        http.StatusBadRequest,
                        "",
                },
@@ -728,7 +736,7 @@ func TestTrashHandler(t *testing.T) {
        defer teardown()
 
        var userToken = "USER TOKEN"
-       dataManagerToken = "DATA MANAGER TOKEN"
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
        trashq = NewWorkQueue()
 
@@ -771,13 +779,13 @@ func TestTrashHandler(t *testing.T) {
                },
                {
                        "Valid trash list from the data manager",
-                       RequestTester{"/trash", dataManagerToken, "PUT", goodJSON},
+                       RequestTester{"/trash", theConfig.systemAuthToken, "PUT", goodJSON},
                        http.StatusOK,
                        "Received 3 trash requests\n",
                },
                {
                        "Invalid trash list from the data manager",
-                       RequestTester{"/trash", dataManagerToken, "PUT", badJSON},
+                       RequestTester{"/trash", theConfig.systemAuthToken, "PUT", badJSON},
                        http.StatusBadRequest,
                        "",
                },
@@ -814,7 +822,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
        if rt.apiToken != "" {
                req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
        }
-       loggingRouter := MakeLoggingRESTRouter()
+       loggingRouter := MakeRESTRouter()
        loggingRouter.ServeHTTP(response, req)
        return response
 }
@@ -872,7 +880,7 @@ func TestPutNeedsOnlyOneBuffer(t *testing.T) {
        select {
        case <-ok:
        case <-time.After(time.Second):
-               t.Fatal("PUT deadlocks with maxBuffers==1")
+               t.Fatal("PUT deadlocks with MaxBuffers==1")
        }
 }
 
@@ -887,7 +895,7 @@ func TestPutHandlerNoBufferleak(t *testing.T) {
 
        ok := make(chan bool)
        go func() {
-               for i := 0; i < maxBuffers+1; i++ {
+               for i := 0; i < theConfig.MaxBuffers+1; i++ {
                        // Unauthenticated request, no server key
                        // => OK (unsigned response)
                        unsignedLocator := "/" + TestHash
@@ -913,9 +921,68 @@ func TestPutHandlerNoBufferleak(t *testing.T) {
        }
 }
 
+type notifyingResponseRecorder struct { // ResponseRecorder wrapper that additionally offers CloseNotify
+       *httptest.ResponseRecorder
+       closer chan bool // channel returned by CloseNotify
+}
+
+func (r *notifyingResponseRecorder) CloseNotify() <-chan bool { // supplies the method required by http.CloseNotifier
+       return r.closer
+}
+
+func TestGetHandlerClientDisconnect(t *testing.T) { // a GET from an already-disconnected client must yield 503 without any volume GET call
+       defer func(was bool) {
+               theConfig.RequireSignatures = was
+       }(theConfig.RequireSignatures)
+       theConfig.RequireSignatures = false // unsigned locator is used below; original value restored on return
+
+       defer func(orig *bufferPool) {
+               bufs = orig
+       }(bufs)
+       bufs = newBufferPool(1, BlockSize) // NOTE(review): presumably a pool holding a single buffer - confirm against newBufferPool
+       defer bufs.Put(bufs.Get(BlockSize)) // Get runs now (defer evaluates its arguments immediately); the buffer is handed back only when the test returns
+
+       KeepVM = MakeTestVolumeManager(2)
+       defer KeepVM.Close()
+
+       if err := KeepVM.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil {
+               t.Error(err)
+       }
+
+       resp := &notifyingResponseRecorder{
+               ResponseRecorder: httptest.NewRecorder(),
+               closer:           make(chan bool, 1), // capacity 1 so the send below does not block
+       }
+       if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok {
+               t.Fatal("notifyingResponseRecorder is broken")
+       }
+       // If anyone asks, the client has disconnected: the value is queued before the request starts, so a receive from CloseNotify() succeeds at once.
+       resp.closer <- true
+
+       ok := make(chan struct{}) // signalled once ServeHTTP has returned
+       go func() {
+               req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
+               (&LoggingRESTRouter{router: MakeRESTRouter()}).ServeHTTP(resp, req)
+               ok <- struct{}{}
+       }()
+
+       select {
+       case <-time.After(20 * time.Second): // give up instead of hanging if the handler never returns
+               t.Fatal("request took >20s, close notifier must be broken")
+       case <-ok:
+       }
+
+       ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder)
+       for i, v := range KeepVM.AllWritable() {
+               if calls := v.(*MockVolume).called["GET"]; calls != 0 { // NOTE(review): presumably the mock's per-method call counter - confirm in MockVolume
+                       t.Errorf("volume %d got %d calls, expected 0", i, calls)
+               }
+       }
+}
+
 // Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
 // leak.
-func TestGetHandlerNoBufferleak(t *testing.T) {
+func TestGetHandlerNoBufferLeak(t *testing.T) {
        defer teardown()
 
        // Prepare two test Keep volumes. Our block is stored on the second volume.
@@ -923,13 +990,13 @@ func TestGetHandlerNoBufferleak(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllWritable()
-       if err := vols[0].Put(TestHash, TestBlock); err != nil {
+       if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
                t.Error(err)
        }
 
        ok := make(chan bool)
        go func() {
-               for i := 0; i < maxBuffers+1; i++ {
+               for i := 0; i < theConfig.MaxBuffers+1; i++ {
                        // Unauthenticated request, unsigned locator
                        // => OK
                        unsignedLocator := "/" + TestHash
@@ -954,3 +1021,122 @@ func TestGetHandlerNoBufferleak(t *testing.T) {
        case <-ok:
        }
 }
+
+func TestPutReplicationHeader(t *testing.T) { // a PUT response must carry X-Keep-Replicas-Stored: 1
+       defer teardown()
+
+       KeepVM = MakeTestVolumeManager(2)
+       defer KeepVM.Close()
+
+       resp := IssueRequest(&RequestTester{ // no apiToken is set, so the request is sent without an Authorization header
+               method:      "PUT",
+               uri:         "/" + TestHash,
+               requestBody: TestBlock,
+       })
+       if r := resp.Header().Get("X-Keep-Replicas-Stored"); r != "1" {
+               t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
+       }
+}
+
+func TestUntrashHandler(t *testing.T) {
+       defer teardown()
+
+       // Set up Keep volumes
+       KeepVM = MakeTestVolumeManager(2)
+       defer KeepVM.Close()
+       vols := KeepVM.AllWritable()
+       vols[0].Put(context.Background(), TestHash, TestBlock)
+
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
+
+       // unauthenticatedReq => UnauthorizedError
+       unauthenticatedReq := &RequestTester{
+               method: "PUT",
+               uri:    "/untrash/" + TestHash,
+       }
+       response := IssueRequest(unauthenticatedReq)
+       ExpectStatusCode(t,
+               "Unauthenticated request",
+               UnauthorizedError.HTTPCode,
+               response)
+
+       // notDataManagerReq => UnauthorizedError
+       notDataManagerReq := &RequestTester{
+               method:   "PUT",
+               uri:      "/untrash/" + TestHash,
+               apiToken: knownToken,
+       }
+
+       response = IssueRequest(notDataManagerReq)
+       ExpectStatusCode(t,
+               "Non-datamanager token",
+               UnauthorizedError.HTTPCode,
+               response)
+
+       // datamanagerWithBadHashReq => StatusBadRequest
+       datamanagerWithBadHashReq := &RequestTester{
+               method:   "PUT",
+               uri:      "/untrash/thisisnotalocator",
+               apiToken: theConfig.systemAuthToken,
+       }
+       response = IssueRequest(datamanagerWithBadHashReq)
+       ExpectStatusCode(t,
+               "Bad locator in untrash request",
+               http.StatusBadRequest,
+               response)
+
+       // datamanagerWrongMethodReq => StatusBadRequest
+       datamanagerWrongMethodReq := &RequestTester{
+               method:   "GET",
+               uri:      "/untrash/" + TestHash,
+               apiToken: theConfig.systemAuthToken,
+       }
+       response = IssueRequest(datamanagerWrongMethodReq)
+       ExpectStatusCode(t,
+               "Only PUT method is supported for untrash",
+               http.StatusBadRequest,
+               response)
+
+       // datamanagerReq => StatusOK
+       datamanagerReq := &RequestTester{
+               method:   "PUT",
+               uri:      "/untrash/" + TestHash,
+               apiToken: theConfig.systemAuthToken,
+       }
+       response = IssueRequest(datamanagerReq)
+       ExpectStatusCode(t,
+               "",
+               http.StatusOK,
+               response)
+       expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
+       if response.Body.String() != expected {
+               t.Errorf(
+                       "Untrash response mismatched: expected %s, got:\n%s",
+                       expected, response.Body.String())
+       }
+}
+
+func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) { // untrash must answer 404 when both volumes are read-only
+       defer teardown()
+
+       // Set up readonly Keep volumes
+       vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
+       vols[0].Readonly = true
+       vols[1].Readonly = true
+       KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
+       defer KeepVM.Close()
+
+       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
+
+       // datamanagerReq => StatusNotFound
+       datamanagerReq := &RequestTester{
+               method:   "PUT",
+               uri:      "/untrash/" + TestHash,
+               apiToken: theConfig.systemAuthToken,
+       }
+       response := IssueRequest(datamanagerReq)
+       ExpectStatusCode(t,
+               "No writable volumes",
+               http.StatusNotFound,
+               response)
+}