Len int `json:"BuffersInUse"`
}
-type WorkQueueStatus struct {
- InProgress int
- Outstanding int
- Queued int
-}
-
type NodeStatus struct {
Volumes []*VolumeStatus `json:"volumes"`
BufferPool PoolStatus
st.BufferPool.Alloc = bufs.Alloc()
st.BufferPool.Cap = bufs.Cap()
st.BufferPool.Len = bufs.Len()
- readWorkQueueStatus(&st.PullQueue, pullq)
- readWorkQueueStatus(&st.TrashQueue, trashq)
+ st.PullQueue = getWorkQueueStatus(pullq)
+ st.TrashQueue = getWorkQueueStatus(trashq)
runtime.ReadMemStats(&st.Memory)
}
-// Populate a WorkQueueStatus. This is not atomic, so race conditions
-// can cause InProgress + Queued != Outstanding.
-func readWorkQueueStatus(st *WorkQueueStatus, q *WorkQueue) {
+// getWorkQueueStatus returns a WorkQueueStatus for the given queue.
+// If q is nil (which should never happen except in test suites), it
+// returns a zero status value instead of crashing.
+func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
if q == nil {
// This should only happen during tests.
- *st = WorkQueueStatus{}
- return
+ return WorkQueueStatus{}
}
- st.InProgress = q.CountInProgress()
- st.Outstanding = q.CountOutstanding()
- st.Queued = q.CountQueued()
+ return q.Status()
}
// DeleteHandler processes DELETE requests.
}
c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
- c.Assert(getStatusItem("PullQueue", "Outstanding"), Equals, float64(0))
c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
response := IssueRequest(&testData.req)
c.Assert(response.Code, Equals, testData.response_code)
c.Assert(response.Body.String(), Equals, testData.response_body)
- expectEqualWithin(c, time.Second, 0, func() interface{} { return pullq.CountOutstanding() })
+ expectEqualWithin(c, time.Second, 0, func() interface{} {
+ st := pullq.Status()
+ return st.InProgress + st.Queued
+ })
if testData.name == "TestPullWorker_pull_list_with_two_items_latest_replacing_old" {
c.Assert(len(testPullLists), Equals, 2)
}
assertStatusItem("InProgress", 0)
- assertStatusItem("Outstanding", 0)
assertStatusItem("Queued", 0)
listLen := trashList.Len()
trashq.ReplaceQueue(trashList)
// Wait for worker to take request(s)
- expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.CountOutstanding() })
- expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.CountInProgress() })
+ expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.Status().InProgress })
// Ensure status.json also reports work is happening
assertStatusItem("InProgress", float64(1))
- assertStatusItem("Outstanding", float64(listLen))
assertStatusItem("Queued", float64(listLen-1))
// Let worker proceed
close(gate)
// Wait for worker to finish
- expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.CountOutstanding() })
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
// Verify Locator1 to be un/deleted as expected
data, _ := GetBlock(testData.Locator1, false)
import "container/list"
type WorkQueue struct {
- countInProgress chan int
- countOutstanding chan int
- countQueued chan int
- newlist chan *list.List
+ getStatus chan WorkQueueStatus
+ newlist chan *list.List
// Workers get work items by reading from this channel.
NextItem <-chan interface{}
// Each worker must send struct{}{} to ReportDone exactly once
ReportDone chan<- struct{}
}
+type WorkQueueStatus struct {
+ InProgress int
+ Queued int
+}
+
// NewWorkQueue returns a new empty WorkQueue.
//
func NewWorkQueue() *WorkQueue {
reportDone := make(chan struct{})
newList := make(chan *list.List)
b := WorkQueue{
- countQueued: make(chan int),
- countInProgress: make(chan int),
- countOutstanding: make(chan int),
- newlist: newList,
- NextItem: nextItem,
- ReportDone: reportDone,
+ getStatus: make(chan WorkQueueStatus),
+ newlist: newList,
+ NextItem: nextItem,
+ ReportDone: reportDone,
}
go func() {
// Read new work lists from the newlist channel.
- // Reply to "length" and "get next item" queries by
- // sending to the countQueued and nextItem channels
+ // Reply to "status" and "get next item" queries by
+ // sending to the getStatus and nextItem channels
// respectively. Return when the newlist channel
// closes.
todo := &list.List{}
- countInProgress := 0
+ status := WorkQueueStatus{}
// When we're done, close the output channel; workers will
// shut down next time they ask for new work.
defer close(nextItem)
- defer close(b.countInProgress)
- defer close(b.countOutstanding)
- defer close(b.countQueued)
+ defer close(b.getStatus)
+ // nextChan and nextVal are both nil when we have
+ // nothing to send; otherwise they are, respectively,
+ // the nextItem channel and the next work item to send
+ // to it.
var nextChan chan interface{}
var nextVal interface{}
- for newList != nil || countInProgress > 0 {
+
+ for newList != nil || status.InProgress > 0 {
select {
case p, ok := <-newList:
if !ok {
if todo == nil {
todo = &list.List{}
}
- if todo.Len() == 0 {
+ status.Queued = todo.Len()
+ if status.Queued == 0 {
// Stop sending work
nextChan = nil
nextVal = nil
nextVal = todo.Front().Value
}
case nextChan <- nextVal:
- countInProgress++
todo.Remove(todo.Front())
- if todo.Len() == 0 {
+ status.InProgress++
+ status.Queued--
+ if status.Queued == 0 {
// Stop sending work
nextChan = nil
nextVal = nil
nextVal = todo.Front().Value
}
case <-reportDone:
- countInProgress--
- case b.countInProgress <- countInProgress:
- case b.countOutstanding <- todo.Len() + countInProgress:
- case b.countQueued <- todo.Len():
+ status.InProgress--
+ case b.getStatus <- status:
}
}
}()
// abandons any pending requests, but allows any pull request already
// in progress to continue.
//
-// After Close, CountX methods will return correct values, NextItem
-// will be closed, and ReplaceQueue will panic.
+// After Close, Status will return correct values, NextItem will be
+// closed, and ReplaceQueue will panic.
//
func (b *WorkQueue) Close() {
	// Closing newlist makes the manager goroutine's newList case
	// see !ok; it stops accepting new lists and, once in-progress
	// work reaches zero, closes NextItem and getStatus on its way
	// out.
	close(b.newlist)
}
-// CountOutstanding returns the number of items in the queue or in
-// progress. A return value of 0 guarantees all existing work (work
-// that was sent to ReplaceQueue before CountOutstanding was called)
-// has completed.
-//
-func (b *WorkQueue) CountOutstanding() int {
- // If the channel is closed, we get zero, which is correct.
- return <-b.countOutstanding
-}
-
-// CountQueued returns the number of items in the current queue.
-//
-func (b *WorkQueue) CountQueued() int {
- return <-b.countQueued
-}
-
-// Len returns the number of items in progress.
+// Status returns an up-to-date WorkQueueStatus reflecting the current
+// queue status.
//
-func (b *WorkQueue) CountInProgress() int {
- return <-b.countInProgress
+func (b *WorkQueue) Status() WorkQueueStatus {
+ // If the channel is closed, we get the zero value of
+ // WorkQueueStatus, which is an accurate description of a
+ // finished queue.
+ return <-b.getStatus
}
}
}
-func expectCountQueued(t fatalfer, b *WorkQueue, expectCountQueued int) {
- if l := b.CountQueued(); l != expectCountQueued {
- t.Fatalf("Got CountQueued()==%d, expected %d", l, expectCountQueued)
+func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) {
+ if l := b.Status().Queued; l != expectQueued {
+ t.Fatalf("Got Queued==%d, expected %d", l, expectQueued)
}
}
b := NewWorkQueue()
defer b.Close()
b.ReplaceQueue(makeTestWorkList([]int{1, 2, 3}))
- expectCountQueued(t, b, 3)
+ expectQueued(t, b, 3)
+ gate := make(chan struct{})
go func() {
+ <-gate
for _ = range b.NextItem {
- //time.Sleep(time.Duration(delay.(int)) * time.Millisecond)
+ <-gate
time.Sleep(time.Millisecond)
b.ReportDone <- struct{}{}
}
}()
- expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
- b.ReplaceQueue(makeTestWorkList([]int{400, 5, 6}))
- expectEqualWithin(t, time.Second, 3, func() interface{} { return b.CountOutstanding() })
- expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
+ b.ReplaceQueue(makeTestWorkList([]int{4, 5, 6}))
+ for i := 1; i <= 3; i++ {
+ gate <- struct{}{}
+ expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued })
+ expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
+ }
+ close(gate)
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
expectChannelEmpty(t, b.NextItem)
}
var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
b := NewWorkQueue()
- expectCountQueued(t, b, 0)
+ expectQueued(t, b, 0)
b.ReplaceQueue(makeTestWorkList(input))
- expectCountQueued(t, b, len(input))
+ expectQueued(t, b, len(input))
doWorkItems(t, b, input)
expectChannelEmpty(t, b.NextItem)
// finish.
b.ReplaceQueue(makeTestWorkList(input))
<-done
- expectCountQueued(t, b, 0)
+ expectQueued(t, b, 0)
}
// After Close(), NextItem closes, work finishes, then stats return zero.
// Wait for worker to take item 1
<-mark
b.Close()
- expectEqualWithin(t, time.Second, 1, func() interface{} { return b.CountOutstanding() })
+ expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
// Tell worker to report done
mark <- struct{}{}
- expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
expectChannelClosedWithin(t, time.Second, b.NextItem)
}