Merge branch 'pr/24'
[arvados.git] / services / keepstore / pull_worker_test.go
index 820f9515681d79b82d71517944b40b7e84790a8e..c6a41953842b6c2ab9403c84c04d17cee839fabd 100644 (file)
@@ -9,6 +9,7 @@ import (
        "io"
        "net/http"
        "testing"
+       "time"
 )
 
 type PullWorkerTestSuite struct{}
@@ -22,7 +23,6 @@ func TestPullWorker(t *testing.T) {
 var _ = Suite(&PullWorkerTestSuite{})
 
 var testPullLists map[string]string
-var processedPullLists map[string]string
 var readContent string
 var readError error
 var putContent []byte
@@ -37,9 +37,8 @@ func (s *PullWorkerTestSuite) SetUpTest(c *C) {
 
        // When a new pull request arrives, the old one will be overwritten.
        // This behavior is verified using these two maps in the
-       // "TestPullWorker_pull_list_with_two_items_latest_replacing_old"
+       // "TestPullWorkerPullList_with_two_items_latest_replacing_old"
        testPullLists = make(map[string]string)
-       processedPullLists = make(map[string]string)
 }
 
 // Since keepstore does not come into picture in tests,
@@ -54,71 +53,70 @@ func RunTestPullWorker(c *C) {
        go RunPullWorker(pullq, keepClient)
 }
 
-var first_pull_list = []byte(`[
+var firstPullList = []byte(`[
                {
-                       "locator":"locator1",
+                       "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
                        "servers":[
                                "server_1",
                                "server_2"
                        ]
-               },
-    {
-                       "locator":"locator2",
+               },{
+                       "locator":"37b51d194a7513e45b56f6524f2d51f2+3",
                        "servers":[
                                "server_3"
                        ]
                }
        ]`)
 
-var second_pull_list = []byte(`[
+var secondPullList = []byte(`[
                {
-                       "locator":"locator3",
+                       "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
                        "servers":[
                                "server_1",
-        "server_2"
+                               "server_2"
                        ]
                }
        ]`)
 
 type PullWorkerTestData struct {
-       name          string
-       req           RequestTester
-       response_code int
-       response_body string
-       read_content  string
-       read_error    bool
-       put_error     bool
+       name         string
+       req          RequestTester
+       responseCode int
+       responseBody string
+       readContent  string
+       readError    bool
+       putError     bool
 }
 
-func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_locators(c *C) {
+func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
        defer teardown()
 
-       data_manager_token = "DATA MANAGER TOKEN"
+       dataManagerToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
-               name:          "TestPullWorker_pull_list_with_two_locators",
-               req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
-               response_code: http.StatusOK,
-               response_body: "Received 2 pull requests\n",
-               read_content:  "hello",
-               read_error:    false,
-               put_error:     false,
+               name:         "TestPullWorkerPullList_with_two_locators",
+               req:          RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+               responseCode: http.StatusOK,
+               responseBody: "Received 2 pull requests\n",
+               readContent:  "hello",
+               readError:    false,
+               putError:     false,
        }
 
        performTest(testData, c)
 }
 
-func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator(c *C) {
+func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
        defer teardown()
 
-       data_manager_token = "DATA MANAGER TOKEN"
+       dataManagerToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
-               name:          "TestPullWorker_pull_list_with_one_locator",
-               req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
-               response_code: http.StatusOK,
-               response_body: "Received 1 pull requests\n",
-               read_content:  "hola",
-               read_error:    false,
-               put_error:     false,
+               name:         "TestPullWorkerPullList_with_one_locator",
+               req:          RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+               responseCode: http.StatusOK,
+               responseBody: "Received 1 pull requests\n",
+               readContent:  "hola",
+               readError:    false,
+               putError:     false,
        }
 
        performTest(testData, c)
@@ -127,15 +125,15 @@ func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
        defer teardown()
 
-       data_manager_token = "DATA MANAGER TOKEN"
+       dataManagerToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
-               name:          "TestPullWorker_error_on_get_one_locator",
-               req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
-               response_code: http.StatusOK,
-               response_body: "Received 1 pull requests\n",
-               read_content:  "unused",
-               read_error:    true,
-               put_error:     false,
+               name:         "TestPullWorker_error_on_get_one_locator",
+               req:          RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+               responseCode: http.StatusOK,
+               responseBody: "Received 1 pull requests\n",
+               readContent:  "unused",
+               readError:    true,
+               putError:     false,
        }
 
        performTest(testData, c)
@@ -144,15 +142,15 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
        defer teardown()
 
-       data_manager_token = "DATA MANAGER TOKEN"
+       dataManagerToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
-               name:          "TestPullWorker_error_on_get_two_locators",
-               req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
-               response_code: http.StatusOK,
-               response_body: "Received 2 pull requests\n",
-               read_content:  "unused",
-               read_error:    true,
-               put_error:     false,
+               name:         "TestPullWorker_error_on_get_two_locators",
+               req:          RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+               responseCode: http.StatusOK,
+               responseBody: "Received 2 pull requests\n",
+               readContent:  "unused",
+               readError:    true,
+               putError:     false,
        }
 
        performTest(testData, c)
