Merge branch 'master' into 6572-doc-copy-pipeline
author radhika <radhika@curoverse.com>
Wed, 12 Aug 2015 15:05:14 +0000 (11:05 -0400)
committer radhika <radhika@curoverse.com>
Wed, 12 Aug 2015 15:05:14 +0000 (11:05 -0400)
13 files changed:
doc/_config.yml
services/keepstore/bufferpool_test.go
services/keepstore/handlers.go
services/keepstore/perms_test.go
services/keepstore/pull_worker.go
services/keepstore/pull_worker_integration_test.go
services/keepstore/pull_worker_test.go
services/keepstore/status_test.go [new file with mode: 0644]
services/keepstore/trash_worker.go
services/keepstore/trash_worker_test.go
services/keepstore/volume_test.go
services/keepstore/work_queue.go
services/keepstore/work_queue_test.go

diff --git a/doc/_config.yml b/doc/_config.yml
index dfff6d03f3605748eeb816a80ede642227ef2e3a..f21fdeef81dd24f662c23ce8adf9a115752430aa 100644 (file)
@@ -150,10 +150,10 @@ navbar:
       - install/install-manual-prerequisites.html.textile.liquid
       - install/install-sso.html.textile.liquid
       - install/install-api-server.html.textile.liquid
+      - install/install-arv-git-httpd.html.textile.liquid
       - install/install-workbench-app.html.textile.liquid
       - install/install-shell-server.html.textile.liquid
       - install/create-standard-objects.html.textile.liquid
-      - install/install-arv-git-httpd.html.textile.liquid
       - install/install-keepstore.html.textile.liquid
       - install/install-keepproxy.html.textile.liquid
       - install/install-crunch-dispatch.html.textile.liquid
diff --git a/services/keepstore/bufferpool_test.go b/services/keepstore/bufferpool_test.go
index 718e2cac6092fe4400e4d370945e17d603010dac..95d118e221de6b8516654f8a133b871da00c5cd2 100644 (file)
@@ -21,6 +21,11 @@ func init() {
        bufs = newBufferPool(maxBuffers, BLOCKSIZE)
 }
 
+// Restore sane default after bufferpool's own tests
+func (s *BufferPoolSuite) TearDownTest(c *C) {
+       bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+}
+
 func (s *BufferPoolSuite) TestBufferPoolBufSize(c *C) {
        bufs := newBufferPool(2, 10)
        b1 := bufs.Get(1)
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index d1169d5df9ce5549cdfd4f5a56fb095ad12cc8f6..a86bb6a5b552887836e24cb858191bcbe920e479 100644 (file)
@@ -193,16 +193,19 @@ type PoolStatus struct {
 }
 
 type NodeStatus struct {
-       Volumes    []*VolumeStatus  `json:"volumes"`
+       Volumes    []*VolumeStatus `json:"volumes"`
        BufferPool PoolStatus
+       PullQueue  WorkQueueStatus
+       TrashQueue WorkQueueStatus
        Memory     runtime.MemStats
 }
 
 var st NodeStatus
 var stLock sync.Mutex
+
 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
        stLock.Lock()
-       ReadNodeStatus(&st)
+       readNodeStatus(&st)
        jstat, err := json.Marshal(&st)
        stLock.Unlock()
        if err == nil {
@@ -214,10 +217,8 @@ func StatusHandler(resp http.ResponseWriter, req *http.Request) {
        }
 }
 
-// ReadNodeStatus populates the given NodeStatus struct with current
-// values.
-//
-func ReadNodeStatus(st *NodeStatus) {
+// readNodeStatus populates the given NodeStatus struct with current values.
+func readNodeStatus(st *NodeStatus) {
        vols := KeepVM.AllReadable()
        if cap(st.Volumes) < len(vols) {
                st.Volumes = make([]*VolumeStatus, len(vols))
@@ -231,9 +232,22 @@ func ReadNodeStatus(st *NodeStatus) {
        st.BufferPool.Alloc = bufs.Alloc()
        st.BufferPool.Cap = bufs.Cap()
        st.BufferPool.Len = bufs.Len()
+       st.PullQueue = getWorkQueueStatus(pullq)
+       st.TrashQueue = getWorkQueueStatus(trashq)
        runtime.ReadMemStats(&st.Memory)
 }
 
+// return a WorkQueueStatus for the given queue. If q is nil (which
+// should never happen except in test suites), return a zero status
+// value instead of crashing.
+func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
+       if q == nil {
+               // This should only happen during tests.
+               return WorkQueueStatus{}
+       }
+       return q.Status()
+}
+
 // DeleteHandler processes DELETE requests.
 //
 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
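A minimal sketch (not part of the diff above) of how a monitoring client could read the new queue counters that the PullQueue and TrashQueue fields expose through keepstore's /status.json. The keepstore address and the client-side type names are assumptions for illustration; only the InProgress and Queued counters come from the WorkQueueStatus type added in work_queue.go below.

	package main

	import (
		"encoding/json"
		"fmt"
		"net/http"
	)

	// workQueueStatus mirrors the two counters in keepstore's WorkQueueStatus.
	type workQueueStatus struct {
		InProgress int
		Queued     int
	}

	// nodeStatus picks out only the queue fields from the /status.json response.
	type nodeStatus struct {
		PullQueue  workQueueStatus
		TrashQueue workQueueStatus
	}

	func main() {
		// Address and port are hypothetical; point this at a real keepstore.
		resp, err := http.Get("http://localhost:25107/status.json")
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()
		var st nodeStatus
		if err := json.NewDecoder(resp.Body).Decode(&st); err != nil {
			panic(err)
		}
		fmt.Printf("pull queue: %d in progress, %d queued\n", st.PullQueue.InProgress, st.PullQueue.Queued)
		fmt.Printf("trash queue: %d in progress, %d queued\n", st.TrashQueue.InProgress, st.TrashQueue.Queued)
	}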
diff --git a/services/keepstore/perms_test.go b/services/keepstore/perms_test.go
index 85883b03a7d33a416947d823957f1ec1a3ffac7d..e43cb8dcd99bf39d4318153525b4f46c660239ce 100644 (file)
@@ -48,7 +48,7 @@ func TestVerifySignatureExtraHints(t *testing.T) {
        PermissionSecret = []byte(known_key)
        defer func() { PermissionSecret = nil }()
 
-       if VerifySignature(known_locator+"+K@xyzzy"+known_sig_hint, known_token) != nil{
+       if VerifySignature(known_locator+"+K@xyzzy"+known_sig_hint, known_token) != nil {
                t.Fatal("Verify cannot handle hint before permission signature")
        }
 
diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
index 3d67cf2c1ec0336e73e4e9e7cca26bf4e758b544..acf861119f47fd1b765bcad461d826c369151968 100644 (file)
@@ -24,6 +24,7 @@ func RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient) {
        for item := range nextItem {
                pullRequest := item.(PullRequest)
                err := PullItemAndProcess(item.(PullRequest), GenerateRandomApiToken(), keepClient)
+               pullq.DoneItem <- struct{}{}
                if err == nil {
                        log.Printf("Pull %s success", pullRequest)
                } else {
diff --git a/services/keepstore/pull_worker_integration_test.go b/services/keepstore/pull_worker_integration_test.go
index 762abff533213747b16562b5a6127b0efa4203f2..3e57407369c0dccf8216e3e0835820c0bc419b55 100644 (file)
@@ -107,7 +107,7 @@ func TestPullWorkerIntegration_GetExistingLocator(t *testing.T) {
 func performPullWorkerIntegrationTest(testData PullWorkIntegrationTestData, pullRequest PullRequest, t *testing.T) {
 
        // Override PutContent to mock PutBlock functionality
-       defer func(orig func([]byte, string)(error)) { PutContent = orig }(PutContent)
+       defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
        PutContent = func(content []byte, locator string) (err error) {
                if string(content) != testData.Content {
                        t.Errorf("PutContent invoked with unexpected data. Expected: %s; Found: %s", testData.Content, content)
@@ -116,7 +116,9 @@ func performPullWorkerIntegrationTest(testData PullWorkIntegrationTestData, pull
        }
 
        // Override GetContent to mock keepclient Get functionality
-       defer func(orig func(string, *keepclient.KeepClient)(io.ReadCloser, int64, string, error)) { GetContent = orig }(GetContent)
+       defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
+               GetContent = orig
+       }(GetContent)
        GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
                reader io.ReadCloser, contentLength int64, url string, err error) {
                if testData.GetError != "" {
diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go
index 0833bc696763a867bfab287a75805b57ce933ae6..37d83b32802af1432bf7ed8f2af5826a3d757914 100644 (file)
@@ -9,6 +9,7 @@ import (
        "io"
        "net/http"
        "testing"
+       "time"
 )
 
 type PullWorkerTestSuite struct{}
@@ -22,7 +23,6 @@ func TestPullWorker(t *testing.T) {
 var _ = Suite(&PullWorkerTestSuite{})
 
 var testPullLists map[string]string
-var processedPullLists map[string]string
 var readContent string
 var readError error
 var putContent []byte
@@ -39,7 +39,6 @@ func (s *PullWorkerTestSuite) SetUpTest(c *C) {
        // This behavior is verified using these two maps in the
        // "TestPullWorker_pull_list_with_two_items_latest_replacing_old"
        testPullLists = make(map[string]string)
-       processedPullLists = make(map[string]string)
 }
 
 // Since keepstore does not come into picture in tests,
@@ -237,16 +236,23 @@ func (s *PullWorkerTestSuite) TestPullWorker_invalid_data_manager_token(c *C) {
 }
 
 func performTest(testData PullWorkerTestData, c *C) {
+       KeepVM = MakeTestVolumeManager(2)
+       defer KeepVM.Close()
+
        RunTestPullWorker(c)
+       defer pullq.Close()
 
        currentTestData = testData
        testPullLists[testData.name] = testData.response_body
 
-       // Override GetContent to mock keepclient Get functionality
-       defer func(orig func(string, *keepclient.KeepClient)(io.ReadCloser, int64, string, error)) { GetContent = orig }(GetContent)
-       GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
-               reader io.ReadCloser, contentLength int64, url string, err error) {
+       processedPullLists := make(map[string]string)
 
+       // Override GetContent to mock keepclient Get functionality
+       defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
+               GetContent = orig
+       }(GetContent)
+       GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
+               c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
                processedPullLists[testData.name] = testData.response_body
                if testData.read_error {
                        err = errors.New("Error getting data")
@@ -262,7 +268,7 @@ func performTest(testData PullWorkerTestData, c *C) {
        }
 
        // Override PutContent to mock PutBlock functionality
-       defer func(orig func([]byte, string)(error)) { PutContent = orig }(PutContent)
+       defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
        PutContent = func(content []byte, locator string) (err error) {
                if testData.put_error {
                        err = errors.New("Error putting data")
@@ -274,13 +280,17 @@ func performTest(testData PullWorkerTestData, c *C) {
                }
        }
 
+       c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
+       c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
+
        response := IssueRequest(&testData.req)
        c.Assert(response.Code, Equals, testData.response_code)
        c.Assert(response.Body.String(), Equals, testData.response_body)
 
-       expectWorkerChannelEmpty(c, pullq.NextItem)
-
-       pullq.Close()
+       expectEqualWithin(c, time.Second, 0, func() interface{} {
+               st := pullq.Status()
+               return st.InProgress + st.Queued
+       })
 
        if testData.name == "TestPullWorker_pull_list_with_two_items_latest_replacing_old" {
                c.Assert(len(testPullLists), Equals, 2)
@@ -311,6 +321,8 @@ func performTest(testData PullWorkerTestData, c *C) {
                        c.Assert(string(putContent), Equals, testData.read_content)
                }
        }
+
+       expectChannelEmpty(c, pullq.NextItem)
 }
 
 type ClosingBuffer struct {
@@ -320,19 +332,3 @@ type ClosingBuffer struct {
 func (cb *ClosingBuffer) Close() (err error) {
        return
 }
-
-func expectWorkerChannelEmpty(c *C, workerChannel <-chan interface{}) {
-       select {
-       case item := <-workerChannel:
-               c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
-       default:
-       }
-}
-
-func expectWorkerChannelNotEmpty(c *C, workerChannel <-chan interface{}) {
-       select {
-       case item := <-workerChannel:
-               c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
-       default:
-       }
-}
diff --git a/services/keepstore/status_test.go b/services/keepstore/status_test.go
new file mode 100644 (file)
index 0000000..74de61e
--- /dev/null
@@ -0,0 +1,21 @@
+package main
+
+import (
+       "encoding/json"
+)
+
+// We don't have isolated unit tests for /status.json yet, but we do
+// check (e.g., in pull_worker_test.go) that /status.json reports
+// specific statistics correctly at the appropriate times.
+
+// getStatusItem("foo","bar","baz") retrieves /status.json, decodes
+// the response body into resp, and returns resp["foo"]["bar"]["baz"].
+func getStatusItem(keys ...string) interface{} {
+       resp := IssueRequest(&RequestTester{"/status.json", "", "GET", nil})
+       var s interface{}
+       json.NewDecoder(resp.Body).Decode(&s)
+       for _, k := range keys {
+               s = s.(map[string]interface{})[k]
+       }
+       return s
+}
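Illustrative usage of the new helper, written as a minimal in-package test sketch: encoding/json decodes numbers as float64, so callers compare the result against float64 values (as the assertions in pull_worker_test.go and trash_worker_test.go do). The test name is hypothetical and the volume-manager setup is borrowed from performTest.

	func TestStatusItemSketch(t *testing.T) {
		// Minimal environment, mirroring performTest in pull_worker_test.go.
		KeepVM = MakeTestVolumeManager(2)
		defer KeepVM.Close()

		// With no pull worker running, the pull queue should report idle.
		if n := getStatusItem("PullQueue", "InProgress"); n != float64(0) {
			t.Errorf("expected idle pull queue, got InProgress=%v", n)
		}
	}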
diff --git a/services/keepstore/trash_worker.go b/services/keepstore/trash_worker.go
index 4fbe4bb59624e22e820cf7eab7c2bb5013212e14..8f78658c3a7496473c2d81a7f0d7b13213ef9d5f 100644 (file)
@@ -18,6 +18,7 @@ func RunTrashWorker(trashq *WorkQueue) {
        for item := range trashq.NextItem {
                trashRequest := item.(TrashRequest)
                TrashItem(trashRequest)
+               trashq.DoneItem <- struct{}{}
        }
 }
 
diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go
index 8268191b08eb47c7520f26c9063e51e31e695ca6..40b291e6f3a0d268eef374d43a6f489701d02ab9 100644 (file)
@@ -258,8 +258,36 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
        }
        go RunTrashWorker(trashq)
 
+       // Install gate so all local operations block until we say go
+       gate := make(chan struct{})
+       for _, v := range vols {
+               v.(*MockVolume).Gate = gate
+       }
+
+       assertStatusItem := func(k string, expect float64) {
+               if v := getStatusItem("TrashQueue", k); v != expect {
+                       t.Errorf("Got %s %v, expected %v", k, v, expect)
+               }
+       }
+
+       assertStatusItem("InProgress", 0)
+       assertStatusItem("Queued", 0)
+
+       listLen := trashList.Len()
        trashq.ReplaceQueue(trashList)
-       time.Sleep(10 * time.Millisecond) // give a moment to finish processing the list
+
+       // Wait for worker to take request(s)
+       expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.Status().InProgress })
+
+       // Ensure status.json also reports work is happening
+       assertStatusItem("InProgress", float64(1))
+       assertStatusItem("Queued", float64(listLen-1))
+
+       // Let worker proceed
+       close(gate)
+
+       // Wait for worker to finish
+       expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
 
        // Verify Locator1 to be un/deleted as expected
        data, _ := GetBlock(testData.Locator1, false)
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index 261501992f8080110062cc7be7a1828052f24014..c5a7491b3d542951983ad67e7eec29baa972f966 100644 (file)
@@ -22,13 +22,23 @@ type MockVolume struct {
        // Readonly volumes return an error for Put, Delete, and
        // Touch.
        Readonly bool
-       called   map[string]int
-       mutex    sync.Mutex
+       // Gate is a "starting gate", allowing test cases to pause
+       // volume operations long enough to inspect state. Every
+       // operation (except Status) starts by receiving from
+       // Gate. Sending one value unblocks one operation; closing the
+       // channel unblocks all operations. By default, Gate is a
+       // closed channel, so all operations proceed without
+       // blocking. See trash_worker_test.go for an example.
+       Gate   chan struct{}
+       called map[string]int
+       mutex  sync.Mutex
 }
 
 // CreateMockVolume returns a non-Bad, non-Readonly, Touchable mock
 // volume.
 func CreateMockVolume() *MockVolume {
+       gate := make(chan struct{})
+       close(gate)
        return &MockVolume{
                Store:      make(map[string][]byte),
                Timestamps: make(map[string]time.Time),
@@ -36,6 +46,7 @@ func CreateMockVolume() *MockVolume {
                Touchable:  true,
                Readonly:   false,
                called:     map[string]int{},
+               Gate:       gate,
        }
 }
 
@@ -62,6 +73,7 @@ func (v *MockVolume) gotCall(method string) {
 
 func (v *MockVolume) Get(loc string) ([]byte, error) {
        v.gotCall("Get")
+       <-v.Gate
        if v.Bad {
                return nil, errors.New("Bad volume")
        } else if block, ok := v.Store[loc]; ok {
@@ -74,6 +86,7 @@ func (v *MockVolume) Get(loc string) ([]byte, error) {
 
 func (v *MockVolume) Put(loc string, block []byte) error {
        v.gotCall("Put")
+       <-v.Gate
        if v.Bad {
                return errors.New("Bad volume")
        }
@@ -86,6 +99,7 @@ func (v *MockVolume) Put(loc string, block []byte) error {
 
 func (v *MockVolume) Touch(loc string) error {
        v.gotCall("Touch")
+       <-v.Gate
        if v.Readonly {
                return MethodDisabledError
        }
@@ -98,6 +112,7 @@ func (v *MockVolume) Touch(loc string) error {
 
 func (v *MockVolume) Mtime(loc string) (time.Time, error) {
        v.gotCall("Mtime")
+       <-v.Gate
        var mtime time.Time
        var err error
        if v.Bad {
@@ -112,6 +127,7 @@ func (v *MockVolume) Mtime(loc string) (time.Time, error) {
 
 func (v *MockVolume) IndexTo(prefix string, w io.Writer) error {
        v.gotCall("IndexTo")
+       <-v.Gate
        for loc, block := range v.Store {
                if !IsValidLocator(loc) || !strings.HasPrefix(loc, prefix) {
                        continue
@@ -127,6 +143,7 @@ func (v *MockVolume) IndexTo(prefix string, w io.Writer) error {
 
 func (v *MockVolume) Delete(loc string) error {
        v.gotCall("Delete")
+       <-v.Gate
        if v.Readonly {
                return MethodDisabledError
        }
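A minimal sketch of the "starting gate" pattern documented on MockVolume.Gate above: replace the default (closed) Gate with an open channel so operations block, inspect state while they are parked, then close the gate to release them. The test name and the stored data are illustrative; CreateMockVolume, Gate, and Put are from this file.

	func TestMockVolumeGateSketch(t *testing.T) {
		v := CreateMockVolume()
		gate := make(chan struct{})
		v.Gate = gate // Get/Put/Touch/etc. now block on <-v.Gate

		done := make(chan error)
		go func() { done <- v.Put("locator", []byte("data")) }()

		// While the Put is parked on the gate, the test can inspect
		// in-progress state, e.g. via getStatusItem or v.called.

		close(gate) // release this and all subsequent operations
		if err := <-done; err != nil {
			t.Fatal(err)
		}
	}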
diff --git a/services/keepstore/work_queue.go b/services/keepstore/work_queue.go
index 9509cacd774f5acd64372b55b1841bab55e5b023..f1878ffbbc550250ab88c5ea9a4a694d12d63132 100644 (file)
@@ -85,76 +85,122 @@ package main
 import "container/list"
 
 type WorkQueue struct {
-       newlist  chan *list.List
-       NextItem chan interface{}
+       getStatus chan WorkQueueStatus
+       newlist   chan *list.List
+       // Workers get work items by reading from this channel.
+       NextItem <-chan interface{}
+       // Each worker must send struct{}{} to DoneItem exactly once
+       // for each work item received from NextItem, when it stops
+       // working on that item (regardless of whether the work was
+       // successful).
+       DoneItem chan<- struct{}
 }
 
-// NewWorkQueue returns a new worklist, and launches a listener
-// goroutine that waits for work and farms it out to workers.
+type WorkQueueStatus struct {
+       InProgress int
+       Queued     int
+}
+
+// NewWorkQueue returns a new empty WorkQueue.
 //
 func NewWorkQueue() *WorkQueue {
+       nextItem := make(chan interface{})
+       reportDone := make(chan struct{})
+       newList := make(chan *list.List)
        b := WorkQueue{
-               newlist:  make(chan *list.List),
-               NextItem: make(chan interface{}),
+               getStatus: make(chan WorkQueueStatus),
+               newlist:   newList,
+               NextItem:  nextItem,
+               DoneItem:  reportDone,
        }
-       go b.listen()
+       go func() {
+               // Read new work lists from the newlist channel.
+               // Reply to "status" and "get next item" queries by
+               // sending to the getStatus and nextItem channels
+               // respectively. Return when the newlist channel
+               // closes.
+
+               todo := &list.List{}
+               status := WorkQueueStatus{}
+
+               // When we're done, close the output channel; workers will
+               // shut down next time they ask for new work.
+               defer close(nextItem)
+               defer close(b.getStatus)
+
+               // nextChan and nextVal are both nil when we have
+               // nothing to send; otherwise they are, respectively,
+               // the nextItem channel and the next work item to send
+               // to it.
+               var nextChan chan interface{}
+               var nextVal interface{}
+
+               for newList != nil || status.InProgress > 0 {
+                       select {
+                       case p, ok := <-newList:
+                               if !ok {
+                                       // Closed, stop receiving
+                                       newList = nil
+                               }
+                               todo = p
+                               if todo == nil {
+                                       todo = &list.List{}
+                               }
+                               status.Queued = todo.Len()
+                               if status.Queued == 0 {
+                                       // Stop sending work
+                                       nextChan = nil
+                                       nextVal = nil
+                               } else {
+                                       nextChan = nextItem
+                                       nextVal = todo.Front().Value
+                               }
+                       case nextChan <- nextVal:
+                               todo.Remove(todo.Front())
+                               status.InProgress++
+                               status.Queued--
+                               if status.Queued == 0 {
+                                       // Stop sending work
+                                       nextChan = nil
+                                       nextVal = nil
+                               } else {
+                                       nextVal = todo.Front().Value
+                               }
+                       case <-reportDone:
+                               status.InProgress--
+                       case b.getStatus <- status:
+                       }
+               }
+       }()
        return &b
 }
 
-// ReplaceQueue sends a new list of pull requests to the manager goroutine.
-// The manager will discard any outstanding pull list and begin
-// working on the new list.
+// ReplaceQueue abandons any work items left in the existing queue,
+// and starts giving workers items from the given list. After giving
+// it to ReplaceQueue, the caller must not read or write the given
+// list.
 //
 func (b *WorkQueue) ReplaceQueue(list *list.List) {
        b.newlist <- list
 }
 
 // Close shuts down the manager and terminates the goroutine, which
-// completes any pull request in progress and abandons any pending
-// requests.
+// abandons any pending requests, but allows any pull request already
+// in progress to continue.
+//
+// After Close, Status will return correct values, NextItem will be
+// closed, and ReplaceQueue will panic.
 //
 func (b *WorkQueue) Close() {
        close(b.newlist)
 }
 
-// listen is run in a goroutine. It reads new pull lists from its
-// input queue until the queue is closed.
-// listen takes ownership of the list that is passed to it.
-//
-// Note that the routine does not ever need to access the list
-// itself once the current_item has been initialized, so we do
-// not bother to keep a pointer to the list. Because it is a
-// doubly linked list, holding on to the current item will keep
-// it from garbage collection.
+// Status returns an up-to-date WorkQueueStatus reflecting the current
+// queue status.
 //
-func (b *WorkQueue) listen() {
-       var current_item *list.Element
-
-       // When we're done, close the output channel to shut down any
-       // workers.
-       defer close(b.NextItem)
-
-       for {
-               // If the current list is empty, wait for a new list before
-               // even checking if workers are ready.
-               if current_item == nil {
-                       if p, ok := <-b.newlist; ok {
-                               current_item = p.Front()
-                       } else {
-                               // The channel was closed; shut down.
-                               return
-                       }
-               }
-               select {
-               case p, ok := <-b.newlist:
-                       if ok {
-                               current_item = p.Front()
-                       } else {
-                               // The input channel is closed; time to shut down
-                               return
-                       }
-               case b.NextItem <- current_item.Value:
-                       current_item = current_item.Next()
-               }
-       }
+func (b *WorkQueue) Status() WorkQueueStatus {
+       // If the channel is closed, we get the zero value of
+       // WorkQueueStatus, which is an accurate description of a
+       // finished queue.
+       return <-b.getStatus
 }
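For reference, this is the protocol the new DoneItem channel imposes on workers, exactly as RunPullWorker and RunTrashWorker follow it in this commit: receive from NextItem until it closes, and send one struct{}{} to DoneItem per item, whether or not the work succeeded. A generic sketch, with handle standing in for the per-item work:

	// runWorkerSketch drains q the way RunPullWorker and RunTrashWorker do:
	// one DoneItem send per NextItem receive, success or failure, so that
	// q.Status() stays accurate.
	func runWorkerSketch(q *WorkQueue, handle func(interface{})) {
		for item := range q.NextItem {
			handle(item)             // do the work (it may fail; that is fine)
			q.DoneItem <- struct{}{} // report completion exactly once per item
		}
	}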
diff --git a/services/keepstore/work_queue_test.go b/services/keepstore/work_queue_test.go
index 144e4c252be9ba1a069d8cf862f72c4284a28479..74c67f2dd0a6c1ee69748d24c162d95b5c98b16a 100644 (file)
@@ -2,9 +2,15 @@ package main
 
 import (
        "container/list"
+       "runtime"
        "testing"
+       "time"
 )
 
+type fatalfer interface {
+       Fatalf(string, ...interface{})
+}
+
 func makeTestWorkList(ary []int) *list.List {
        l := list.New()
        for _, n := range ary {
@@ -13,40 +19,101 @@ func makeTestWorkList(ary []int) *list.List {
        return l
 }
 
-func expectChannelEmpty(t *testing.T, c <-chan interface{}) {
+func expectChannelEmpty(t fatalfer, c <-chan interface{}) {
        select {
-       case item := <-c:
-               t.Fatalf("Received value (%v) from channel that we expected to be empty", item)
+       case item, ok := <-c:
+               if ok {
+                       t.Fatalf("Received value (%+v) from channel that we expected to be empty", item)
+               }
        default:
-               // no-op
        }
 }
 
-func expectChannelNotEmpty(t *testing.T, c <-chan interface{}) {
-       if item, ok := <-c; !ok {
-               t.Fatal("expected data on a closed channel")
-       } else if item == nil {
-               t.Fatal("expected data on an empty channel")
+func expectChannelNotEmpty(t fatalfer, c <-chan interface{}) interface{} {
+       select {
+       case item, ok := <-c:
+               if !ok {
+                       t.Fatalf("expected data on a closed channel")
+               }
+               return item
+       case <-time.After(time.Second):
+               t.Fatalf("expected data on an empty channel")
+               return nil
        }
 }
 
-func expectChannelClosed(t *testing.T, c <-chan interface{}) {
-       received, ok := <-c
-       if ok {
-               t.Fatalf("Expected channel to be closed, but received %v instead", received)
+func expectChannelClosedWithin(t fatalfer, timeout time.Duration, c <-chan interface{}) {
+       select {
+       case received, ok := <-c:
+               if ok {
+                       t.Fatalf("Expected channel to be closed, but received %+v instead", received)
+               }
+       case <-time.After(timeout):
+               t.Fatalf("Expected channel to be closed, but it is still open after %v", timeout)
        }
 }
 
-func expectFromChannel(t *testing.T, c <-chan interface{}, expected []int) {
+func doWorkItems(t fatalfer, q *WorkQueue, expected []int) {
        for i := range expected {
-               actual, ok := <-c
-               t.Logf("received %v", actual)
+               actual, ok := <-q.NextItem
                if !ok {
-                       t.Fatalf("Expected %v but channel was closed after receiving the first %d elements correctly.", expected, i)
-               } else if actual.(int) != expected[i] {
-                       t.Fatalf("Expected %v but received '%v' after receiving the first %d elements correctly.", expected[i], actual, i)
+                       t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i])
+               }
+               q.DoneItem <- struct{}{}
+               if actual.(int) != expected[i] {
+                       t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i])
+               }
+       }
+}
+
+func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f func() interface{}) {
+       ok := make(chan struct{})
+       giveup := false
+       go func() {
+               for f() != expect && !giveup {
+                       time.Sleep(time.Millisecond)
+               }
+               close(ok)
+       }()
+       select {
+       case <-ok:
+       case <-time.After(timeout):
+               giveup = true
+               _, file, line, _ := runtime.Caller(1)
+               t.Fatalf("Still getting %+v, timed out waiting for %+v\n%s:%d", f(), expect, file, line)
+       }
+}
+
+func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) {
+       if l := b.Status().Queued; l != expectQueued {
+               t.Fatalf("Got Queued==%d, expected %d", l, expectQueued)
+       }
+}
+
+func TestWorkQueueDoneness(t *testing.T) {
+       b := NewWorkQueue()
+       defer b.Close()
+       b.ReplaceQueue(makeTestWorkList([]int{1, 2, 3}))
+       expectQueued(t, b, 3)
+       gate := make(chan struct{})
+       go func() {
+               <-gate
+               for _ = range b.NextItem {
+                       <-gate
+                       time.Sleep(time.Millisecond)
+                       b.DoneItem <- struct{}{}
                }
+       }()
+       expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
+       b.ReplaceQueue(makeTestWorkList([]int{4, 5, 6}))
+       for i := 1; i <= 3; i++ {
+               gate <- struct{}{}
+               expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued })
+               expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
        }
+       close(gate)
+       expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
+       expectChannelEmpty(t, b.NextItem)
 }
 
 // Create a WorkQueue, generate a list for it, and instantiate a worker.
@@ -54,9 +121,12 @@ func TestWorkQueueReadWrite(t *testing.T) {
        var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
 
        b := NewWorkQueue()
+       expectQueued(t, b, 0)
+
        b.ReplaceQueue(makeTestWorkList(input))
+       expectQueued(t, b, len(input))
 
-       expectFromChannel(t, b.NextItem, input)
+       doWorkItems(t, b, input)
        expectChannelEmpty(t, b.NextItem)
        b.Close()
 }
@@ -66,6 +136,7 @@ func TestWorkQueueEarlyRead(t *testing.T) {
        var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
 
        b := NewWorkQueue()
+       defer b.Close()
 
        // First, demonstrate that nothing is available on the NextItem
        // channel.
@@ -76,8 +147,7 @@ func TestWorkQueueEarlyRead(t *testing.T) {
        //
        done := make(chan int)
        go func() {
-               expectFromChannel(t, b.NextItem, input)
-               b.Close()
+               doWorkItems(t, b, input)
                done <- 1
        }()
 
@@ -85,8 +155,29 @@ func TestWorkQueueEarlyRead(t *testing.T) {
        // finish.
        b.ReplaceQueue(makeTestWorkList(input))
        <-done
+       expectQueued(t, b, 0)
+}
 
-       expectChannelClosed(t, b.NextItem)
+// After Close(), NextItem closes, work finishes, then stats return zero.
+func TestWorkQueueClose(t *testing.T) {
+       b := NewWorkQueue()
+       input := []int{1, 2, 3, 4, 5, 6, 7, 8}
+       mark := make(chan struct{})
+       go func() {
+               <-b.NextItem
+               mark <- struct{}{}
+               <-mark
+               b.DoneItem <- struct{}{}
+       }()
+       b.ReplaceQueue(makeTestWorkList(input))
+       // Wait for worker to take item 1
+       <-mark
+       b.Close()
+       expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
+       // Tell worker to report done
+       mark <- struct{}{}
+       expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
+       expectChannelClosedWithin(t, time.Second, b.NextItem)
 }
 
 // Show that a reader may block when the manager's list is exhausted,
@@ -99,10 +190,11 @@ func TestWorkQueueReaderBlocks(t *testing.T) {
        )
 
        b := NewWorkQueue()
+       defer b.Close()
        sendmore := make(chan int)
        done := make(chan int)
        go func() {
-               expectFromChannel(t, b.NextItem, inputBeforeBlock)
+               doWorkItems(t, b, inputBeforeBlock)
 
                // Confirm that the channel is empty, so a subsequent read
                // on it will block.
@@ -110,8 +202,7 @@ func TestWorkQueueReaderBlocks(t *testing.T) {
 
                // Signal that we're ready for more input.
                sendmore <- 1
-               expectFromChannel(t, b.NextItem, inputAfterBlock)
-               b.Close()
+               doWorkItems(t, b, inputAfterBlock)
                done <- 1
        }()
 
@@ -136,14 +227,14 @@ func TestWorkQueueReplaceQueue(t *testing.T) {
 
        // Read just the first five elements from the work list.
        // Confirm that the channel is not empty.
-       expectFromChannel(t, b.NextItem, firstInput[0:5])
+       doWorkItems(t, b, firstInput[0:5])
        expectChannelNotEmpty(t, b.NextItem)
 
        // Replace the work list and read five more elements.
        // The old list should have been discarded and all new
        // elements come from the new list.
        b.ReplaceQueue(makeTestWorkList(replaceInput))
-       expectFromChannel(t, b.NextItem, replaceInput[0:5])
+       doWorkItems(t, b, replaceInput[0:5])
 
        b.Close()
 }