+ go func() {
+ // Read new work lists from the newlist channel.
+ // Reply to "length" and "get next item" queries by
+ // sending to the countQueued and nextItem channels
+ // respectively. Return when the newlist channel
+ // closes.
+
+ todo := &list.List{}
+ countInProgress := 0
+
+ // When we're done, close the output channel; workers will
+ // shut down next time they ask for new work.
+ defer close(nextItem)
+ defer close(b.countInProgress)
+ defer close(b.countOutstanding)
+ defer close(b.countQueued)
+
+ var nextChan chan interface{}
+ var nextVal interface{}
+ for newList != nil || countInProgress > 0 {
+ select {
+ case p, ok := <-newList:
+ if !ok {
+ // Closed, stop receiving
+ newList = nil
+ }
+ todo = p
+ if todo == nil {
+ todo = &list.List{}
+ }
+ if todo.Len() == 0 {
+ // Stop sending work
+ nextChan = nil
+ nextVal = nil
+ } else {
+ nextChan = nextItem
+ nextVal = todo.Front().Value
+ }
+ case nextChan <- nextVal:
+ countInProgress++
+ todo.Remove(todo.Front())
+ if todo.Len() == 0 {
+ // Stop sending work
+ nextChan = nil
+ nextVal = nil
+ } else {
+ nextVal = todo.Front().Value
+ }
+ case <-reportDone:
+ countInProgress--
+ case b.countInProgress <- countInProgress:
+ case b.countOutstanding <- todo.Len() + countInProgress:
+ case b.countQueued <- todo.Len():
+ }
+ }
+ }()