1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
7 /* A WorkQueue is an asynchronous thread-safe queue manager. It
8 provides a channel from which items can be read off the queue, and
9 permits replacing the contents of the queue at any time.
11 The overall work flow for a WorkQueue is as follows:
13 1. A WorkQueue is created with NewWorkQueue(). This
14 function instantiates a new WorkQueue and starts a manager
15 goroutine. The manager listens on an input channel
16 (manager.newlist) and an output channel (manager.NextItem).
18 2. The manager first waits for a new list of requests on the
19 newlist channel. When another goroutine calls
20 manager.ReplaceQueue(lst), it sends lst over the newlist
21 channel to the manager. The manager goroutine now has
22 ownership of the list.
24 3. Once the manager has this initial list, it listens on both the
25 input and output channels for one of the following to happen:
27 a. A worker attempts to read an item from the NextItem
28 channel. The manager sends the next item from the list
29 over this channel to the worker, and loops.
31 b. New data is sent to the manager on the newlist channel.
32 This happens when another goroutine calls
33 manager.ReplaceQueue() with a new list. The manager
34 discards the current list, replaces it with the new one,
35 and begins looping again.
37 c. The input channel is closed. The manager closes its
38 output channel (signalling any workers to quit) and
41 Tasks currently handled by WorkQueue:
47 // Any kind of user-defined type can be used with the
49 type FrobRequest struct {
54 froblist := NewWorkQueue()
56 // Start a concurrent worker to read items from the NextItem
57 // channel until it is closed, deleting each one.
58 go func(list *WorkQueue) {
59 for i := range list.NextItem {
60 req := i.(FrobRequest)
65 // Set up an HTTP handler for PUT /frob
66 router.HandleFunc(`/frob`,
67 func(w http.ResponseWriter, req *http.Request) {
68 // Parse the request body into a list.List
69 // of FrobRequests, and give this list to the
71 newfrobs := parseBody(req.Body)
72 froblist.ReplaceQueue(newfrobs)
75 Methods available on a WorkQueue:
78 Replaces the current item list with a new one. The list
79 manager discards any unprocessed items on the existing
80 list and replaces it with the new one. If the worker is
81 processing a list item when ReplaceQueue is called, it
82 finishes processing before receiving items from the new
85 Shuts down the manager goroutine. When Close is called,
86 the manager closes the NextItem channel.
89 import "container/list"
91 // WorkQueue definition
92 type WorkQueue struct {
93 getStatus chan WorkQueueStatus
94 newlist chan *list.List
95 // Workers get work items by reading from this channel.
96 NextItem <-chan interface{}
97 // Each worker must send struct{}{} to DoneItem exactly once
98 // for each work item received from NextItem, when it stops
99 // working on that item (regardless of whether the work was
101 DoneItem chan<- struct{}
104 // WorkQueueStatus reflects the queue status.
105 type WorkQueueStatus struct {
110 // NewWorkQueue returns a new empty WorkQueue.
111 func NewWorkQueue() *WorkQueue {
112 nextItem := make(chan interface{})
113 reportDone := make(chan struct{})
114 newList := make(chan *list.List)
116 getStatus: make(chan WorkQueueStatus),
119 DoneItem: reportDone,
122 // Read new work lists from the newlist channel.
123 // Reply to "status" and "get next item" queries by
124 // sending to the getStatus and nextItem channels
125 // respectively. Return when the newlist channel
129 status := WorkQueueStatus{}
131 // When we're done, close the output channel; workers will
132 // shut down next time they ask for new work.
133 defer close(nextItem)
134 defer close(b.getStatus)
136 // nextChan and nextVal are both nil when we have
137 // nothing to send; otherwise they are, respectively,
138 // the nextItem channel and the next work item to send
140 var nextChan chan interface{}
141 var nextVal interface{}
143 for newList != nil || status.InProgress > 0 {
145 case p, ok := <-newList:
147 // Closed, stop receiving
154 status.Queued = todo.Len()
155 if status.Queued == 0 {
161 nextVal = todo.Front().Value
163 case nextChan <- nextVal:
164 todo.Remove(todo.Front())
167 if status.Queued == 0 {
172 nextVal = todo.Front().Value
176 case b.getStatus <- status:
183 // ReplaceQueue abandons any work items left in the existing queue,
184 // and starts giving workers items from the given list. After giving
185 // it to ReplaceQueue, the caller must not read or write the given
187 func (b *WorkQueue) ReplaceQueue(list *list.List) {
191 // Close shuts down the manager and terminates the goroutine, which
192 // abandons any pending requests, but allows any work item already
193 // in progress to continue.
195 // After Close, Status will return correct values, NextItem will be
196 // closed, and ReplaceQueue will panic.
197 func (b *WorkQueue) Close() {
201 // Status returns an up-to-date WorkQueueStatus reflecting the current
203 func (b *WorkQueue) Status() WorkQueueStatus {
204 // If the channel is closed, we get the zero value of
205 // WorkQueueStatus, which is an accurate description of a