6260: Pull entire status object out of WorkQueue atomically.
author Tom Clegg <tom@curoverse.com>
Mon, 10 Aug 2015 16:45:45 +0000 (12:45 -0400)
committer Tom Clegg <tom@curoverse.com>
Mon, 10 Aug 2015 16:45:45 +0000 (12:45 -0400)
services/keepstore/handlers.go
services/keepstore/pull_worker_test.go
services/keepstore/trash_worker_test.go
services/keepstore/work_queue.go
services/keepstore/work_queue_test.go

index a6665f61640dfdb5f0791b4fc16fc34f084fe4bf..a86bb6a5b552887836e24cb858191bcbe920e479 100644 (file)
@@ -192,12 +192,6 @@ type PoolStatus struct {
        Len   int    `json:"BuffersInUse"`
 }
 
-type WorkQueueStatus struct {
-       InProgress  int
-       Outstanding int
-       Queued      int
-}
-
 type NodeStatus struct {
        Volumes    []*VolumeStatus `json:"volumes"`
        BufferPool PoolStatus
@@ -238,22 +232,20 @@ func readNodeStatus(st *NodeStatus) {
        st.BufferPool.Alloc = bufs.Alloc()
        st.BufferPool.Cap = bufs.Cap()
        st.BufferPool.Len = bufs.Len()
-       readWorkQueueStatus(&st.PullQueue, pullq)
-       readWorkQueueStatus(&st.TrashQueue, trashq)
+       st.PullQueue = getWorkQueueStatus(pullq)
+       st.TrashQueue = getWorkQueueStatus(trashq)
        runtime.ReadMemStats(&st.Memory)
 }
 
-// Populate a WorkQueueStatus. This is not atomic, so race conditions
-// can cause InProgress + Queued != Outstanding.
-func readWorkQueueStatus(st *WorkQueueStatus, q *WorkQueue) {
+// getWorkQueueStatus returns a snapshot of q's status. If q is nil
+// (which should never happen except in test suites), it returns a
+// zero WorkQueueStatus instead of crashing.
+func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
        if q == nil {
                // This should only happen during tests.
-               *st = WorkQueueStatus{}
-               return
+               return WorkQueueStatus{}
        }
-       st.InProgress = q.CountInProgress()
-       st.Outstanding = q.CountOutstanding()
-       st.Queued = q.CountQueued()
+       return q.Status()
 }
 
 // DeleteHandler processes DELETE requests.
index e8d390ab2b755dc558a7201ce75b6ecd194348b9..37d83b32802af1432bf7ed8f2af5826a3d757914 100644 (file)
@@ -281,14 +281,16 @@ func performTest(testData PullWorkerTestData, c *C) {
        }
 
        c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
-       c.Assert(getStatusItem("PullQueue", "Outstanding"), Equals, float64(0))
        c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
 
        response := IssueRequest(&testData.req)
        c.Assert(response.Code, Equals, testData.response_code)
        c.Assert(response.Body.String(), Equals, testData.response_body)
 
-       expectEqualWithin(c, time.Second, 0, func() interface{} { return pullq.CountOutstanding() })
+       expectEqualWithin(c, time.Second, 0, func() interface{} {
+               st := pullq.Status()
+               return st.InProgress + st.Queued
+       })
 
        if testData.name == "TestPullWorker_pull_list_with_two_items_latest_replacing_old" {
                c.Assert(len(testPullLists), Equals, 2)
index 433eef57863d751154df4e2c5e937da589b9bab3..40b291e6f3a0d268eef374d43a6f489701d02ab9 100644 (file)
@@ -271,26 +271,23 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
        }
 
        assertStatusItem("InProgress", 0)
-       assertStatusItem("Outstanding", 0)
        assertStatusItem("Queued", 0)
 
        listLen := trashList.Len()
        trashq.ReplaceQueue(trashList)
 
        // Wait for worker to take request(s)
-       expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.CountOutstanding() })
-       expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.CountInProgress() })
+       expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.Status().InProgress })
 
        // Ensure status.json also reports work is happening
        assertStatusItem("InProgress", float64(1))
-       assertStatusItem("Outstanding", float64(listLen))
        assertStatusItem("Queued", float64(listLen-1))
 
        // Let worker proceed
        close(gate)
 
        // Wait for worker to finish
