+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
"bytes"
+ "context"
"errors"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- . "gopkg.in/check.v1"
"io"
+ "io/ioutil"
"net/http"
"time"
+
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/keepclient"
+ "github.com/prometheus/client_golang/prometheus"
+ . "gopkg.in/check.v1"
+ check "gopkg.in/check.v1"
)
var _ = Suite(&PullWorkerTestSuite{})
-type PullWorkerTestSuite struct{}
+type PullWorkerTestSuite struct {
+ cluster *arvados.Cluster
+ handler *handler
-var testPullLists map[string]string
-var readContent string
-var readError error
-var putContent []byte
-var putError error
-var currentTestData PullWorkerTestData
+ testPullLists map[string]string
+ readContent string
+ readError error
+ putContent []byte
+ putError error
+}
func (s *PullWorkerTestSuite) SetUpTest(c *C) {
- readContent = ""
- readError = nil
- putContent = []byte("")
- putError = nil
+ s.cluster = testCluster(c)
+ s.cluster.Volumes = map[string]arvados.Volume{
+ "zzzzz-nyw5e-000000000000000": {Driver: "mock"},
+ "zzzzz-nyw5e-111111111111111": {Driver: "mock"},
+ }
+ s.cluster.Collections.BlobReplicateConcurrency = 1
+
+ s.handler = &handler{}
+ c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
+
+ s.readContent = ""
+ s.readError = nil
+ s.putContent = []byte{}
+ s.putError = nil
// When a new pull request arrives, the old one will be overwritten.
// This behavior is verified using these two maps in the
// "TestPullWorkerPullList_with_two_items_latest_replacing_old"
- testPullLists = make(map[string]string)
-}
-
-// Since keepstore does not come into picture in tests,
-// we need to explicitly start the goroutine in tests.
-func RunTestPullWorker(c *C) {
- arv, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, Equals, nil)
- keepClient, err := keepclient.MakeKeepClient(arv)
- c.Assert(err, Equals, nil)
-
- pullq = NewWorkQueue()
- go RunPullWorker(pullq, keepClient)
+ s.testPullLists = make(map[string]string)
}
var firstPullList = []byte(`[
putError bool
}
-func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
- defer teardown()
+// Ensure MountUUID in a pull list is correctly translated to a Volume
+// argument passed to writePulledBlock().
+func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) {
+ defer func(f func(*RRVolumeManager, Volume, []byte, string) error) {
+ writePulledBlock = f
+ }(writePulledBlock)
+ pullq := s.handler.Handler.(*router).pullq
+
+ for _, spec := range []struct {
+ sendUUID string
+ expectVolume Volume
+ }{
+ {
+ sendUUID: "",
+ expectVolume: nil,
+ },
+ {
+ sendUUID: s.handler.volmgr.Mounts()[0].UUID,
+ expectVolume: s.handler.volmgr.Mounts()[0].Volume,
+ },
+ } {
+ writePulledBlock = func(_ *RRVolumeManager, v Volume, _ []byte, _ string) error {
+ c.Check(v, Equals, spec.expectVolume)
+ return nil
+ }
+
+ resp := IssueRequest(s.handler, &RequestTester{
+ uri: "/pull",
+ apiToken: s.cluster.SystemRootToken,
+ method: "PUT",
+ requestBody: []byte(`[{
+ "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
+ "servers":["server_1","server_2"],
+ "mount_uuid":"` + spec.sendUUID + `"}]`),
+ })
+ c.Assert(resp.Code, Equals, http.StatusOK)
+ expectEqualWithin(c, time.Second, 0, func() interface{} {
+ st := pullq.Status()
+ return st.InProgress + st.Queued
+ })
+ }
+}
- dataManagerToken = "DATA MANAGER TOKEN"
+func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_two_locators",
- req: RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList},
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "hello",
putError: false,
}
- performTest(testData, c)
+ s.performTest(testData, c)
}
func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
- defer teardown()
-
- dataManagerToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_one_locator",
- req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "hola",
putError: false,
}
- performTest(testData, c)
+ s.performTest(testData, c)
}
func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
- defer teardown()
-
- dataManagerToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_get_one_locator",
- req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "unused",
putError: false,
}
- performTest(testData, c)
+ s.performTest(testData, c)
}
func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
- defer teardown()
-
- dataManagerToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_get_two_locators",
- req: RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList},
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "unused",
putError: false,
}
- performTest(testData, c)
+ s.performTest(testData, c)
}
func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
- defer teardown()
-
- dataManagerToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_put_one_locator",
- req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "hello hello",
putError: true,
}
- performTest(testData, c)
+ s.performTest(testData, c)
}
func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
- defer teardown()
-
- dataManagerToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_put_two_locators",
- req: RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList},
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "hello again",
putError: true,
}
- performTest(testData, c)
-}
-
-// When a new pull request arrives, the old one is replaced. This test
-// is used to check that behavior by first putting an item on the queue,
-// and then performing the test. Thus the "testPullLists" has two entries;
-// however, processedPullLists will see only the newest item in the list.
-func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_items_latest_replacing_old(c *C) {
- defer teardown()
-
- var firstInput = []int{1}
- pullq = NewWorkQueue()
- pullq.ReplaceQueue(makeTestWorkList(firstInput))
- testPullLists["Added_before_actual_test_item"] = string(1)
-
- dataManagerToken = "DATA MANAGER TOKEN"
- testData := PullWorkerTestData{
- name: "TestPullWorkerPullList_with_two_items_latest_replacing_old",
- req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
- responseCode: http.StatusOK,
- responseBody: "Received 1 pull requests\n",
- readContent: "hola de nuevo",
- readError: false,
- putError: false,
- }
-
- performTest(testData, c)
+ s.performTest(testData, c)
}
// In this case, the item will not be placed on pullq
-func (s *PullWorkerTestSuite) TestPullWorker_invalid_dataManagerToken(c *C) {
- defer teardown()
-
- dataManagerToken = "DATA MANAGER TOKEN"
-
+func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_two_locators",
- req: RequestTester{"/pull", "invalid_dataManagerToken", "PUT", firstPullList},
+ req: RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
responseCode: http.StatusUnauthorized,
responseBody: "Unauthorized\n",
readContent: "hello",
putError: false,
}
- performTest(testData, c)
+ s.performTest(testData, c)
}
-func performTest(testData PullWorkerTestData, c *C) {
- KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Close()
+func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) {
+ pullq := s.handler.Handler.(*router).pullq
- RunTestPullWorker(c)
- defer pullq.Close()
-
- currentTestData = testData
- testPullLists[testData.name] = testData.responseBody
+ s.testPullLists[testData.name] = testData.responseBody
processedPullLists := make(map[string]string)
GetContent = orig
}(GetContent)
GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
- c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
+ c.Assert(getStatusItem(s.handler, "PullQueue", "InProgress"), Equals, float64(1))
processedPullLists[testData.name] = testData.responseBody
if testData.readError {
err = errors.New("Error getting data")
- readError = err
- return nil, 0, "", err
+ s.readError = err
+ return
}
- readContent = testData.readContent
- cb := &ClosingBuffer{bytes.NewBufferString(testData.readContent)}
- var rc io.ReadCloser
- rc = cb
- return rc, int64(len(testData.readContent)), "", nil
+ s.readContent = testData.readContent
+ reader = ioutil.NopCloser(bytes.NewBufferString(testData.readContent))
+ contentLength = int64(len(testData.readContent))
+ return
}
- // Override PutContent to mock PutBlock functionality
- defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
- PutContent = func(content []byte, locator string) (err error) {
+ // Override writePulledBlock to mock PutBlock functionality
+ defer func(orig func(*RRVolumeManager, Volume, []byte, string) error) { writePulledBlock = orig }(writePulledBlock)
+ writePulledBlock = func(_ *RRVolumeManager, v Volume, content []byte, locator string) error {
if testData.putError {
- err = errors.New("Error putting data")
- putError = err
- return err
+ s.putError = errors.New("Error putting data")
+ return s.putError
}
- putContent = content
+ s.putContent = content
return nil
}
- c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
- c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
+ c.Check(getStatusItem(s.handler, "PullQueue", "InProgress"), Equals, float64(0))
+ c.Check(getStatusItem(s.handler, "PullQueue", "Queued"), Equals, float64(0))
+ c.Check(getStatusItem(s.handler, "Version"), Not(Equals), "")
- response := IssueRequest(&testData.req)
+ response := IssueRequest(s.handler, &testData.req)
c.Assert(response.Code, Equals, testData.responseCode)
c.Assert(response.Body.String(), Equals, testData.responseBody)
})
if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" {
- c.Assert(len(testPullLists), Equals, 2)
+ c.Assert(len(s.testPullLists), Equals, 2)
c.Assert(len(processedPullLists), Equals, 1)
- c.Assert(testPullLists["Added_before_actual_test_item"], NotNil)
- c.Assert(testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
+ c.Assert(s.testPullLists["Added_before_actual_test_item"], NotNil)
+ c.Assert(s.testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
} else {
if testData.responseCode == http.StatusOK {
- c.Assert(len(testPullLists), Equals, 1)
+ c.Assert(len(s.testPullLists), Equals, 1)
c.Assert(len(processedPullLists), Equals, 1)
- c.Assert(testPullLists[testData.name], NotNil)
+ c.Assert(s.testPullLists[testData.name], NotNil)
} else {
- c.Assert(len(testPullLists), Equals, 1)
+ c.Assert(len(s.testPullLists), Equals, 1)
c.Assert(len(processedPullLists), Equals, 0)
}
}
if testData.readError {
- c.Assert(readError, NotNil)
+ c.Assert(s.readError, NotNil)
} else if testData.responseCode == http.StatusOK {
- c.Assert(readError, IsNil)
- c.Assert(readContent, Equals, testData.readContent)
+ c.Assert(s.readError, IsNil)
+ c.Assert(s.readContent, Equals, testData.readContent)
if testData.putError {
- c.Assert(putError, NotNil)
+ c.Assert(s.putError, NotNil)
} else {
- c.Assert(putError, IsNil)
- c.Assert(string(putContent), Equals, testData.readContent)
+ c.Assert(s.putError, IsNil)
+ c.Assert(string(s.putContent), Equals, testData.readContent)
}
}
expectChannelEmpty(c, pullq.NextItem)
}
-
-type ClosingBuffer struct {
- *bytes.Buffer
-}
-
-func (cb *ClosingBuffer) Close() (err error) {
- return
-}