+ go func() {
+ // Read new work lists from the newlist channel.
+ // Reply to "status" and "get next item" queries by
+ // sending to the getStatus and nextItem channels
+ // respectively. Return when the newlist channel
+ // closes.
+
+ todo := &list.List{}
+ status := WorkQueueStatus{}
+
+ // When we're done, close the output channel; workers will
+ // shut down next time they ask for new work.
+ defer close(nextItem)
+ defer close(b.getStatus)
+
+ // nextChan and nextVal are both nil when we have
+ // nothing to send; otherwise they are, respectively,
+ // the nextItem channel and the next work item to send
+ // to it.
+ var nextChan chan interface{}
+ var nextVal interface{}
+
+ for newList != nil || status.InProgress > 0 {
+ select {
+ case p, ok := <-newList:
+ if !ok {
+ // Closed, stop receiving
+ newList = nil
+ }
+ todo = p
+ if todo == nil {
+ todo = &list.List{}
+ }
+ status.Queued = todo.Len()
+ if status.Queued == 0 {
+ // Stop sending work
+ nextChan = nil
+ nextVal = nil
+ } else {
+ nextChan = nextItem
+ nextVal = todo.Front().Value
+ }
+ case nextChan <- nextVal:
+ todo.Remove(todo.Front())
+ status.InProgress++
+ status.Queued--
+ if status.Queued == 0 {
+ // Stop sending work
+ nextChan = nil
+ nextVal = nil
+ } else {
+ nextVal = todo.Front().Value
+ }
+ case <-reportDone:
+ status.InProgress--
+ case b.getStatus <- status:
+ }
+ }
+ }()