From: Tom Clegg Date: Mon, 10 Aug 2015 16:45:45 +0000 (-0400) Subject: 6260: Pull entire status object out of WorkQueue atomically. X-Git-Tag: 1.1.0~1408^2~3 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/d89b7ae1f6fa35dd3627ead14c855751f1de2193 6260: Pull entire status object out of WorkQueue atomically. --- diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go index a6665f6164..a86bb6a5b5 100644 --- a/services/keepstore/handlers.go +++ b/services/keepstore/handlers.go @@ -192,12 +192,6 @@ type PoolStatus struct { Len int `json:"BuffersInUse"` } -type WorkQueueStatus struct { - InProgress int - Outstanding int - Queued int -} - type NodeStatus struct { Volumes []*VolumeStatus `json:"volumes"` BufferPool PoolStatus @@ -238,22 +232,20 @@ func readNodeStatus(st *NodeStatus) { st.BufferPool.Alloc = bufs.Alloc() st.BufferPool.Cap = bufs.Cap() st.BufferPool.Len = bufs.Len() - readWorkQueueStatus(&st.PullQueue, pullq) - readWorkQueueStatus(&st.TrashQueue, trashq) + st.PullQueue = getWorkQueueStatus(pullq) + st.TrashQueue = getWorkQueueStatus(trashq) runtime.ReadMemStats(&st.Memory) } -// Populate a WorkQueueStatus. This is not atomic, so race conditions -// can cause InProgress + Queued != Outstanding. -func readWorkQueueStatus(st *WorkQueueStatus, q *WorkQueue) { +// return a WorkQueueStatus for the given queue. If q is nil (which +// should never happen except in test suites), return a zero status +// value instead of crashing. +func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus { if q == nil { // This should only happen during tests. - *st = WorkQueueStatus{} - return + return WorkQueueStatus{} } - st.InProgress = q.CountInProgress() - st.Outstanding = q.CountOutstanding() - st.Queued = q.CountQueued() + return q.Status() } // DeleteHandler processes DELETE requests. diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go index e8d390ab2b..37d83b3280 100644 --- a/services/keepstore/pull_worker_test.go +++ b/services/keepstore/pull_worker_test.go @@ -281,14 +281,16 @@ func performTest(testData PullWorkerTestData, c *C) { } c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0)) - c.Assert(getStatusItem("PullQueue", "Outstanding"), Equals, float64(0)) c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0)) response := IssueRequest(&testData.req) c.Assert(response.Code, Equals, testData.response_code) c.Assert(response.Body.String(), Equals, testData.response_body) - expectEqualWithin(c, time.Second, 0, func() interface{} { return pullq.CountOutstanding() }) + expectEqualWithin(c, time.Second, 0, func() interface{} { + st := pullq.Status() + return st.InProgress + st.Queued + }) if testData.name == "TestPullWorker_pull_list_with_two_items_latest_replacing_old" { c.Assert(len(testPullLists), Equals, 2) diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go index 433eef5786..40b291e6f3 100644 --- a/services/keepstore/trash_worker_test.go +++ b/services/keepstore/trash_worker_test.go @@ -271,26 +271,23 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) { } assertStatusItem("InProgress", 0) - assertStatusItem("Outstanding", 0) assertStatusItem("Queued", 0) listLen := trashList.Len() trashq.ReplaceQueue(trashList) // Wait for worker to take request(s) - expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.CountOutstanding() }) - expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.CountInProgress() }) + expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.Status().InProgress }) // Ensure status.json also reports work is happening assertStatusItem("InProgress", float64(1)) - assertStatusItem("Outstanding", float64(listLen)) assertStatusItem("Queued", float64(listLen-1)) // Let worker proceed close(gate) // Wait for worker to finish - expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.CountOutstanding() }) + expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress }) // Verify Locator1 to be un/deleted as expected data, _ := GetBlock(testData.Locator1, false) diff --git a/services/keepstore/work_queue.go b/services/keepstore/work_queue.go index 6ec52742a5..58e4966595 100644 --- a/services/keepstore/work_queue.go +++ b/services/keepstore/work_queue.go @@ -85,10 +85,8 @@ package main import "container/list" type WorkQueue struct { - countInProgress chan int - countOutstanding chan int - countQueued chan int - newlist chan *list.List + getStatus chan WorkQueueStatus + newlist chan *list.List // Workers get work items by reading from this channel. NextItem <-chan interface{} // Each worker must send struct{}{} to ReportDone exactly once @@ -98,6 +96,11 @@ type WorkQueue struct { ReportDone chan<- struct{} } +type WorkQueueStatus struct { + InProgress int + Queued int +} + // NewWorkQueue returns a new empty WorkQueue. // func NewWorkQueue() *WorkQueue { @@ -105,33 +108,34 @@ func NewWorkQueue() *WorkQueue { reportDone := make(chan struct{}) newList := make(chan *list.List) b := WorkQueue{ - countQueued: make(chan int), - countInProgress: make(chan int), - countOutstanding: make(chan int), - newlist: newList, - NextItem: nextItem, - ReportDone: reportDone, + getStatus: make(chan WorkQueueStatus), + newlist: newList, + NextItem: nextItem, + ReportDone: reportDone, } go func() { // Read new work lists from the newlist channel. - // Reply to "length" and "get next item" queries by - // sending to the countQueued and nextItem channels + // Reply to "status" and "get next item" queries by + // sending to the getStatus and nextItem channels // respectively. Return when the newlist channel // closes. todo := &list.List{} - countInProgress := 0 + status := WorkQueueStatus{} // When we're done, close the output channel; workers will // shut down next time they ask for new work. defer close(nextItem) - defer close(b.countInProgress) - defer close(b.countOutstanding) - defer close(b.countQueued) + defer close(b.getStatus) + // nextChan and nextVal are both nil when we have + // nothing to send; otherwise they are, respectively, + // the nextItem channel and the next work item to send + // to it. var nextChan chan interface{} var nextVal interface{} - for newList != nil || countInProgress > 0 { + + for newList != nil || status.InProgress > 0 { select { case p, ok := <-newList: if !ok { @@ -142,7 +146,8 @@ func NewWorkQueue() *WorkQueue { if todo == nil { todo = &list.List{} } - if todo.Len() == 0 { + status.Queued = todo.Len() + if status.Queued == 0 { // Stop sending work nextChan = nil nextVal = nil @@ -151,9 +156,10 @@ func NewWorkQueue() *WorkQueue { nextVal = todo.Front().Value } case nextChan <- nextVal: - countInProgress++ todo.Remove(todo.Front()) - if todo.Len() == 0 { + status.InProgress++ + status.Queued-- + if status.Queued == 0 { // Stop sending work nextChan = nil nextVal = nil @@ -161,10 +167,8 @@ func NewWorkQueue() *WorkQueue { nextVal = todo.Front().Value } case <-reportDone: - countInProgress-- - case b.countInProgress <- countInProgress: - case b.countOutstanding <- todo.Len() + countInProgress: - case b.countQueued <- todo.Len(): + status.InProgress-- + case b.getStatus <- status: } } }() @@ -184,31 +188,19 @@ func (b *WorkQueue) ReplaceQueue(list *list.List) { // abandons any pending requests, but allows any pull request already // in progress to continue. // -// After Close, CountX methods will return correct values, NextItem -// will be closed, and ReplaceQueue will panic. +// After Close, Status will return correct values, NextItem will be +// closed, and ReplaceQueue will panic. // func (b *WorkQueue) Close() { close(b.newlist) } -// CountOutstanding returns the number of items in the queue or in -// progress. A return value of 0 guarantees all existing work (work -// that was sent to ReplaceQueue before CountOutstanding was called) -// has completed. -// -func (b *WorkQueue) CountOutstanding() int { - // If the channel is closed, we get zero, which is correct. - return <-b.countOutstanding -} - -// CountQueued returns the number of items in the current queue. -// -func (b *WorkQueue) CountQueued() int { - return <-b.countQueued -} - -// Len returns the number of items in progress. +// Status returns an up-to-date WorkQueueStatus reflecting the current +// queue status. // -func (b *WorkQueue) CountInProgress() int { - return <-b.countInProgress +func (b *WorkQueue) Status() WorkQueueStatus { + // If the channel is closed, we get the nil value of + // WorkQueueStatus, which is an accurate description of a + // finished queue. + return <-b.getStatus } diff --git a/services/keepstore/work_queue_test.go b/services/keepstore/work_queue_test.go index df0fa9c612..7844a2b87c 100644 --- a/services/keepstore/work_queue_test.go +++ b/services/keepstore/work_queue_test.go @@ -84,9 +84,9 @@ func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f } } -func expectCountQueued(t fatalfer, b *WorkQueue, expectCountQueued int) { - if l := b.CountQueued(); l != expectCountQueued { - t.Fatalf("Got CountQueued()==%d, expected %d", l, expectCountQueued) +func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) { + if l := b.Status().Queued; l != expectQueued { + t.Fatalf("Got Queued==%d, expected %d", l, expectQueued) } } @@ -94,18 +94,25 @@ func TestWorkQueueDoneness(t *testing.T) { b := NewWorkQueue() defer b.Close() b.ReplaceQueue(makeTestWorkList([]int{1, 2, 3})) - expectCountQueued(t, b, 3) + expectQueued(t, b, 3) + gate := make(chan struct{}) go func() { + <-gate for _ = range b.NextItem { - //time.Sleep(time.Duration(delay.(int)) * time.Millisecond) + <-gate time.Sleep(time.Millisecond) b.ReportDone <- struct{}{} } }() - expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() }) - b.ReplaceQueue(makeTestWorkList([]int{400, 5, 6})) - expectEqualWithin(t, time.Second, 3, func() interface{} { return b.CountOutstanding() }) - expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() }) + expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress }) + b.ReplaceQueue(makeTestWorkList([]int{4, 5, 6})) + for i := 1; i <= 3; i++ { + gate <- struct{}{} + expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued }) + expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress }) + } + close(gate) + expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress }) expectChannelEmpty(t, b.NextItem) } @@ -114,10 +121,10 @@ func TestWorkQueueReadWrite(t *testing.T) { var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34} b := NewWorkQueue() - expectCountQueued(t, b, 0) + expectQueued(t, b, 0) b.ReplaceQueue(makeTestWorkList(input)) - expectCountQueued(t, b, len(input)) + expectQueued(t, b, len(input)) doWorkItems(t, b, input) expectChannelEmpty(t, b.NextItem) @@ -148,7 +155,7 @@ func TestWorkQueueEarlyRead(t *testing.T) { // finish. b.ReplaceQueue(makeTestWorkList(input)) <-done - expectCountQueued(t, b, 0) + expectQueued(t, b, 0) } // After Close(), NextItem closes, work finishes, then stats return zero. @@ -166,10 +173,10 @@ func TestWorkQueueClose(t *testing.T) { // Wait for worker to take item 1 <-mark b.Close() - expectEqualWithin(t, time.Second, 1, func() interface{} { return b.CountOutstanding() }) + expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress }) // Tell worker to report done mark <- struct{}{} - expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() }) + expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress }) expectChannelClosedWithin(t, time.Second, b.NextItem) }