-       expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.CountOutstanding() })
+       expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
 
        // Verify Locator1 to be un/deleted as expected
        data, _ := GetBlock(testData.Locator1, false)
index 6ec52742a57afb826128678950d9c90000553233..58e49665950dd929b54668ec4ce7bf5c87174d57 100644 (file)
@@ -85,10 +85,8 @@ package main
 import "container/list"
 
 type WorkQueue struct {
-       countInProgress  chan int
-       countOutstanding chan int
-       countQueued      chan int
-       newlist          chan *list.List
+       getStatus chan WorkQueueStatus
+       newlist   chan *list.List
        // Workers get work items by reading from this channel.
        NextItem <-chan interface{}
        // Each worker must send struct{}{} to ReportDone exactly once
@@ -98,6 +96,11 @@ type WorkQueue struct {
        ReportDone chan<- struct{}
 }
 
+type WorkQueueStatus struct {
+       InProgress int // items sent on NextItem but not yet reported done
+       Queued     int // items still waiting to be sent on NextItem
+}
+
 // NewWorkQueue returns a new empty WorkQueue.
 //
 func NewWorkQueue() *WorkQueue {
@@ -105,33 +108,34 @@ func NewWorkQueue() *WorkQueue {
        reportDone := make(chan struct{})
        newList := make(chan *list.List)
        b := WorkQueue{
-               countQueued:      make(chan int),
-               countInProgress:  make(chan int),
-               countOutstanding: make(chan int),
-               newlist:          newList,
-               NextItem:         nextItem,
-               ReportDone:       reportDone,
+               getStatus:  make(chan WorkQueueStatus),
+               newlist:    newList,
+               NextItem:   nextItem,
+               ReportDone: reportDone,
        }
        go func() {
                // Read new work lists from the newlist channel.
-               // Reply to "length" and "get next item" queries by
-               // sending to the countQueued and nextItem channels
+               // Reply to "status" and "get next item" queries by
+               // sending to the getStatus and nextItem channels
                // respectively. Return when the newlist channel
                // closes.
 
                todo := &list.List{}
-               countInProgress := 0
+               status := WorkQueueStatus{}
 
                // When we're done, close the output channel; workers will
                // shut down next time they ask for new work.
                defer close(nextItem)
-               defer close(b.countInProgress)
-               defer close(b.countOutstanding)
-               defer close(b.countQueued)
+               defer close(b.getStatus)
 
+               // nextChan and nextVal are both nil when we have
+               // nothing to send; otherwise they are, respectively,
+               // the nextItem channel and the next work item to send
+               // to it.
                var nextChan chan interface{}
                var nextVal interface{}
-               for newList != nil || countInProgress > 0 {
+
+               for newList != nil || status.InProgress > 0 {
                        select {
                        case p, ok := <-newList:
                                if !ok {
@@ -142,7 +146,8 @@ func NewWorkQueue() *WorkQueue {
                                if todo == nil {
                                        todo = &list.List{}
                                }
-                               if todo.Len() == 0 {
+                               status.Queued = todo.Len()
+                               if status.Queued == 0 {
                                        // Stop sending work
                                        nextChan = nil
                                        nextVal = nil
@@ -151,9 +156,10 @@ func NewWorkQueue() *WorkQueue {
                                        nextVal = todo.Front().Value
                                }
                        case nextChan <- nextVal:
-                               countInProgress++
                                todo.Remove(todo.Front())
-                               if todo.Len() == 0 {
+                               status.InProgress++
+                               status.Queued--
+                               if status.Queued == 0 {
                                        // Stop sending work
                                        nextChan = nil
                                        nextVal = nil
@@ -161,10 +167,8 @@ func NewWorkQueue() *WorkQueue {
                                        nextVal = todo.Front().Value
                                }
                        case <-reportDone:
-                               countInProgress--
-                       case b.countInProgress <- countInProgress:
-                       case b.countOutstanding <- todo.Len() + countInProgress:
-                       case b.countQueued <- todo.Len():
+                               status.InProgress--
+                       case b.getStatus <- status:
                        }
                }
        }()
@@ -184,31 +188,19 @@ func (b *WorkQueue) ReplaceQueue(list *list.List) {
 // abandons any pending requests, but allows any pull request already
 // in progress to continue.
 //
-// After Close, CountX methods will return correct values, NextItem
-// will be closed, and ReplaceQueue will panic.
+// After Close, Status returns correct values, NextItem is closed once
+// in-progress work is reported done, and ReplaceQueue will panic.
 //
 func (b *WorkQueue) Close() {
        close(b.newlist)
 }
 
-// CountOutstanding returns the number of items in the queue or in
-// progress. A return value of 0 guarantees all existing work (work
-// that was sent to ReplaceQueue before CountOutstanding was called)
-// has completed.
-//
-func (b *WorkQueue) CountOutstanding() int {
-       // If the channel is closed, we get zero, which is correct.
-       return <-b.countOutstanding
-}
-
-// CountQueued returns the number of items in the current queue.
-//
-func (b *WorkQueue) CountQueued() int {
-       return <-b.countQueued
-}
-
-// Len returns the number of items in progress.
+// Status returns a snapshot of the queue's InProgress and Queued
+// counts, sent together in one channel receive so they are consistent.
 //
-func (b *WorkQueue) CountInProgress() int {
-       return <-b.countInProgress
+func (b *WorkQueue) Status() WorkQueueStatus {
+       // If the channel is closed, we get the zero value of
+       // WorkQueueStatus, which is an accurate description of a
+       // finished queue.
+       return <-b.getStatus
 }
index df0fa9c61284ba37f5cc68a97bc8e664038fa1e5..7844a2b87cde29546239f505dc5cf949781f87ab 100644 (file)
@@ -84,9 +84,9 @@ func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f
        }
 }
 
-func expectCountQueued(t fatalfer, b *WorkQueue, expectCountQueued int) {
-       if l := b.CountQueued(); l != expectCountQueued {
-               t.Fatalf("Got CountQueued()==%d, expected %d", l, expectCountQueued)
+func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) { // fail t unless b reports exactly expectQueued queued items
+       if l := b.Status().Queued; l != expectQueued {
+               t.Fatalf("Got Queued==%d, expected %d", l, expectQueued)
        }
 }
 
@@ -94,18 +94,25 @@ func TestWorkQueueDoneness(t *testing.T) {
        b := NewWorkQueue()
        defer b.Close()
        b.ReplaceQueue(makeTestWorkList([]int{1, 2, 3}))
-       expectCountQueued(t, b, 3)
+       expectQueued(t, b, 3)
+       gate := make(chan struct{})
        go func() {
+               <-gate
                for _ = range b.NextItem {
-                       //time.Sleep(time.Duration(delay.(int)) * time.Millisecond)
+                       <-gate
                        time.Sleep(time.Millisecond)
                        b.ReportDone <- struct{}{}
                }
        }()
-       expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
-       b.ReplaceQueue(makeTestWorkList([]int{400, 5, 6}))
-       expectEqualWithin(t, time.Second, 3, func() interface{} { return b.CountOutstanding() })
-       expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
+       expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
+       b.ReplaceQueue(makeTestWorkList([]int{4, 5, 6}))
+       for i := 1; i <= 3; i++ {
+               gate <- struct{}{}
+               expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued })
+               expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
+       }
+       close(gate)
+       expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
        expectChannelEmpty(t, b.NextItem)
 }
 
@@ -114,10 +121,10 @@ func TestWorkQueueReadWrite(t *testing.T) {
        var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
 
        b := NewWorkQueue()
-       expectCountQueued(t, b, 0)
+       expectQueued(t, b, 0)
 
        b.ReplaceQueue(makeTestWorkList(input))
-       expectCountQueued(t, b, len(input))
+       expectQueued(t, b, len(input))
 
        doWorkItems(t, b, input)
        expectChannelEmpty(t, b.NextItem)
@@ -148,7 +155,7 @@ func TestWorkQueueEarlyRead(t *testing.T) {
        // finish.
        b.ReplaceQueue(makeTestWorkList(input))
        <-done
-       expectCountQueued(t, b, 0)
+       expectQueued(t, b, 0)
 }
 
 // After Close(), NextItem closes, work finishes, then stats return zero.
@@ -166,10 +173,10 @@ func TestWorkQueueClose(t *testing.T) {
        // Wait for worker to take item 1
        <-mark
        b.Close()
-       expectEqualWithin(t, time.Second, 1, func() interface{} { return b.CountOutstanding() })
+       expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
        // Tell worker to report done
        mark <- struct{}{}
-       expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
+       expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
        expectChannelClosedWithin(t, time.Second, b.NextItem)
 }