+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
"bytes"
"errors"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- . "gopkg.in/check.v1"
"io"
+ "io/ioutil"
"net/http"
- "testing"
"time"
-)
-
-type PullWorkerTestSuite struct{}
-// Gocheck boilerplate
-func TestPullWorker(t *testing.T) {
- TestingT(t)
-}
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ . "gopkg.in/check.v1"
+)
-// Gocheck boilerplate
var _ = Suite(&PullWorkerTestSuite{})
-var testPullLists map[string]string
-var readContent string
-var readError error
-var putContent []byte
-var putError error
-var currentTestData PullWorkerTestData
+type PullWorkerTestSuite struct { // per-test state; fields are reset in SetUpTest and filled in by performTest's stubs
+ testPullLists map[string]string // test name -> expected response body, recorded by performTest
+ readContent string // content the read stub in performTest handed out
+ readError error // error the read stub produced when testData.readError is set
+ putContent []byte // bytes the writePulledBlock stub received
+ putError error // error recorded by the writePulledBlock stub when testData.putError is set
+}
func (s *PullWorkerTestSuite) SetUpTest(c *C) {
- readContent = ""
- readError = nil
- putContent = []byte("")
- putError = nil
+ theConfig.systemAuthToken = "arbitrary data manager token" // token the tests in this file send with their /pull requests
+ s.readContent = "" // clear whatever the previous test's stubs recorded
+ s.readError = nil
+ s.putContent = []byte{}
+ s.putError = nil
// When a new pull request arrives, the old one will be overwritten.
// This behavior is verified using these two maps in the
// "TestPullWorkerPullList_with_two_items_latest_replacing_old"
- testPullLists = make(map[string]string)
-}
+ s.testPullLists = make(map[string]string) // NOTE(review): the comment above still speaks of "two maps" and of the ..._latest_replacing_old test, which this change removes -- confirm and update it
-// Since keepstore does not come into picture in tests,
-// we need to explicitly start the goroutine in tests.
-func RunTestPullWorker(c *C) {
- arv, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, Equals, nil)
- keepClient, err := keepclient.MakeKeepClient(&arv)
- c.Assert(err, Equals, nil)
+ KeepVM = MakeTestVolumeManager(2) // test volume manager; TearDownTest closes it and resets KeepVM to nil
+ // Normally the pull queue and workers are started by main()
+ // -- tests need to set up their own.
+ arv, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, IsNil) // abort the test early if no Arvados client can be built
+ keepClient, err := keepclient.MakeKeepClient(arv)
+ c.Assert(err, IsNil)
pullq = NewWorkQueue()
go RunPullWorker(pullq, keepClient)
}
+func (s *PullWorkerTestSuite) TearDownTest(c *C) { // releases the globals SetUpTest installed (KeepVM, pullq, theConfig)
+ KeepVM.Close()
+ KeepVM = nil
+ pullq.Close() // presumably lets the RunPullWorker goroutine started in SetUpTest finish -- confirm against WorkQueue.Close
+ pullq = nil
+ teardown()
+ theConfig = DefaultConfig() // drop the config SetUpTest modified (systemAuthToken) in favour of a fresh default
+ theConfig.Start()
+}
+
var firstPullList = []byte(`[
{
"locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
putError bool
}
-func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
- defer teardown()
+// Ensure MountUUID in a pull list is correctly translated to a Volume
+// argument passed to writePulledBlock(): an empty mount_uuid must yield a nil Volume, a known UUID that mount's volume.
+func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) {
+ defer func(f func(Volume, []byte, string)) { // put the real writePulledBlock back when this test returns
+ writePulledBlock = f
+ }(writePulledBlock)
+
+ for _, spec := range []struct {
+ sendUUID string // value placed in the request's "mount_uuid" field
+ expectVolume Volume // volume the stub should be called with
+ }{
+ {
+ sendUUID: "",
+ expectVolume: nil,
+ },
+ {
+ sendUUID: KeepVM.Mounts()[0].UUID, // first mount of the volume manager built in SetUpTest
+ expectVolume: KeepVM.Mounts()[0].volume,
+ },
+ } {
+ writePulledBlock = func(v Volume, _ []byte, _ string) { // stub: only checks which volume was selected, ignores data and locator
+ c.Check(v, Equals, spec.expectVolume)
+ }
+
+ resp := IssueRequest(&RequestTester{
+ uri: "/pull",
+ apiToken: theConfig.systemAuthToken,
+ method: "PUT",
+ requestBody: []byte(`[{
+ "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
+ "servers":["server_1","server_2"],
+ "mount_uuid":"` + spec.sendUUID + `"}]`),
+ })
+ c.Assert(resp.Code, Equals, http.StatusOK)
+ expectEqualWithin(c, time.Second, 0, func() interface{} { // presumably waits up to 1s for the queue to drain before the next spec replaces the stub -- confirm against expectEqualWithin
+ st := pullq.Status()
+ return st.InProgress + st.Queued
+ })
+ }
+}
- dataManagerToken = "DATA MANAGER TOKEN"
+func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) { // PUT firstPullList with the configured token; expects 200 and "Received 2 pull requests"
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_two_locators",
- req: RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList}, // token comes from theConfig, which SetUpTest populates
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "hello",
putError: false,
}
- performTest(testData, c)
+ s.performTest(testData, c) // performTest is now a suite method so its stubs can record into s
}
func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
- defer teardown()
-
- dataManagerToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_one_locator",
- req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList}, // PUT secondPullList with the token SetUpTest stored in theConfig; 200 and "Received 1 pull requests" expected
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "hola",
putError: false,
}
- performTest(testData, c)
+ s.performTest(testData, c) // teardown() is no longer deferred here; TearDownTest calls it
}
func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
- defer teardown()
-
- dataManagerToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_get_one_locator",
- req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList}, // NOTE(review): no readError field is set in the lines visible here although the test name says "error on get" -- confirm
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "unused",
putError: false,
}
- performTest(testData, c)
+ s.performTest(testData, c) // teardown() is no longer deferred here; TearDownTest calls it
}
func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
- defer teardown()
-
- dataManagerToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_get_two_locators",
- req: RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList}, // NOTE(review): no readError field is set in the lines visible here although the test name says "error on get" -- confirm
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "unused",
putError: false,
}
- performTest(testData, c)
+ s.performTest(testData, c) // teardown() is no longer deferred here; TearDownTest calls it
}
func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
- defer teardown()
-
- dataManagerToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_put_one_locator",
- req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList}, // the request itself is still expected to succeed (200); putError only affects the writePulledBlock stub
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "hello hello",
putError: true,
}
- performTest(testData, c)
+ s.performTest(testData, c) // teardown() is no longer deferred here; TearDownTest calls it
}
func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
- defer teardown()
-
- dataManagerToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_put_two_locators",
- req: RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList}, // the request itself is still expected to succeed (200); putError only affects the writePulledBlock stub
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "hello again",
putError: true,
}
- performTest(testData, c)
-}
-
-// When a new pull request arrives, the old one is replaced. This test
-// is used to check that behavior by first putting an item on the queue,
-// and then performing the test. Thus the "testPullLists" has two entries;
-// however, processedPullLists will see only the newest item in the list.
-func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_items_latest_replacing_old(c *C) {
- defer teardown()
-
- var firstInput = []int{1}
- pullq = NewWorkQueue()
- pullq.ReplaceQueue(makeTestWorkList(firstInput))
- testPullLists["Added_before_actual_test_item"] = string(1)
-
- dataManagerToken = "DATA MANAGER TOKEN"
- testData := PullWorkerTestData{
- name: "TestPullWorkerPullList_with_two_items_latest_replacing_old",
- req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
- responseCode: http.StatusOK,
- responseBody: "Received 1 pull requests\n",
- readContent: "hola de nuevo",
- readError: false,
- putError: false,
- }
-
- performTest(testData, c)
+ s.performTest(testData, c) // NOTE(review): the ..._latest_replacing_old test is removed just above, yet performTest still branches on that test name -- confirm whether that branch is now dead
}
// In this case, the item will not be placed on pullq
-func (s *PullWorkerTestSuite) TestPullWorker_invalid_dataManagerToken(c *C) {
- defer teardown()
-
- dataManagerToken = "DATA MANAGER TOKEN"
-
+func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) { // sends a token that differs from theConfig.systemAuthToken and expects 401 "Unauthorized"
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_two_locators",
- req: RequestTester{"/pull", "invalid_dataManagerToken", "PUT", firstPullList},
+ req: RequestTester{"/pull", "invalidToken", "PUT", firstPullList}, // NOTE(review): the name field above reuses "TestPullWorkerPullList_with_two_locators" -- looks like a copy-paste; confirm
responseCode: http.StatusUnauthorized,
responseBody: "Unauthorized\n",
readContent: "hello",
putError: false,
}
- performTest(testData, c)
+ s.performTest(testData, c) // teardown() is no longer deferred here; TearDownTest calls it
}
-func performTest(testData PullWorkerTestData, c *C) {
- KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Close()
-
- RunTestPullWorker(c)
- defer pullq.Close()
-
- currentTestData = testData
- testPullLists[testData.name] = testData.responseBody
+func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) {
+ s.testPullLists[testData.name] = testData.responseBody
processedPullLists := make(map[string]string)
processedPullLists[testData.name] = testData.responseBody
if testData.readError {
err = errors.New("Error getting data")
- readError = err
- return nil, 0, "", err
+ s.readError = err
+ return
}
- readContent = testData.readContent
- cb := &ClosingBuffer{bytes.NewBufferString(testData.readContent)}
- var rc io.ReadCloser
- rc = cb
- return rc, int64(len(testData.readContent)), "", nil
+ s.readContent = testData.readContent
+ reader = ioutil.NopCloser(bytes.NewBufferString(testData.readContent))
+ contentLength = int64(len(testData.readContent))
+ return
}
- // Override PutContent to mock PutBlock functionality
- defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
- PutContent = func(content []byte, locator string) (err error) {
+ // Override writePulledBlock to mock PutBlock functionality
+ defer func(orig func(Volume, []byte, string)) { writePulledBlock = orig }(writePulledBlock)
+ writePulledBlock = func(v Volume, content []byte, locator string) {
if testData.putError {
- err = errors.New("Error putting data")
- putError = err
- return err
+ s.putError = errors.New("Error putting data")
+ return
}
- putContent = content
- return nil
+ s.putContent = content
}
- c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
- c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
+ c.Check(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
+ c.Check(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
+ c.Check(getStatusItem("Version"), Not(Equals), "")
response := IssueRequest(&testData.req)
c.Assert(response.Code, Equals, testData.responseCode)
})
if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" {
- c.Assert(len(testPullLists), Equals, 2)
+ c.Assert(len(s.testPullLists), Equals, 2)
c.Assert(len(processedPullLists), Equals, 1)
- c.Assert(testPullLists["Added_before_actual_test_item"], NotNil)
- c.Assert(testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
+ c.Assert(s.testPullLists["Added_before_actual_test_item"], NotNil)
+ c.Assert(s.testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
} else {
if testData.responseCode == http.StatusOK {
- c.Assert(len(testPullLists), Equals, 1)
+ c.Assert(len(s.testPullLists), Equals, 1)
c.Assert(len(processedPullLists), Equals, 1)
- c.Assert(testPullLists[testData.name], NotNil)
+ c.Assert(s.testPullLists[testData.name], NotNil)
} else {
- c.Assert(len(testPullLists), Equals, 1)
+ c.Assert(len(s.testPullLists), Equals, 1)
c.Assert(len(processedPullLists), Equals, 0)
}
}
if testData.readError {
- c.Assert(readError, NotNil)
+ c.Assert(s.readError, NotNil)
} else if testData.responseCode == http.StatusOK {
- c.Assert(readError, IsNil)
- c.Assert(readContent, Equals, testData.readContent)
+ c.Assert(s.readError, IsNil)
+ c.Assert(s.readContent, Equals, testData.readContent)
if testData.putError {
- c.Assert(putError, NotNil)
+ c.Assert(s.putError, NotNil)
} else {
- c.Assert(putError, IsNil)
- c.Assert(string(putContent), Equals, testData.readContent)
+ c.Assert(s.putError, IsNil)
+ c.Assert(string(s.putContent), Equals, testData.readContent)
}
}
expectChannelEmpty(c, pullq.NextItem)
}
-
-type ClosingBuffer struct {
- *bytes.Buffer
-}
-
-func (cb *ClosingBuffer) Close() (err error) {
- return
-}