@@ -161,15 +159,15 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
        defer teardown()
 
-       data_manager_token = "DATA MANAGER TOKEN"
+       dataManagerToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
-               name:          "TestPullWorker_error_on_put_one_locator",
-               req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
-               response_code: http.StatusOK,
-               response_body: "Received 1 pull requests\n",
-               read_content:  "hello hello",
-               read_error:    false,
-               put_error:     true,
+               name:         "TestPullWorker_error_on_put_one_locator",
+               req:          RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+               responseCode: http.StatusOK,
+               responseBody: "Received 1 pull requests\n",
+               readContent:  "hello hello",
+               readError:    false,
+               putError:     true,
        }
 
        performTest(testData, c)
@@ -178,15 +176,15 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
        defer teardown()
 
-       data_manager_token = "DATA MANAGER TOKEN"
+       dataManagerToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
-               name:          "TestPullWorker_error_on_put_two_locators",
-               req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
-               response_code: http.StatusOK,
-               response_body: "Received 2 pull requests\n",
-               read_content:  "hello again",
-               read_error:    false,
-               put_error:     true,
+               name:         "TestPullWorker_error_on_put_two_locators",
+               req:          RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+               responseCode: http.StatusOK,
+               responseBody: "Received 2 pull requests\n",
+               readContent:  "hello again",
+               readError:    false,
+               putError:     true,
        }
 
        performTest(testData, c)
@@ -196,7 +194,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
 // is used to check that behavior by first putting an item on the queue,
 // and then performing the test. Thus the "testPullLists" has two entries;
 // however, processedPullLists will see only the newest item in the list.
