11644: Add pull-to-mount-UUID test. Tidy up pull worker and tests.
author Tom Clegg <tom@curoverse.com>
Tue, 16 May 2017 16:43:39 +0000 (12:43 -0400)
committer Tom Clegg <tom@curoverse.com>
Tue, 16 May 2017 16:48:19 +0000 (12:48 -0400)
services/keepstore/pull_worker.go
services/keepstore/pull_worker_integration_test.go
services/keepstore/pull_worker_test.go
services/keepstore/work_queue_test.go

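diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
index 150b5ca2349a6e2765099feb39abc1a3f90ebedd..58266a19ce27b4cdba514ea9c461c5247908560f 100644
--- a/services/keepstore/pull_worker.go
+++ b/services/keepstore/pull_worker.go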
@@ -13,34 +13,30 @@ import (
        log "github.com/Sirupsen/logrus"
 )
 
-// RunPullWorker is used by Keepstore to initiate pull worker channel goroutine.
-//     The channel will process pull list.
-//             For each (next) pull request:
-//                     For each locator listed, execute Pull on the server(s) listed
-//                     Skip the rest of the servers if no errors
-//             Repeat
-//
+// RunPullWorker receives PullRequests from pullq, invokes
+// PullItemAndProcess on each one. After each PR, it logs a message
+// indicating whether the pull was successful.
 func RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient) {
-       nextItem := pullq.NextItem
-       for item := range nextItem {
-               pullRequest := item.(PullRequest)
-               err := PullItemAndProcess(item.(PullRequest), keepClient)
+       for item := range pullq.NextItem {
+               pr := item.(PullRequest)
+               err := PullItemAndProcess(pr, keepClient)
                pullq.DoneItem <- struct{}{}
                if err == nil {
-                       log.Printf("Pull %s success", pullRequest)
+                       log.Printf("Pull %s success", pr)
                } else {
-                       log.Printf("Pull %s error: %s", pullRequest, err)
+                       log.Printf("Pull %s error: %s", pr, err)
                }
        }
 }
 
-// PullItemAndProcess pulls items from PullQueue and processes them.
-//     For each Pull request:
-//             Generate a random API token.
-//             Generate a permission signature using this token, timestamp ~60 seconds in the future, and desired block hash.
-//             Using this token & signature, retrieve the given block.
-//             Write to storage
+// PullItemAndProcess executes a pull request by retrieving the
+// specified block from one of the specified servers, and storing it
+// on a local volume.
 //
+// If the PR specifies a non-blank mount UUID, PullItemAndProcess will
+// only attempt to write the data to the corresponding
+// volume. Otherwise it writes to any local volume, as a PUT request
+// would.
 func PullItemAndProcess(pullRequest PullRequest, keepClient *keepclient.KeepClient) error {
        var vol Volume
        if uuid := pullRequest.MountUUID; uuid != "" {
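diff --git a/services/keepstore/pull_worker_integration_test.go b/services/keepstore/pull_worker_integration_test.go
index 7ba72672cfc95831b9e547b2d5430fbdea385821..8c7a1e222ddc8905041161cee67d605334285a65 100644
--- a/services/keepstore/pull_worker_integration_test.go
+++ b/services/keepstore/pull_worker_integration_test.go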
@@ -4,6 +4,7 @@ import (
        "bytes"
        "errors"
        "io"
+       "io/ioutil"
        "net/http"
        "os"
        "strings"
@@ -122,12 +123,11 @@ func performPullWorkerIntegrationTest(testData PullWorkIntegrationTestData, pull
        defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
                GetContent = orig
        }(GetContent)
-       GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
-               reader io.ReadCloser, contentLength int64, url string, err error) {
+       GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
                if testData.GetError != "" {
                        return nil, 0, "", errors.New(testData.GetError)
                }
-               rdr := &ClosingBuffer{bytes.NewBufferString(testData.Content)}
+               rdr := ioutil.NopCloser(bytes.NewBufferString(testData.Content))
                return rdr, int64(len(testData.Content)), "", nil
        }
 
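diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go
index d8dc695e29e376ee4f7f1fecbac41cab99c15c44..e31979f2273464d605560d2c9d446d3be23ba690 100644
--- a/services/keepstore/pull_worker_test.go
+++ b/services/keepstore/pull_worker_test.go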
@@ -4,6 +4,7 @@ import (
        "bytes"
        "errors"
        "io"
+       "io/ioutil"
        "net/http"
        "time"
 
@@ -14,39 +15,48 @@ import (
 
 var _ = Suite(&PullWorkerTestSuite{})
 
-type PullWorkerTestSuite struct{}
-
-var testPullLists map[string]string
-var readContent string
-var readError error
-var putContent []byte
-var putError error
-var currentTestData PullWorkerTestData
+type PullWorkerTestSuite struct {
+       testPullLists map[string]string
+       readContent   string
+       readError     error
+       putContent    []byte
+       putError      error
+}
 
 func (s *PullWorkerTestSuite) SetUpTest(c *C) {
-       readContent = ""
-       readError = nil
-       putContent = []byte("")
-       putError = nil
+       theConfig.systemAuthToken = "arbitrary data manager token"
+       s.readContent = ""
+       s.readError = nil
+       s.putContent = []byte{}
+       s.putError = nil
 
        // When a new pull request arrives, the old one will be overwritten.
        // This behavior is verified using these two maps in the
        // "TestPullWorkerPullList_with_two_items_latest_replacing_old"
-       testPullLists = make(map[string]string)
-}
+       s.testPullLists = make(map[string]string)
 
-// Since keepstore does not come into picture in tests,
-// we need to explicitly start the goroutine in tests.
-func RunTestPullWorker(c *C) {
+       KeepVM = MakeTestVolumeManager(2)
+
+       // Normally the pull queue and workers are started by main()
+       // -- tests need to set up their own.
        arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, Equals, nil)
+       c.Assert(err, IsNil)
        keepClient, err := keepclient.MakeKeepClient(arv)
-       c.Assert(err, Equals, nil)
-
+       c.Assert(err, IsNil)
        pullq = NewWorkQueue()
        go RunPullWorker(pullq, keepClient)
 }
 
+func (s *PullWorkerTestSuite) TearDownTest(c *C) {
+       KeepVM.Close()
+       KeepVM = nil
+       pullq.Close()
+       pullq = nil
+       teardown()
+       theConfig = DefaultConfig()
+       theConfig.Start()
+}
+
 var firstPullList = []byte(`[
                {
                        "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
@@ -82,10 +92,48 @@ type PullWorkerTestData struct {
        putError     bool
 }
 
-func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
-       defer teardown()
+// Ensure MountUUID in a pull list is correctly translated to a Volume
+// argument passed to writePulledBlock().
+func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) {
+       defer func(f func(Volume, []byte, string)) {
+               writePulledBlock = f
+       }(writePulledBlock)
+
+       for _, spec := range []struct {
+               sendUUID     string
+               expectVolume Volume
+       }{
+               {
+                       sendUUID:     "",
+                       expectVolume: nil,
+               },
+               {
+                       sendUUID:     KeepVM.Mounts()[0].UUID,
+                       expectVolume: KeepVM.Mounts()[0].volume,
+               },
+       } {
+               writePulledBlock = func(v Volume, _ []byte, _ string) {
+                       c.Check(v, Equals, spec.expectVolume)
+               }
+
+               resp := IssueRequest(&RequestTester{
+                       uri:      "/pull",
+                       apiToken: theConfig.systemAuthToken,
+                       method:   "PUT",
+                       requestBody: []byte(`[{
+                               "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
+                               "servers":["server_1","server_2"],
+                               "mountuuid":"` + spec.sendUUID + `"}]`),
+               })
+               c.Assert(resp.Code, Equals, http.StatusOK)
+               expectEqualWithin(c, time.Second, 0, func() interface{} {
+                       st := pullq.Status()
+                       return st.InProgress + st.Queued
+               })
+       }
+}
 
-       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
+func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
        testData := PullWorkerTestData{
                name:         "TestPullWorkerPullList_with_two_locators",
                req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
@@ -96,13 +144,10 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
                putError:     false,
        }
 
-       performTest(testData, c)
+       s.performTest(testData, c)
 }
 
 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
-       defer teardown()
-
-       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
                name:         "TestPullWorkerPullList_with_one_locator",
                req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
@@ -113,13 +158,10 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
                putError:     false,
        }
 
-       performTest(testData, c)
+       s.performTest(testData, c)
 }
 
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
-       defer teardown()
-
-       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
                name:         "TestPullWorker_error_on_get_one_locator",
                req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
@@ -130,13 +172,10 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
                putError:     false,
        }
 
-       performTest(testData, c)
+       s.performTest(testData, c)
 }
 
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
-       defer teardown()
-
-       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
                name:         "TestPullWorker_error_on_get_two_locators",
                req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
@@ -147,13 +186,10 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
                putError:     false,
        }
 
-       performTest(testData, c)
+       s.performTest(testData, c)
 }
 
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
-       defer teardown()
-
-       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
                name:         "TestPullWorker_error_on_put_one_locator",
                req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
@@ -164,13 +200,10 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
                putError:     true,
        }
 
-       performTest(testData, c)
+       s.performTest(testData, c)
 }
 
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
-       defer teardown()
-
-       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
                name:         "TestPullWorker_error_on_put_two_locators",
                req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
@@ -181,41 +214,11 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
                putError:     true,
        }
 
-       performTest(testData, c)
-}
-
-// When a new pull request arrives, the old one is replaced. This test
-// is used to check that behavior by first putting an item on the queue,
-// and then performing the test. Thus the "testPullLists" has two entries;
-// however, processedPullLists will see only the newest item in the list.
-func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_items_latest_replacing_old(c *C) {
-       defer teardown()
-
-       var firstInput = []int{1}
-       pullq = NewWorkQueue()
-       pullq.ReplaceQueue(makeTestWorkList(firstInput))
-       testPullLists["Added_before_actual_test_item"] = string(1)
-
-       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
-       testData := PullWorkerTestData{
-               name:         "TestPullWorkerPullList_with_two_items_latest_replacing_old",
-               req:          RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
-               responseCode: http.StatusOK,
-               responseBody: "Received 1 pull requests\n",
-               readContent:  "hola de nuevo",
-               readError:    false,
-               putError:     false,
-       }
-
-       performTest(testData, c)
+       s.performTest(testData, c)
 }
 
 // In this case, the item will not be placed on pullq
 func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
-       defer teardown()
-
-       theConfig.systemAuthToken = "DATA MANAGER TOKEN"
-
        testData := PullWorkerTestData{
                name:         "TestPullWorkerPullList_with_two_locators",
                req:          RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
@@ -226,18 +229,11 @@ func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
                putError:     false,
        }
 
-       performTest(testData, c)
+       s.performTest(testData, c)
 }
 
-func performTest(testData PullWorkerTestData, c *C) {
-       KeepVM = MakeTestVolumeManager(2)
-       defer KeepVM.Close()
-
-       RunTestPullWorker(c)
-       defer pullq.Close()
-
-       currentTestData = testData
-       testPullLists[testData.name] = testData.responseBody
+func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) {
+       s.testPullLists[testData.name] = testData.responseBody
 
        processedPullLists := make(map[string]string)
 
@@ -250,28 +246,27 @@ func performTest(testData PullWorkerTestData, c *C) {
                processedPullLists[testData.name] = testData.responseBody
                if testData.readError {
                        err = errors.New("Error getting data")
-                       readError = err
-                       return nil, 0, "", err
+                       s.readError = err
+                       return
                }
-               readContent = testData.readContent
-               cb := &ClosingBuffer{bytes.NewBufferString(testData.readContent)}
-               var rc io.ReadCloser
-               rc = cb
-               return rc, int64(len(testData.readContent)), "", nil
+               s.readContent = testData.readContent
+               reader = ioutil.NopCloser(bytes.NewBufferString(testData.readContent))
+               contentLength = int64(len(testData.readContent))
+               return
        }
 
        // Override writePulledBlock to mock PutBlock functionality
        defer func(orig func(Volume, []byte, string)) { writePulledBlock = orig }(writePulledBlock)
        writePulledBlock = func(v Volume, content []byte, locator string) {
                if testData.putError {
-                       putError = errors.New("Error putting data")
+                       s.putError = errors.New("Error putting data")
                        return
                }
-               putContent = content
+               s.putContent = content
        }
 
-       c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
-       c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
+       c.Check(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
+       c.Check(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
 
        response := IssueRequest(&testData.req)
        c.Assert(response.Code, Equals, testData.responseCode)
@@ -283,42 +278,34 @@ func performTest(testData PullWorkerTestData, c *C) {
        })
 
        if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" {
-               c.Assert(len(testPullLists), Equals, 2)
+               c.Assert(len(s.testPullLists), Equals, 2)
                c.Assert(len(processedPullLists), Equals, 1)
-               c.Assert(testPullLists["Added_before_actual_test_item"], NotNil)
-               c.Assert(testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
+               c.Assert(s.testPullLists["Added_before_actual_test_item"], NotNil)
+               c.Assert(s.testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
                c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
        } else {
                if testData.responseCode == http.StatusOK {
-                       c.Assert(len(testPullLists), Equals, 1)
+                       c.Assert(len(s.testPullLists), Equals, 1)
                        c.Assert(len(processedPullLists), Equals, 1)
-                       c.Assert(testPullLists[testData.name], NotNil)
+                       c.Assert(s.testPullLists[testData.name], NotNil)
                } else {
-                       c.Assert(len(testPullLists), Equals, 1)
+                       c.Assert(len(s.testPullLists), Equals, 1)
                        c.Assert(len(processedPullLists), Equals, 0)
                }
        }
 
        if testData.readError {
-               c.Assert(readError, NotNil)
+               c.Assert(s.readError, NotNil)
        } else if testData.responseCode == http.StatusOK {
-               c.Assert(readError, IsNil)
-               c.Assert(readContent, Equals, testData.readContent)
+               c.Assert(s.readError, IsNil)
+               c.Assert(s.readContent, Equals, testData.readContent)
                if testData.putError {
-                       c.Assert(putError, NotNil)
+                       c.Assert(s.putError, NotNil)
                } else {
-                       c.Assert(putError, IsNil)
-                       c.Assert(string(putContent), Equals, testData.readContent)
+                       c.Assert(s.putError, IsNil)
+                       c.Assert(string(s.putContent), Equals, testData.readContent)
                }
        }
 
        expectChannelEmpty(c, pullq.NextItem)
 }
-
-type ClosingBuffer struct {
-       *bytes.Buffer
-}
-
-func (cb *ClosingBuffer) Close() (err error) {
-       return
-}
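diff --git a/services/keepstore/work_queue_test.go b/services/keepstore/work_queue_test.go
index 6b31795293ebd38eaa3837316fe001519c91b072..8893fb94068649aa32a2b3d7434724bb6024fd2f 100644
--- a/services/keepstore/work_queue_test.go
+++ b/services/keepstore/work_queue_test.go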
@@ -11,7 +11,7 @@ type fatalfer interface {
        Fatalf(string, ...interface{})
 }
 
-func makeTestWorkList(ary []int) *list.List {
+func makeTestWorkList(ary []interface{}) *list.List {
        l := list.New()
        for _, n := range ary {
                l.PushBack(n)
@@ -53,7 +53,7 @@ func expectChannelClosedWithin(t fatalfer, timeout time.Duration, c <-chan inter
        }
 }
 
-func doWorkItems(t fatalfer, q *WorkQueue, expected []int) {
+func doWorkItems(t fatalfer, q *WorkQueue, expected []interface{}) {
        for i := range expected {
                actual, ok := <-q.NextItem
                if !ok {
@@ -93,7 +93,7 @@ func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) {
 func TestWorkQueueDoneness(t *testing.T) {
        b := NewWorkQueue()
        defer b.Close()
-       b.ReplaceQueue(makeTestWorkList([]int{1, 2, 3}))
+       b.ReplaceQueue(makeTestWorkList([]interface{}{1, 2, 3}))
        expectQueued(t, b, 3)
        gate := make(chan struct{})
        go func() {
@@ -105,7 +105,7 @@ func TestWorkQueueDoneness(t *testing.T) {
                }
        }()
        expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
-       b.ReplaceQueue(makeTestWorkList([]int{4, 5, 6}))
+       b.ReplaceQueue(makeTestWorkList([]interface{}{4, 5, 6}))
        for i := 1; i <= 3; i++ {
                gate <- struct{}{}
                expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued })
@@ -118,7 +118,7 @@ func TestWorkQueueDoneness(t *testing.T) {
 
 // Create a WorkQueue, generate a list for it, and instantiate a worker.
 func TestWorkQueueReadWrite(t *testing.T) {
-       var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+       var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
 
        b := NewWorkQueue()
        expectQueued(t, b, 0)
@@ -133,7 +133,7 @@ func TestWorkQueueReadWrite(t *testing.T) {
 
 // Start a worker before the list has any input.
 func TestWorkQueueEarlyRead(t *testing.T) {
-       var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+       var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
 
        b := NewWorkQueue()
        defer b.Close()
@@ -161,7 +161,7 @@ func TestWorkQueueEarlyRead(t *testing.T) {
 // After Close(), NextItem closes, work finishes, then stats return zero.
 func TestWorkQueueClose(t *testing.T) {
        b := NewWorkQueue()
-       input := []int{1, 2, 3, 4, 5, 6, 7, 8}
+       input := []interface{}{1, 2, 3, 4, 5, 6, 7, 8}
        mark := make(chan struct{})
        go func() {
                <-b.NextItem
@@ -185,8 +185,8 @@ func TestWorkQueueClose(t *testing.T) {
 // available.
 func TestWorkQueueReaderBlocks(t *testing.T) {
        var (
-               inputBeforeBlock = []int{1, 2, 3, 4, 5}
-               inputAfterBlock  = []int{6, 7, 8, 9, 10}
+               inputBeforeBlock = []interface{}{1, 2, 3, 4, 5}
+               inputAfterBlock  = []interface{}{6, 7, 8, 9, 10}
        )
 
        b := NewWorkQueue()
@@ -219,8 +219,8 @@ func TestWorkQueueReaderBlocks(t *testing.T) {
 
 // Replace one active work list with another.
 func TestWorkQueueReplaceQueue(t *testing.T) {
-       var firstInput = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
-       var replaceInput = []int{1, 4, 9, 16, 25, 36, 49, 64, 81}
+       var firstInput = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
+       var replaceInput = []interface{}{1, 4, 9, 16, 25, 36, 49, 64, 81}
 
        b := NewWorkQueue()
        b.ReplaceQueue(makeTestWorkList(firstInput))