3 /* A WorkQueue is an asynchronous thread-safe queue manager. It
4 provides a channel from which items can be read off the queue, and
5 permits replacing the contents of the queue at any time.
7 The overall work flow for a WorkQueue is as follows:
9 1. A WorkQueue is created with NewWorkQueue(). This
10 function instantiates a new WorkQueue and starts a manager
11 goroutine. The manager listens on an input channel
12 (manager.newlist) and an output channel (manager.NextItem).
14 2. The manager first waits for a new list of requests on the
15 newlist channel. When another goroutine calls
16 manager.ReplaceQueue(lst), it sends lst over the newlist
17 channel to the manager. The manager goroutine now has
18 ownership of the list.
20 3. Once the manager has this initial list, it listens on both the
21 input and output channels for one of the following to happen:
23 a. A worker attempts to read an item from the NextItem
24 channel. The manager sends the next item from the list
25 over this channel to the worker, and loops.
27 b. New data is sent to the manager on the newlist channel.
28 This happens when another goroutine calls
29 manager.ReplaceQueue() with a new list. The manager
30 discards the current list, replaces it with the new one,
31 and begins looping again.
33 c. The input channel is closed. The manager closes its
34 output channel (signalling any workers to quit) and terminates.
37 Tasks currently handled by WorkQueue:
43 // Any kind of user-defined type can be used with the
45 type FrobRequest struct {
50 froblist := NewWorkQueue()
52 // Start a concurrent worker to read items from the NextItem
53 // channel until it is closed, deleting each one.
54 go func(list WorkQueue) {
55 for i := range list.NextItem {
56 req := i.(FrobRequest)
61 // Set up an HTTP handler for PUT /frob
62 router.HandleFunc(`/frob`,
63 func(w http.ResponseWriter, req *http.Request) {
64 // Parse the request body into a list.List
65 // of FrobRequests, and give this list to the
67 newfrobs := parseBody(req.Body)
68 froblist.ReplaceQueue(newfrobs)
71 Methods available on a WorkQueue:
74 Replaces the current item list with a new one. The list
75 manager discards any unprocessed items on the existing
76 list and replaces it with the new one. If the worker is
77 processing a list item when ReplaceQueue is called, it
78 finishes processing before receiving items from the new list.
81 Shuts down the manager goroutine. When Close is called,
82 the manager closes the NextItem channel.
85 import "container/list"
// WorkQueue pairs an input channel for replacement lists with an output
// channel from which workers receive queued items one at a time.
87 type WorkQueue struct {
// newlist carries replacement lists to the manager goroutine (see listen).
88 newlist chan *list.List
// NextItem is the exported channel that workers receive items from; the
// manager closes it when it shuts down.
89 NextItem chan interface{}
92 // NewWorkQueue returns a new WorkQueue, and launches a listener
93 // goroutine that waits for work and farms it out to workers.
95 func NewWorkQueue() *WorkQueue {
// Unbuffered: a new list is handed over only when the manager receives it.
97 newlist: make(chan *list.List),
// Unbuffered: an item leaves the queue only when a worker is receiving.
98 NextItem: make(chan interface{}),
104 // ReplaceQueue sends a new list of work items to the manager goroutine.
105 // The manager will discard any outstanding list and begin
106 // working on the new list.
// The manager takes ownership of the list it is given, so callers should
// treat the list as handed off once this call is made.
// NOTE(review): the parameter name shadows the container/list package
// inside the method body.
108 func (b *WorkQueue) ReplaceQueue(list *list.List) {
112 // Close shuts down the manager and terminates the goroutine, which
113 // completes any item in progress and abandons any pending items.
// On shutdown the manager closes NextItem, which ends any worker loop
// that is ranging over that channel.
116 func (b *WorkQueue) Close() {
120 // listen is run in a goroutine. It reads new lists from its
121 // input channel (newlist) until that channel is closed.
122 // listen takes ownership of the list that is passed to it.
124 // Note that the routine does not ever need to access the list
125 // itself once the current_item has been initialized, so we do
126 // not bother to keep a pointer to the list. Because it is a
127 // doubly linked list, holding on to the current item will keep
128 // it from garbage collection.
130 func (b *WorkQueue) listen() {
131 var current_item *list.Element
133 // When we're done, close the output channel to shut down any
135 defer close(b.NextItem)
138 // If the current list is empty, wait for a new list before
139 // even checking if workers are ready.
140 if current_item == nil {
141 if p, ok := <-b.newlist; ok {
142 current_item = p.Front()
144 // The channel was closed; shut down.
149 case p, ok := <-b.newlist:
151 current_item = p.Front()
153 // The input channel is closed; time to shut down
156 case b.NextItem <- current_item.Value:
157 current_item = current_item.Next()