6260: Expose queue sizes in /status.json. Fix sleep/race in trash_worker_test.
authorTom Clegg <tom@curoverse.com>
Mon, 10 Aug 2015 15:56:48 +0000 (11:56 -0400)
committerTom Clegg <tom@curoverse.com>
Mon, 10 Aug 2015 16:44:52 +0000 (12:44 -0400)
services/keepstore/handlers.go
services/keepstore/pull_worker_test.go
services/keepstore/status_test.go [new file with mode: 0644]
services/keepstore/trash_worker_test.go
services/keepstore/volume_test.go

index c4ecfb4c88b66dacccc9904c4ea9da5a64631a64..a6665f61640dfdb5f0791b4fc16fc34f084fe4bf 100644 (file)
@@ -192,9 +192,17 @@ type PoolStatus struct {
        Len   int    `json:"BuffersInUse"`
 }
 
+type WorkQueueStatus struct {
+       InProgress  int
+       Outstanding int
+       Queued      int
+}
+
 type NodeStatus struct {
        Volumes    []*VolumeStatus `json:"volumes"`
        BufferPool PoolStatus
+       PullQueue  WorkQueueStatus
+       TrashQueue WorkQueueStatus
        Memory     runtime.MemStats
 }
 
@@ -203,7 +211,7 @@ var stLock sync.Mutex
 
 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
        stLock.Lock()
-       ReadNodeStatus(&st)
+       readNodeStatus(&st)
        jstat, err := json.Marshal(&st)
        stLock.Unlock()
        if err == nil {
@@ -215,10 +223,8 @@ func StatusHandler(resp http.ResponseWriter, req *http.Request) {
        }
 }
 
-// ReadNodeStatus populates the given NodeStatus struct with current
-// values.
-//
-func ReadNodeStatus(st *NodeStatus) {
+// populate the given NodeStatus struct with current values.
+func readNodeStatus(st *NodeStatus) {
        vols := KeepVM.AllReadable()
        if cap(st.Volumes) < len(vols) {
                st.Volumes = make([]*VolumeStatus, len(vols))
@@ -232,9 +238,24 @@ func ReadNodeStatus(st *NodeStatus) {
        st.BufferPool.Alloc = bufs.Alloc()
        st.BufferPool.Cap = bufs.Cap()
        st.BufferPool.Len = bufs.Len()
+       readWorkQueueStatus(&st.PullQueue, pullq)
+       readWorkQueueStatus(&st.TrashQueue, trashq)
        runtime.ReadMemStats(&st.Memory)
 }
 
+// Populate a WorkQueueStatus. This is not atomic, so race conditions
+// can cause InProgress + Queued != Outstanding.
+func readWorkQueueStatus(st *WorkQueueStatus, q *WorkQueue) {
+       if q == nil {
+               // This should only happen during tests.
+               *st = WorkQueueStatus{}
+               return
+       }
+       st.InProgress = q.CountInProgress()
+       st.Outstanding = q.CountOutstanding()
+       st.Queued = q.CountQueued()
+}
+
 // DeleteHandler processes DELETE requests.
 //
 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
index 0d4f9be75beeac7b05141f1bac26be796ba9e94b..e8d390ab2b755dc558a7201ce75b6ecd194348b9 100644 (file)
@@ -236,6 +236,9 @@ func (s *PullWorkerTestSuite) TestPullWorker_invalid_data_manager_token(c *C) {
 }
 
 func performTest(testData PullWorkerTestData, c *C) {
+       KeepVM = MakeTestVolumeManager(2)
+       defer KeepVM.Close()
+
        RunTestPullWorker(c)
        defer pullq.Close()
 
@@ -249,6 +252,7 @@ func performTest(testData PullWorkerTestData, c *C) {
                GetContent = orig
        }(GetContent)
        GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
+               c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
                processedPullLists[testData.name] = testData.response_body
                if testData.read_error {
                        err = errors.New("Error getting data")
@@ -276,6 +280,10 @@ func performTest(testData PullWorkerTestData, c *C) {
                }
        }
 
+       c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
+       c.Assert(getStatusItem("PullQueue", "Outstanding"), Equals, float64(0))
+       c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
+
        response := IssueRequest(&testData.req)
        c.Assert(response.Code, Equals, testData.response_code)
        c.Assert(response.Body.String(), Equals, testData.response_body)
diff --git a/services/keepstore/status_test.go b/services/keepstore/status_test.go
new file mode 100644 (file)
index 0000000..134b016
--- /dev/null
@@ -0,0 +1,15 @@
+package main
+
+import (
+       "encoding/json"
+)
+
+func getStatusItem(keys ...string) interface{} {
+       resp := IssueRequest(&RequestTester{"/status.json", "", "GET", nil})
+       var s interface{}
+       json.NewDecoder(resp.Body).Decode(&s)
+       for _, k := range keys {
+               s = s.(map[string]interface{})[k]
+       }
+       return s
+}
index 8268191b08eb47c7520f26c9063e51e31e695ca6..433eef57863d751154df4e2c5e937da589b9bab3 100644 (file)
@@ -258,8 +258,39 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
        }
        go RunTrashWorker(trashq)
 
+       // Install gate so all local operations block until we say go
+       gate := make(chan struct{})
+       for _, v := range vols {
+               v.(*MockVolume).Gate = gate
+       }
+
+       assertStatusItem := func(k string, expect float64) {
+               if v := getStatusItem("TrashQueue", k); v != expect {
+                       t.Errorf("Got %s %v, expected %v", k, v, expect)
+               }
+       }
+
+       assertStatusItem("InProgress", 0)
+       assertStatusItem("Outstanding", 0)
+       assertStatusItem("Queued", 0)
+
+       listLen := trashList.Len()
        trashq.ReplaceQueue(trashList)
-       time.Sleep(10 * time.Millisecond) // give a moment to finish processing the list
+
+       // Wait for worker to take request(s)
+       expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.CountOutstanding() })
+       expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.CountInProgress() })
+
+       // Ensure status.json also reports work is happening
+       assertStatusItem("InProgress", float64(1))
+       assertStatusItem("Outstanding", float64(listLen))
+       assertStatusItem("Queued", float64(listLen-1))
+
+       // Let worker proceed
+       close(gate)
+
+       // Wait for worker to finish
+       expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.CountOutstanding() })
 
        // Verify Locator1 to be un/deleted as expected
        data, _ := GetBlock(testData.Locator1, false)
