+++ /dev/null
-package main
-
-/* A WorkQueue is an asynchronous thread-safe queue manager. It
- provides a channel from which items can be read off the queue, and
- permits replacing the contents of the queue at any time.
-
- The overall work flow for a WorkQueue is as follows:
-
- 1. A WorkQueue is created with NewWorkQueue(). This
- function instantiates a new WorkQueue and starts a manager
- goroutine. The manager listens on an input channel
- (manager.newlist) and an output channel (manager.NextItem).
-
- 2. The manager first waits for a new list of requests on the
- newlist channel. When another goroutine calls
- manager.ReplaceQueue(lst), it sends lst over the newlist
- channel to the manager. The manager goroutine now has
- ownership of the list.
-
- 3. Once the manager has this initial list, it listens on both the
- input and output channels for one of the following to happen:
-
- a. A worker attempts to read an item from the NextItem
- channel. The manager sends the next item from the list
- over this channel to the worker, and loops.
-
- b. New data is sent to the manager on the newlist channel.
- This happens when another goroutine calls
- manager.ReplaceItem() with a new list. The manager
- discards the current list, replaces it with the new one,
- and begins looping again.
-
- c. The input channel is closed. The manager closes its
- output channel (signalling any workers to quit) and
- terminates.
-
- Tasks currently handled by WorkQueue:
- * the pull list
- * the trash list
-
- Example usage:
-
- // Any kind of user-defined type can be used with the
- // WorkQueue.
- type FrobRequest struct {
- frob string
- }
-
- // Make a work list.
- froblist := NewWorkQueue()
-
- // Start a concurrent worker to read items from the NextItem
- // channel until it is closed, deleting each one.
- go func(list WorkQueue) {
- for i := range list.NextItem {
- req := i.(FrobRequest)
- frob.Run(req)
- }
- }(froblist)
-
- // Set up a HTTP handler for PUT /frob
- router.HandleFunc(`/frob`,
- func(w http.ResponseWriter, req *http.Request) {
- // Parse the request body into a list.List
- // of FrobRequests, and give this list to the
- // frob manager.
- newfrobs := parseBody(req.Body)
- froblist.ReplaceQueue(newfrobs)
- }).Methods("PUT")
-
- Methods available on a WorkQueue:
-
- ReplaceQueue(list)
- Replaces the current item list with a new one. The list
- manager discards any unprocessed items on the existing
- list and replaces it with the new one. If the worker is
- processing a list item when ReplaceQueue is called, it
- finishes processing before receiving items from the new
- list.
- Close()
- Shuts down the manager goroutine. When Close is called,
- the manager closes the NextItem channel.
-*/
-
-import "container/list"
-
-type WorkQueue struct {
- newlist chan *list.List
- NextItem chan interface{}
-}
-
-// NewWorkQueue returns a new worklist, and launches a listener
-// goroutine that waits for work and farms it out to workers.
-//
-func NewWorkQueue() *WorkQueue {
- b := WorkQueue{
- newlist: make(chan *list.List),
- NextItem: make(chan interface{}),
- }
- go b.listen()
- return &b
-}
-
-// ReplaceQueue sends a new list of pull requests to the manager goroutine.
-// The manager will discard any outstanding pull list and begin
-// working on the new list.
-//
-func (b *WorkQueue) ReplaceQueue(list *list.List) {
- b.newlist <- list
-}
-
-// Close shuts down the manager and terminates the goroutine, which
-// completes any pull request in progress and abandons any pending
-// requests.
-//
-func (b *WorkQueue) Close() {
- close(b.newlist)
-}
-
-// listen is run in a goroutine. It reads new pull lists from its
-// input queue until the queue is closed.
-// listen takes ownership of the list that is passed to it.
-//
-// Note that the routine does not ever need to access the list
-// itself once the current_item has been initialized, so we do
-// not bother to keep a pointer to the list. Because it is a
-// doubly linked list, holding on to the current item will keep
-// it from garbage collection.
-//
-func (b *WorkQueue) listen() {
- var current_item *list.Element
-
- // When we're done, close the output channel to shut down any
- // workers.
- defer close(b.NextItem)
-
- for {
- // If the current list is empty, wait for a new list before
- // even checking if workers are ready.
- if current_item == nil {
- if p, ok := <-b.newlist; ok {
- current_item = p.Front()
- } else {
- // The channel was closed; shut down.
- return
- }
- }
- select {
- case p, ok := <-b.newlist:
- if ok {
- current_item = p.Front()
- } else {
- // The input channel is closed; time to shut down
- return
- }
- case b.NextItem <- current_item.Value:
- current_item = current_item.Next()
- }
- }
-}