X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/9ae180ec18f1f397889a4531a12999942edd003a..143a5f355100b837daa428231df0370b525a1f9f:/services/keepstore/pull_worker_test.go diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go index 124c9b835c..37d83b3280 100644 --- a/services/keepstore/pull_worker_test.go +++ b/services/keepstore/pull_worker_test.go @@ -9,6 +9,7 @@ import ( "io" "net/http" "testing" + "time" ) type PullWorkerTestSuite struct{} @@ -22,7 +23,6 @@ func TestPullWorker(t *testing.T) { var _ = Suite(&PullWorkerTestSuite{}) var testPullLists map[string]string -var processedPullLists map[string]string var readContent string var readError error var putContent []byte @@ -39,7 +39,6 @@ func (s *PullWorkerTestSuite) SetUpTest(c *C) { // This behavior is verified using these two maps in the // "TestPullWorker_pull_list_with_two_items_latest_replacing_old" testPullLists = make(map[string]string) - processedPullLists = make(map[string]string) } // Since keepstore does not come into picture in tests, @@ -56,14 +55,13 @@ func RunTestPullWorker(c *C) { var first_pull_list = []byte(`[ { - "locator":"locator1", + "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3", "servers":[ "server_1", "server_2" ] - }, - { - "locator":"locator2", + },{ + "locator":"37b51d194a7513e45b56f6524f2d51f2+3", "servers":[ "server_3" ] @@ -72,10 +70,10 @@ var first_pull_list = []byte(`[ var second_pull_list = []byte(`[ { - "locator":"locator3", + "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3", "servers":[ "server_1", - "server_2" + "server_2" ] } ]`) @@ -238,15 +236,23 @@ func (s *PullWorkerTestSuite) TestPullWorker_invalid_data_manager_token(c *C) { } func performTest(testData PullWorkerTestData, c *C) { + KeepVM = MakeTestVolumeManager(2) + defer KeepVM.Close() + RunTestPullWorker(c) + defer pullq.Close() currentTestData = testData testPullLists[testData.name] = testData.response_body - // Override GetContent to mock keepclient Get functionality - GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) ( - reader io.ReadCloser, contentLength int64, url string, err error) { + processedPullLists := make(map[string]string) + // Override GetContent to mock keepclient Get functionality + defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) { + GetContent = orig + }(GetContent) + GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) { + c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1)) processedPullLists[testData.name] = testData.response_body if testData.read_error { err = errors.New("Error getting data") @@ -262,6 +268,7 @@ func performTest(testData PullWorkerTestData, c *C) { } // Override PutContent to mock PutBlock functionality + defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent) PutContent = func(content []byte, locator string) (err error) { if testData.put_error { err = errors.New("Error putting data") @@ -273,13 +280,17 @@ func performTest(testData PullWorkerTestData, c *C) { } } - response := IssueRequest(&testData.req) - c.Assert(testData.response_code, Equals, response.Code) - c.Assert(testData.response_body, Equals, response.Body.String()) + c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0)) + c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0)) - expectWorkerChannelEmpty(c, pullq.NextItem) + response := IssueRequest(&testData.req) + c.Assert(response.Code, Equals, testData.response_code) + c.Assert(response.Body.String(), Equals, testData.response_body) - pullq.Close() + expectEqualWithin(c, time.Second, 0, func() interface{} { + st := pullq.Status() + return st.InProgress + st.Queued + }) if testData.name == "TestPullWorker_pull_list_with_two_items_latest_replacing_old" { c.Assert(len(testPullLists), Equals, 2) @@ -310,6 +321,8 @@ func performTest(testData PullWorkerTestData, c *C) { c.Assert(string(putContent), Equals, testData.read_content) } } + + expectChannelEmpty(c, pullq.NextItem) } type ClosingBuffer struct { @@ -319,19 +332,3 @@ type ClosingBuffer struct { func (cb *ClosingBuffer) Close() (err error) { return } - -func expectWorkerChannelEmpty(c *C, workerChannel <-chan interface{}) { - select { - case item := <-workerChannel: - c.Fatalf("Received value (%v) from channel that was expected to be empty", item) - default: - } -} - -func expectWorkerChannelNotEmpty(c *C, workerChannel <-chan interface{}) { - select { - case item := <-workerChannel: - c.Fatalf("Received value (%v) from channel that was expected to be empty", item) - default: - } -}