import (
"bytes"
+ "context"
"errors"
"io"
"io/ioutil"
"net/http"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/keepclient"
+ "github.com/prometheus/client_golang/prometheus"
. "gopkg.in/check.v1"
+ check "gopkg.in/check.v1"
)
var _ = Suite(&PullWorkerTestSuite{})
type PullWorkerTestSuite struct {
+ cluster *arvados.Cluster
+ handler *handler
+
testPullLists map[string]string
readContent string
readError error
}
func (s *PullWorkerTestSuite) SetUpTest(c *C) {
- theConfig.systemAuthToken = "arbitrary data manager token"
+ s.cluster = testCluster(c)
+ s.cluster.Volumes = map[string]arvados.Volume{
+ "zzzzz-nyw5e-000000000000000": {Driver: "mock"},
+ "zzzzz-nyw5e-111111111111111": {Driver: "mock"},
+ }
+ s.cluster.Collections.BlobReplicateConcurrency = 1
+
+ s.handler = &handler{}
+ c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
+
s.readContent = ""
s.readError = nil
s.putContent = []byte{}
// This behavior is verified using these two maps in the
// "TestPullWorkerPullList_with_two_items_latest_replacing_old"
s.testPullLists = make(map[string]string)
-
- KeepVM = MakeTestVolumeManager(2)
-
- // Normally the pull queue and workers are started by main()
- // -- tests need to set up their own.
- arv, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, IsNil)
- keepClient, err := keepclient.MakeKeepClient(arv)
- c.Assert(err, IsNil)
- pullq = NewWorkQueue()
- go RunPullWorker(pullq, keepClient)
-}
-
-func (s *PullWorkerTestSuite) TearDownTest(c *C) {
- KeepVM.Close()
- KeepVM = nil
- pullq.Close()
- pullq = nil
- teardown()
- theConfig = DefaultConfig()
- theConfig.Start()
}
var firstPullList = []byte(`[
// Ensure MountUUID in a pull list is correctly translated to a Volume
// argument passed to writePulledBlock().
func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) {
- defer func(f func(Volume, []byte, string)) {
+ defer func(f func(*RRVolumeManager, Volume, []byte, string) error) {
writePulledBlock = f
}(writePulledBlock)
+ pullq := s.handler.Handler.(*router).pullq
for _, spec := range []struct {
sendUUID string
expectVolume: nil,
},
{
- sendUUID: KeepVM.Mounts()[0].UUID,
- expectVolume: KeepVM.Mounts()[0].volume,
+ sendUUID: s.handler.volmgr.Mounts()[0].UUID,
+ expectVolume: s.handler.volmgr.Mounts()[0].Volume,
},
} {
- writePulledBlock = func(v Volume, _ []byte, _ string) {
+ writePulledBlock = func(_ *RRVolumeManager, v Volume, _ []byte, _ string) error {
c.Check(v, Equals, spec.expectVolume)
+ return nil
}
- resp := IssueRequest(&RequestTester{
+ resp := IssueRequest(s.handler, &RequestTester{
uri: "/pull",
- apiToken: theConfig.systemAuthToken,
+ apiToken: s.cluster.SystemRootToken,
method: "PUT",
requestBody: []byte(`[{
"locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
"servers":["server_1","server_2"],
- "mountuuid":"` + spec.sendUUID + `"}]`),
+ "mount_uuid":"` + spec.sendUUID + `"}]`),
})
c.Assert(resp.Code, Equals, http.StatusOK)
expectEqualWithin(c, time.Second, 0, func() interface{} {
func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_two_locators",
- req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList},
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "hello",
func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_one_locator",
- req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "hola",
func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_get_one_locator",
- req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "unused",
func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_get_two_locators",
- req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList},
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "unused",
func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_put_one_locator",
- req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "hello hello",
func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_put_two_locators",
- req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList},
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "hello again",
}
func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) {
+ pullq := s.handler.Handler.(*router).pullq
+
s.testPullLists[testData.name] = testData.responseBody
processedPullLists := make(map[string]string)
GetContent = orig
}(GetContent)
GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
- c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
+ c.Assert(getStatusItem(s.handler, "PullQueue", "InProgress"), Equals, float64(1))
processedPullLists[testData.name] = testData.responseBody
if testData.readError {
err = errors.New("Error getting data")
}
// Override writePulledBlock to mock PutBlock functionality
- defer func(orig func(Volume, []byte, string)) { writePulledBlock = orig }(writePulledBlock)
- writePulledBlock = func(v Volume, content []byte, locator string) {
+ defer func(orig func(*RRVolumeManager, Volume, []byte, string) error) { writePulledBlock = orig }(writePulledBlock)
+ writePulledBlock = func(_ *RRVolumeManager, v Volume, content []byte, locator string) error {
if testData.putError {
s.putError = errors.New("Error putting data")
- return
+ return s.putError
}
s.putContent = content
+ return nil
}
- c.Check(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
- c.Check(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
+ c.Check(getStatusItem(s.handler, "PullQueue", "InProgress"), Equals, float64(0))
+ c.Check(getStatusItem(s.handler, "PullQueue", "Queued"), Equals, float64(0))
+ c.Check(getStatusItem(s.handler, "Version"), Not(Equals), "")
- response := IssueRequest(&testData.req)
+ response := IssueRequest(s.handler, &testData.req)
c.Assert(response.Code, Equals, testData.responseCode)
c.Assert(response.Body.String(), Equals, testData.responseBody)