-func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_items_latest_replacing_old(c *C) {
+func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_items_latest_replacing_old(c *C) {
        defer teardown()
 
        var firstInput = []int{1}
@@ -204,89 +202,125 @@ func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_items_latest_rep
        pullq.ReplaceQueue(makeTestWorkList(firstInput))
        testPullLists["Added_before_actual_test_item"] = string(1)
 
-       data_manager_token = "DATA MANAGER TOKEN"
+       dataManagerToken = "DATA MANAGER TOKEN"
+       testData := PullWorkerTestData{
+               name:         "TestPullWorkerPullList_with_two_items_latest_replacing_old",
+               req:          RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+               responseCode: http.StatusOK,
+               responseBody: "Received 1 pull requests\n",
+               readContent:  "hola de nuevo",
+               readError:    false,
+               putError:     false,
+       }
+
+       performTest(testData, c)
+}
+
+// In this case, the item will not be placed on pullq
+func (s *PullWorkerTestSuite) TestPullWorker_invalid_dataManagerToken(c *C) {
+       defer teardown()
+
+       dataManagerToken = "DATA MANAGER TOKEN"
+
        testData := PullWorkerTestData{
-               name:          "TestPullWorker_pull_list_with_two_items_latest_replacing_old",
-               req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
-               response_code: http.StatusOK,
-               response_body: "Received 1 pull requests\n",
-               read_content:  "hola de nuevo",
-               read_error:    false,
-               put_error:     false,
+               name:         "TestPullWorkerPullList_with_two_locators",
+               req:          RequestTester{"/pull", "invalid_dataManagerToken", "PUT", firstPullList},
+               responseCode: http.StatusUnauthorized,
+               responseBody: "Unauthorized\n",
+               readContent:  "hello",
+               readError:    false,
+               putError:     false,
        }
 
        performTest(testData, c)
 }
 
 func performTest(testData PullWorkerTestData, c *C) {
+       KeepVM = MakeTestVolumeManager(2)
+       defer KeepVM.Close()
+
        RunTestPullWorker(c)
+       defer pullq.Close()
 
        currentTestData = testData
-       testPullLists[testData.name] = testData.response_body
+       testPullLists[testData.name] = testData.responseBody
 
-       // Override GetContent to mock keepclient Get functionality
-       GetContent = func(signedLocator string, keepClient keepclient.KeepClient) (
-               reader io.ReadCloser, contentLength int64, url string, err error) {
+       processedPullLists := make(map[string]string)
 
-               processedPullLists[testData.name] = testData.response_body
-               if testData.read_error {
+       // Override GetContent to mock keepclient Get functionality
+       defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
+               GetContent = orig
+       }(GetContent)
+       GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
+               c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
+               processedPullLists[testData.name] = testData.responseBody
+               if testData.readError {
                        err = errors.New("Error getting data")
                        readError = err
                        return nil, 0, "", err
-               } else {
-                       readContent = testData.read_content
-                       cb := &ClosingBuffer{bytes.NewBufferString(testData.read_content)}
-                       var rc io.ReadCloser
-                       rc = cb
-                       return rc, int64(len(testData.read_content)), "", nil
                }
+               readContent = testData.readContent
+               cb := &ClosingBuffer{bytes.NewBufferString(testData.readContent)}
+               var rc io.ReadCloser
+               rc = cb
+               return rc, int64(len(testData.readContent)), "", nil
        }
 
        // Override PutContent to mock PutBlock functionality
+       defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
        PutContent = func(content []byte, locator string) (err error) {
-               if testData.put_error {
+               if testData.putError {
                        err = errors.New("Error putting data")
                        putError = err
                        return err
-               } else {
-                       putContent = content
-                       return nil
                }
+               putContent = content
+               return nil
        }
 
-       response := IssueRequest(&testData.req)
-       c.Assert(testData.response_code, Equals, response.Code)
-       c.Assert(testData.response_body, Equals, response.Body.String())
+       c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
+       c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
 
-       expectWorkerChannelEmpty(c, pullq.NextItem)
+       response := IssueRequest(&testData.req)
+       c.Assert(response.Code, Equals, testData.responseCode)
+       c.Assert(response.Body.String(), Equals, testData.responseBody)
 
-       pullq.Close()
+       expectEqualWithin(c, time.Second, 0, func() interface{} {
+               st := pullq.Status()
+               return st.InProgress + st.Queued
+       })
 
-       if testData.name == "TestPullWorker_pull_list_with_two_items_latest_replacing_old" {
+       if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" {
                c.Assert(len(testPullLists), Equals, 2)
                c.Assert(len(processedPullLists), Equals, 1)
                c.Assert(testPullLists["Added_before_actual_test_item"], NotNil)
-               c.Assert(testPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil)
-               c.Assert(processedPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil)
+               c.Assert(testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
+               c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
        } else {
-               c.Assert(len(testPullLists), Equals, 1)
-               c.Assert(len(processedPullLists), Equals, 1)
-               c.Assert(testPullLists[testData.name], NotNil)
+               if testData.responseCode == http.StatusOK {
+                       c.Assert(len(testPullLists), Equals, 1)
+                       c.Assert(len(processedPullLists), Equals, 1)
+                       c.Assert(testPullLists[testData.name], NotNil)
+               } else {
+                       c.Assert(len(testPullLists), Equals, 1)
+                       c.Assert(len(processedPullLists), Equals, 0)
+               }
        }
 
-       if testData.read_error {
+       if testData.readError {
                c.Assert(readError, NotNil)
-       } else {
+       } else if testData.responseCode == http.StatusOK {
                c.Assert(readError, IsNil)
-               c.Assert(readContent, Equals, testData.read_content)
-               if testData.put_error {
+               c.Assert(readContent, Equals, testData.readContent)
+               if testData.putError {
                        c.Assert(putError, NotNil)
                } else {
                        c.Assert(putError, IsNil)
-                       c.Assert(string(putContent), Equals, testData.read_content)
+                       c.Assert(string(putContent), Equals, testData.readContent)
                }
        }
 
+       expectChannelEmpty(c, pullq.NextItem)
 }
 
 type ClosingBuffer struct {
@@ -296,19 +330,3 @@ type ClosingBuffer struct {
 func (cb *ClosingBuffer) Close() (err error) {
        return
 }
-
-func expectWorkerChannelEmpty(c *C, workerChannel <-chan interface{}) {
-       select {
-       case item := <-workerChannel:
-               c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
-       default:
-       }
-}
-
-func expectWorkerChannelNotEmpty(c *C, workerChannel <-chan interface{}) {
-       select {
-       case item := <-workerChannel:
-               c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
-       default:
-       }
-}