index 261501992f8080110062cc7be7a1828052f24014..d660017aaa253a59cf1b4eb32a215f37f3ff64c3 100644 (file)
@@ -22,13 +22,20 @@ type MockVolume struct {
        // Readonly volumes return an error for Put, Delete, and
        // Touch.
        Readonly bool
-       called   map[string]int
-       mutex    sync.Mutex
+       // Every operation (except Status) starts by receiving from
+       // Gate. Send one value to unblock one operation; close the
+       // channel to unblock all. By default, it is a closed channel,
+       // so all operations proceed without blocking.
+       Gate   chan struct{}
+       called map[string]int
+       mutex  sync.Mutex
 }
 
 // CreateMockVolume returns a non-Bad, non-Readonly, Touchable mock
 // volume.
 func CreateMockVolume() *MockVolume {
+       gate := make(chan struct{})
+       close(gate)
        return &MockVolume{
                Store:      make(map[string][]byte),
                Timestamps: make(map[string]time.Time),
@@ -36,6 +43,7 @@ func CreateMockVolume() *MockVolume {
                Touchable:  true,
                Readonly:   false,
                called:     map[string]int{},
+               Gate:       gate,
        }
 }
 
@@ -62,6 +70,7 @@ func (v *MockVolume) gotCall(method string) {
 
 func (v *MockVolume) Get(loc string) ([]byte, error) {
        v.gotCall("Get")
+       <-v.Gate
        if v.Bad {
                return nil, errors.New("Bad volume")
        } else if block, ok := v.Store[loc]; ok {
@@ -74,6 +83,7 @@ func (v *MockVolume) Get(loc string) ([]byte, error) {
 
 func (v *MockVolume) Put(loc string, block []byte) error {
        v.gotCall("Put")
+       <-v.Gate
        if v.Bad {
                return errors.New("Bad volume")
        }
@@ -86,6 +96,7 @@ func (v *MockVolume) Put(loc string, block []byte) error {
 
 func (v *MockVolume) Touch(loc string) error {
        v.gotCall("Touch")
+       <-v.Gate
        if v.Readonly {
                return MethodDisabledError
        }
@@ -98,6 +109,7 @@ func (v *MockVolume) Touch(loc string) error {
 
 func (v *MockVolume) Mtime(loc string) (time.Time, error) {
        v.gotCall("Mtime")
+       <-v.Gate
        var mtime time.Time
        var err error
        if v.Bad {
@@ -112,6 +124,7 @@ func (v *MockVolume) Mtime(loc string) (time.Time, error) {
 
 func (v *MockVolume) IndexTo(prefix string, w io.Writer) error {
        v.gotCall("IndexTo")
+       <-v.Gate
        for loc, block := range v.Store {
                if !IsValidLocator(loc) || !strings.HasPrefix(loc, prefix) {
                        continue
@@ -127,6 +140,7 @@ func (v *MockVolume) IndexTo(prefix string, w io.Writer) error {
 
 func (v *MockVolume) Delete(loc string) error {
        v.gotCall("Delete")
+       <-v.Gate
        if v.Readonly {
                return MethodDisabledError
        }