import "container/list"
+// WorkQueue definition
type WorkQueue struct {
- countInProgress chan int
- countOutstanding chan int
- countQueued chan int
- newlist chan *list.List
+ getStatus chan WorkQueueStatus
+ newlist chan *list.List
// Workers get work items by reading from this channel.
NextItem <-chan interface{}
- // Each worker must send struct{}{} to ReportDone exactly once
+ // Each worker must send struct{}{} to DoneItem exactly once
// for each work item received from NextItem, when it stops
// working on that item (regardless of whether the work was
// successful).
- ReportDone chan<- struct{}
+ DoneItem chan<- struct{}
+}
+
+// WorkQueueStatus reflects the queue status.
+type WorkQueueStatus struct {
+ InProgress int
+ Queued int
}
// NewWorkQueue returns a new empty WorkQueue.
reportDone := make(chan struct{})
newList := make(chan *list.List)
b := WorkQueue{
- countQueued: make(chan int),
- countInProgress: make(chan int),
- countOutstanding: make(chan int),
- newlist: newList,
- NextItem: nextItem,
- ReportDone: reportDone,
+ getStatus: make(chan WorkQueueStatus),
+ newlist: newList,
+ NextItem: nextItem,
+ DoneItem: reportDone,
}
go func() {
// Read new work lists from the newlist channel.
- // Reply to "length" and "get next item" queries by
- // sending to the countQueued and nextItem channels
+ // Reply to "status" and "get next item" queries by
+ // sending to the getStatus and nextItem channels
// respectively. Return when the newlist channel
// closes.
todo := &list.List{}
- countInProgress := 0
+ status := WorkQueueStatus{}
// When we're done, close the output channel; workers will
// shut down next time they ask for new work.
defer close(nextItem)
- defer close(b.countInProgress)
- defer close(b.countOutstanding)
- defer close(b.countQueued)
+ defer close(b.getStatus)
+ // nextChan and nextVal are both nil when we have
+ // nothing to send; otherwise they are, respectively,
+ // the nextItem channel and the next work item to send
+ // to it.
var nextChan chan interface{}
var nextVal interface{}
- for newList != nil || countInProgress > 0 {
+
+ for newList != nil || status.InProgress > 0 {
select {
case p, ok := <-newList:
if !ok {
if todo == nil {
todo = &list.List{}
}
- if todo.Len() == 0 {
+ status.Queued = todo.Len()
+ if status.Queued == 0 {
// Stop sending work
nextChan = nil
nextVal = nil
nextVal = todo.Front().Value
}
case nextChan <- nextVal:
- countInProgress++
todo.Remove(todo.Front())
- if todo.Len() == 0 {
+ status.InProgress++
+ status.Queued--
+ if status.Queued == 0 {
// Stop sending work
nextChan = nil
nextVal = nil
nextVal = todo.Front().Value
}
case <-reportDone:
- countInProgress--
- case b.countInProgress <- countInProgress:
- case b.countOutstanding <- todo.Len() + countInProgress:
- case b.countQueued <- todo.Len():
+ status.InProgress--
+ case b.getStatus <- status:
}
}
}()
// abandons any pending requests, but allows any pull request already
// in progress to continue.
//
-// After Close, CountX methods will return correct values, NextItem
-// will be closed, and ReplaceQueue will panic.
+// After Close, Status will return correct values, NextItem will be
+// closed, and ReplaceQueue will panic.
//
func (b *WorkQueue) Close() {
close(b.newlist)
}
-// CountOutstanding returns the number of items in the queue or in
-// progress. A return value of 0 guarantees all existing work (work
-// that was sent to ReplaceQueue before CountOutstanding was called)
-// has completed.
-//
-func (b *WorkQueue) CountOutstanding() int {
- // If the channel is closed, we get zero, which is correct.
- return <-b.countOutstanding
-}
-
-// CountQueued returns the number of items in the current queue.
-//
-func (b *WorkQueue) CountQueued() int {
- return <-b.countQueued
-}
-
-// Len returns the number of items in progress.
+// Status returns an up-to-date WorkQueueStatus reflecting the current
+// queue status.
//
-func (b *WorkQueue) CountInProgress() int {
- return <-b.countInProgress
+func (b *WorkQueue) Status() WorkQueueStatus {
+ // If the channel is closed, we get the nil value of
+ // WorkQueueStatus, which is an accurate description of a
+ // finished queue.
+ return <-b.getStatus
}