Refactor the multi-host salt install page.
[arvados.git] / services / keepstore / pull_worker_test.go
index 9e547f30d0c1c29cd8f510c5b72c2ae6ceba4acf..2626e66d8898745b9f29c42d9beda9ee580626a4 100644 (file)
@@ -2,24 +2,30 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepstore
 
 import (
        "bytes"
+       "context"
        "errors"
        "io"
        "io/ioutil"
        "net/http"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/keepclient"
+       "github.com/prometheus/client_golang/prometheus"
        . "gopkg.in/check.v1"
+       check "gopkg.in/check.v1"
 )
 
 var _ = Suite(&PullWorkerTestSuite{})
 
 type PullWorkerTestSuite struct {
+       cluster *arvados.Cluster
+       handler *handler
+
        testPullLists map[string]string
        readContent   string
        readError     error
@@ -28,7 +34,16 @@ type PullWorkerTestSuite struct {
 }
 
 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
-       theConfig.systemAuthToken = "arbitrary data manager token"
+       s.cluster = testCluster(c)
+       s.cluster.Volumes = map[string]arvados.Volume{
+               "zzzzz-nyw5e-000000000000000": {Driver: "mock"},
+               "zzzzz-nyw5e-111111111111111": {Driver: "mock"},
+       }
+       s.cluster.Collections.BlobReplicateConcurrency = 1
+
+       s.handler = &handler{}
+       c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
+
        s.readContent = ""
        s.readError = nil
        s.putContent = []byte{}
@@ -38,27 +53,6 @@ func (s *PullWorkerTestSuite) SetUpTest(c *C) {
        // This behavior is verified using these two maps in the
        // "TestPullWorkerPullList_with_two_items_latest_replacing_old"
        s.testPullLists = make(map[string]string)
-
-       KeepVM = MakeTestVolumeManager(2)
-
-       // Normally the pull queue and workers are started by main()
-       // -- tests need to set up their own.
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, IsNil)
-       keepClient, err := keepclient.MakeKeepClient(arv)
-       c.Assert(err, IsNil)
-       pullq = NewWorkQueue()
-       go RunPullWorker(pullq, keepClient)
-}
-
-func (s *PullWorkerTestSuite) TearDownTest(c *C) {
-       KeepVM.Close()
-       KeepVM = nil
-       pullq.Close()
-       pullq = nil
-       teardown()
-       theConfig = DefaultConfig()
-       theConfig.Start()
 }
 
 var firstPullList = []byte(`[
@@ -99,9 +93,10 @@ type PullWorkerTestData struct {
 // Ensure MountUUID in a pull list is correctly translated to a Volume
 // argument passed to writePulledBlock().
 func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) {
-       defer func(f func(Volume, []byte, string)) {
+       defer func(f func(*RRVolumeManager, Volume, []byte, string) error) {
                writePulledBlock = f
        }(writePulledBlock)
+       pullq := s.handler.Handler.(*router).pullq
 
        for _, spec := range []struct {
                sendUUID     string
@@ -112,22 +107,23 @@ func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) {
                        expectVolume: nil,
                },
                {
-                       sendUUID:     KeepVM.Mounts()[0].UUID,
-                       expectVolume: KeepVM.Mounts()[0].volume,
+                       sendUUID:     s.handler.volmgr.Mounts()[0].UUID,
+                       expectVolume: s.handler.volmgr.Mounts()[0].Volume,
                },
        } {
-               writePulledBlock = func(v Volume, _ []byte, _ string) {
+               writePulledBlock = func(_ *RRVolumeManager, v Volume, _ []byte, _ string) error {
                        c.Check(v, Equals, spec.expectVolume)
+                       return nil
                }
 
-               resp := IssueRequest(&RequestTester{
+               resp := IssueRequest(s.handler, &RequestTester{
                        uri:      "/pull",
-                       apiToken: theConfig.systemAuthToken,
+                       apiToken: s.cluster.SystemRootToken,
                        method:   "PUT",
                        requestBody: []byte(`[{
                                "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
                                "servers":["server_1","server_2"],
-                               "mountuuid":"` + spec.sendUUID + `"}]`),
+                               "mount_uuid":"` + spec.sendUUID + `"}]`),
                })
                c.Assert(resp.Code, Equals, http.StatusOK)
                expectEqualWithin(c, time.Second, 0, func() interface{} {
@@ -140,7 +136,7 @@ func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
        testData := PullWorkerTestData{
                name:         "TestPullWorkerPullList_with_two_locators",
-               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
+               req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""},
                responseCode: http.StatusOK,
                responseBody: "Received 2 pull requests\n",
                readContent:  "hello",
@@ -154,7 +150,7 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
        testData := PullWorkerTestData{
                name:         "TestPullWorkerPullList_with_one_locator",
-               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
+               req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""},
                responseCode: http.StatusOK,
                responseBody: "Received 1 pull requests\n",
                readContent:  "hola",
@@ -168,7 +164,7 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
        testData := PullWorkerTestData{
                name:         "TestPullWorker_error_on_get_one_locator",
-               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
+               req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""},
                responseCode: http.StatusOK,
                responseBody: "Received 1 pull requests\n",
                readContent:  "unused",
@@ -182,7 +178,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
        testData := PullWorkerTestData{
                name:         "TestPullWorker_error_on_get_two_locators",
-               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
+               req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""},
                responseCode: http.StatusOK,
                responseBody: "Received 2 pull requests\n",
                readContent:  "unused",
@@ -196,7 +192,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
        testData := PullWorkerTestData{
                name:         "TestPullWorker_error_on_put_one_locator",
-               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
+               req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""},
                responseCode: http.StatusOK,
                responseBody: "Received 1 pull requests\n",
                readContent:  "hello hello",
@@ -210,7 +206,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
        testData := PullWorkerTestData{
                name:         "TestPullWorker_error_on_put_two_locators",
-               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
+               req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""},
                responseCode: http.StatusOK,
                responseBody: "Received 2 pull requests\n",
                readContent:  "hello again",
@@ -225,7 +221,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
        testData := PullWorkerTestData{
                name:         "TestPullWorkerPullList_with_two_locators",
-               req:          RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
+               req:          RequestTester{"/pull", "invalidToken", "PUT", firstPullList, ""},
                responseCode: http.StatusUnauthorized,
                responseBody: "Unauthorized\n",
                readContent:  "hello",
@@ -237,6 +233,8 @@ func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
 }
 
 func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) {
+       pullq := s.handler.Handler.(*router).pullq
+
        s.testPullLists[testData.name] = testData.responseBody
 
        processedPullLists := make(map[string]string)
@@ -246,7 +244,7 @@ func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) {
                GetContent = orig
        }(GetContent)
        GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
-               c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
+               c.Assert(getStatusItem(s.handler, "PullQueue", "InProgress"), Equals, float64(1))
                processedPullLists[testData.name] = testData.responseBody
                if testData.readError {
                        err = errors.New("Error getting data")
@@ -260,19 +258,21 @@ func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) {
        }
 
        // Override writePulledBlock to mock PutBlock functionality
-       defer func(orig func(Volume, []byte, string)) { writePulledBlock = orig }(writePulledBlock)
-       writePulledBlock = func(v Volume, content []byte, locator string) {
+       defer func(orig func(*RRVolumeManager, Volume, []byte, string) error) { writePulledBlock = orig }(writePulledBlock)
+       writePulledBlock = func(_ *RRVolumeManager, v Volume, content []byte, locator string) error {
                if testData.putError {
                        s.putError = errors.New("Error putting data")
-                       return
+                       return s.putError
                }
                s.putContent = content
+               return nil
        }
 
-       c.Check(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
-       c.Check(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
+       c.Check(getStatusItem(s.handler, "PullQueue", "InProgress"), Equals, float64(0))
+       c.Check(getStatusItem(s.handler, "PullQueue", "Queued"), Equals, float64(0))
+       c.Check(getStatusItem(s.handler, "Version"), Not(Equals), "")
 
-       response := IssueRequest(&testData.req)
+       response := IssueRequest(s.handler, &testData.req)
        c.Assert(response.Code, Equals, testData.responseCode)
        c.Assert(response.Body.String(), Equals, testData.responseBody)