6260: gofmt
[arvados.git] / services / keepstore / work_queue.go
index 9509cacd774f5acd64372b55b1841bab55e5b023..6ec52742a57afb826128678950d9c90000553233 100644 (file)
@@ -85,76 +85,130 @@ package main
 import "container/list"
 
 type WorkQueue struct {
-       newlist  chan *list.List
-       NextItem chan interface{}
+       countInProgress  chan int
+       countOutstanding chan int
+       countQueued      chan int
+       newlist          chan *list.List
+       // Workers get work items by reading from this channel.
+       NextItem <-chan interface{}
+       // Each worker must send struct{}{} to ReportDone exactly once
+       // for each work item received from NextItem, when it stops
+       // working on that item (regardless of whether the work was
+       // successful).
+       ReportDone chan<- struct{}
 }
 
-// NewWorkQueue returns a new worklist, and launches a listener
-// goroutine that waits for work and farms it out to workers.
+// NewWorkQueue returns a new empty WorkQueue.
 //
 func NewWorkQueue() *WorkQueue {
+       nextItem := make(chan interface{})
+       reportDone := make(chan struct{})
+       newList := make(chan *list.List)
        b := WorkQueue{
-               newlist:  make(chan *list.List),
-               NextItem: make(chan interface{}),
+               countQueued:      make(chan int),
+               countInProgress:  make(chan int),
+               countOutstanding: make(chan int),
+               newlist:          newList,
+               NextItem:         nextItem,
+               ReportDone:       reportDone,
        }
-       go b.listen()
+       go func() {
+               // Read new work lists from the newlist channel.
+               // Reply to "count" and "get next item" queries by
+               // sending to the count* and nextItem channels
+               // respectively. Return once the newlist channel has
+               // closed and no work items remain in progress.
+
+               todo := &list.List{}
+               countInProgress := 0
+
+               // When we're done, close the output channel; workers will
+               // shut down next time they ask for new work.
+               defer close(nextItem)
+               defer close(b.countInProgress)
+               defer close(b.countOutstanding)
+               defer close(b.countQueued)
+
+               var nextChan chan interface{}
+               var nextVal interface{}
+               for newList != nil || countInProgress > 0 {
+                       select {
+                       case p, ok := <-newList:
+                               if !ok {
+                                       // Closed, stop receiving
+                                       newList = nil
+                               }
+                               todo = p
+                               if todo == nil {
+                                       todo = &list.List{}
+                               }
+                               if todo.Len() == 0 {
+                                       // Stop sending work
+                                       nextChan = nil
+                                       nextVal = nil
+                               } else {
+                                       nextChan = nextItem
+                                       nextVal = todo.Front().Value
+                               }
+                       case nextChan <- nextVal:
+                               countInProgress++
+                               todo.Remove(todo.Front())
+                               if todo.Len() == 0 {
+                                       // Stop sending work
+                                       nextChan = nil
+                                       nextVal = nil
+                               } else {
+                                       nextVal = todo.Front().Value
+                               }
+                       case <-reportDone:
+                               countInProgress--
+                       case b.countInProgress <- countInProgress:
+                       case b.countOutstanding <- todo.Len() + countInProgress:
+                       case b.countQueued <- todo.Len():
+                       }
+               }
+       }()
        return &b
 }
 
-// ReplaceQueue sends a new list of pull requests to the manager goroutine.
-// The manager will discard any outstanding pull list and begin
-// working on the new list.
+// ReplaceQueue abandons any work items left in the existing queue,
+// and starts giving workers items from the given list. After giving
+// it to ReplaceQueue, the caller must not read or write the given
+// list.
 //
 func (b *WorkQueue) ReplaceQueue(list *list.List) {
        b.newlist <- list
 }
 
 // Close shuts down the manager and terminates the goroutine, which
-// completes any pull request in progress and abandons any pending
-// requests.
+// abandons any pending requests, but allows any work item already
+// in progress to continue.
+//
+// After Close, CountX methods return correct values, ReplaceQueue
+// panics, and NextItem closes once no work remains in progress.
 //
 func (b *WorkQueue) Close() {
        close(b.newlist)
 }
 
-// listen is run in a goroutine. It reads new pull lists from its
-// input queue until the queue is closed.
-// listen takes ownership of the list that is passed to it.
+// CountOutstanding returns the number of items in the queue or in
+// progress. A return value of 0 guarantees all existing work (work
+// that was sent to ReplaceQueue before CountOutstanding was called)
+// has completed.
 //
-// Note that the routine does not ever need to access the list
-// itself once the current_item has been initialized, so we do
-// not bother to keep a pointer to the list. Because it is a
-// doubly linked list, holding on to the current item will keep
-// it from garbage collection.
+func (b *WorkQueue) CountOutstanding() int {
+       // If the channel is closed, we get zero, which is correct.
+       return <-b.countOutstanding
+}
+
+// CountQueued returns the number of items in the current queue.
 //
-func (b *WorkQueue) listen() {
-       var current_item *list.Element
-
-       // When we're done, close the output channel to shut down any
-       // workers.
-       defer close(b.NextItem)
-
-       for {
-               // If the current list is empty, wait for a new list before
-               // even checking if workers are ready.
-               if current_item == nil {
-                       if p, ok := <-b.newlist; ok {
-                               current_item = p.Front()
-                       } else {
-                               // The channel was closed; shut down.
-                               return
-                       }
-               }
-               select {
-               case p, ok := <-b.newlist:
-                       if ok {
-                               current_item = p.Front()
-                       } else {
-                               // The input channel is closed; time to shut down
-                               return
-                       }
-               case b.NextItem <- current_item.Value:
-                       current_item = current_item.Next()
-               }
-       }
+func (b *WorkQueue) CountQueued() int {
+       return <-b.countQueued
+}
+
+// CountInProgress returns the number of items in progress.
+//
+func (b *WorkQueue) CountInProgress() int {
+       return <-b.countInProgress
 }