import "container/list"
type WorkQueue struct {
- newlist chan *list.List
- NextItem chan interface{}
+ countInProgress chan int
+ countOutstanding chan int
+ countQueued chan int
+ newlist chan *list.List
+ // Workers get work items by reading from this channel.
+ NextItem <-chan interface{}
+ // Each worker must send struct{}{} to ReportDone exactly once
+ // for each work item received from NextItem, when it stops
+ // working on that item (regardless of whether the work was
+ // successful).
+ ReportDone chan<- struct{}
}
-// NewWorkQueue returns a new worklist, and launches a listener
-// goroutine that waits for work and farms it out to workers.
+// NewWorkQueue returns a new empty WorkQueue.
//
func NewWorkQueue() *WorkQueue {
+ nextItem := make(chan interface{})
+ reportDone := make(chan struct{})
+ newList := make(chan *list.List)
b := WorkQueue{
- newlist: make(chan *list.List),
- NextItem: make(chan interface{}),
+ countQueued: make(chan int),
+ countInProgress: make(chan int),
+ countOutstanding: make(chan int),
+ newlist: newList,
+ NextItem: nextItem,
+ ReportDone: reportDone,
}
- go b.listen()
+ go func() {
+ // Read new work lists from the newlist channel.
+ // Reply to "length" and "get next item" queries by
+ // sending to the countQueued and nextItem channels
+ // respectively. Return when the newlist channel
+ // closes.
+
+ todo := &list.List{}
+ countInProgress := 0
+
+ // When we're done, close the output channel; workers will
+ // shut down next time they ask for new work.
+ defer close(nextItem)
+ defer close(b.countInProgress)
+ defer close(b.countOutstanding)
+ defer close(b.countQueued)
+
+ var nextChan chan interface{}
+ var nextVal interface{}
+ for newList != nil || countInProgress > 0 {
+ select {
+ case p, ok := <-newList:
+ if !ok {
+ // Closed, stop receiving
+ newList = nil
+ }
+ todo = p
+ if todo == nil {
+ todo = &list.List{}
+ }
+ if todo.Len() == 0 {
+ // Stop sending work
+ nextChan = nil
+ nextVal = nil
+ } else {
+ nextChan = nextItem
+ nextVal = todo.Front().Value
+ }
+ case nextChan <- nextVal:
+ countInProgress++
+ todo.Remove(todo.Front())
+ if todo.Len() == 0 {
+ // Stop sending work
+ nextChan = nil
+ nextVal = nil
+ } else {
+ nextVal = todo.Front().Value
+ }
+ case <-reportDone:
+ countInProgress--
+ case b.countInProgress <- countInProgress:
+ case b.countOutstanding <- todo.Len() + countInProgress:
+ case b.countQueued <- todo.Len():
+ }
+ }
+ }()
return &b
}
-// ReplaceQueue sends a new list of pull requests to the manager goroutine.
-// The manager will discard any outstanding pull list and begin
-// working on the new list.
+// ReplaceQueue abandons any work items left in the existing queue,
+// and starts giving workers items from the given list. After giving
+// it to ReplaceQueue, the caller must not read or write the given
+// list.
//
func (b *WorkQueue) ReplaceQueue(list *list.List) {
b.newlist <- list
}
// Close shuts down the manager and terminates the goroutine, which
-// completes any pull request in progress and abandons any pending
-// requests.
+// abandons any pending requests, but allows any pull request already
+// in progress to continue.
+//
+// After Close, CountX methods will return correct values, NextItem
+// will be closed, and ReplaceQueue will panic.
//
func (b *WorkQueue) Close() {
close(b.newlist)
}
-// listen is run in a goroutine. It reads new pull lists from its
-// input queue until the queue is closed.
-// listen takes ownership of the list that is passed to it.
+// CountOutstanding returns the number of items in the queue or in
+// progress. A return value of 0 guarantees all existing work (work
+// that was sent to ReplaceQueue before CountOutstanding was called)
+// has completed.
//
-// Note that the routine does not ever need to access the list
-// itself once the current_item has been initialized, so we do
-// not bother to keep a pointer to the list. Because it is a
-// doubly linked list, holding on to the current item will keep
-// it from garbage collection.
+func (b *WorkQueue) CountOutstanding() int {
+ // If the channel is closed, we get zero, which is correct.
+ return <-b.countOutstanding
+}
+
+// CountQueued returns the number of items in the current queue.
//
-func (b *WorkQueue) listen() {
- var current_item *list.Element
-
- // When we're done, close the output channel to shut down any
- // workers.
- defer close(b.NextItem)
-
- for {
- // If the current list is empty, wait for a new list before
- // even checking if workers are ready.
- if current_item == nil {
- if p, ok := <-b.newlist; ok {
- current_item = p.Front()
- } else {
- // The channel was closed; shut down.
- return
- }
- }
- select {
- case p, ok := <-b.newlist:
- if ok {
- current_item = p.Front()
- } else {
- // The input channel is closed; time to shut down
- return
- }
- case b.NextItem <- current_item.Value:
- current_item = current_item.Next()
- }
- }
+func (b *WorkQueue) CountQueued() int {
+ return <-b.countQueued
+}
+
+// Len returns the number of items in progress.
+//
+func (b *WorkQueue) CountInProgress() int {
+ return <-b.countInProgress
}