6260: Fix races in keepstore tests. Expose WorkQueue in-progress/queued stats.
[arvados.git] / services / keepstore / work_queue.go
1 package main
2
3 /* A WorkQueue is an asynchronous thread-safe queue manager.  It
4    provides a channel from which items can be read off the queue, and
5    permits replacing the contents of the queue at any time.
6
7    The overall work flow for a WorkQueue is as follows:
8
9      1. A WorkQueue is created with NewWorkQueue().  This
10         function instantiates a new WorkQueue and starts a manager
11         goroutine.  The manager listens on an input channel
12         (manager.newlist) and an output channel (manager.NextItem).
13
14      2. The manager first waits for a new list of requests on the
15         newlist channel.  When another goroutine calls
16         manager.ReplaceQueue(lst), it sends lst over the newlist
17         channel to the manager.  The manager goroutine now has
18         ownership of the list.
19
20      3. Once the manager has this initial list, it listens on both the
21         input and output channels for one of the following to happen:
22
23           a. A worker attempts to read an item from the NextItem
24              channel.  The manager sends the next item from the list
25              over this channel to the worker, and loops.
26
27           b. New data is sent to the manager on the newlist channel.
28              This happens when another goroutine calls
29              manager.ReplaceItem() with a new list.  The manager
30              discards the current list, replaces it with the new one,
31              and begins looping again.
32
33           c. The input channel is closed.  The manager closes its
34              output channel (signalling any workers to quit) and
35              terminates.
36
37    Tasks currently handled by WorkQueue:
38      * the pull list
39      * the trash list
40
41    Example usage:
42
43         // Any kind of user-defined type can be used with the
44         // WorkQueue.
45                 type FrobRequest struct {
46                         frob string
47                 }
48
49                 // Make a work list.
50                 froblist := NewWorkQueue()
51
52                 // Start a concurrent worker to read items from the NextItem
53                 // channel until it is closed, deleting each one.
54                 go func(list WorkQueue) {
55                         for i := range list.NextItem {
56                                 req := i.(FrobRequest)
57                                 frob.Run(req)
58                         }
59                 }(froblist)
60
61                 // Set up a HTTP handler for PUT /frob
62                 router.HandleFunc(`/frob`,
63                         func(w http.ResponseWriter, req *http.Request) {
64                                 // Parse the request body into a list.List
65                                 // of FrobRequests, and give this list to the
66                                 // frob manager.
67                                 newfrobs := parseBody(req.Body)
68                                 froblist.ReplaceQueue(newfrobs)
69                         }).Methods("PUT")
70
71    Methods available on a WorkQueue:
72
73                 ReplaceQueue(list)
74                         Replaces the current item list with a new one.  The list
75             manager discards any unprocessed items on the existing
76             list and replaces it with the new one. If the worker is
77             processing a list item when ReplaceQueue is called, it
78             finishes processing before receiving items from the new
79             list.
80                 Close()
81                         Shuts down the manager goroutine. When Close is called,
82                         the manager closes the NextItem channel.
83 */
84
85 import "container/list"
86
87 type WorkQueue struct {
88         countInProgress  chan int
89         countOutstanding chan int
90         countQueued      chan int
91         newlist          chan *list.List
92         // Workers get work items by reading from this channel.
93         NextItem <-chan interface{}
94         // Each worker must send struct{}{} to ReportDone exactly once
95         // for each work item received from NextItem, when it stops
96         // working on that item (regardless of whether the work was
97         // successful).
98         ReportDone chan<- struct{}
99 }
100
101 // NewWorkQueue returns a new empty WorkQueue.
102 //
103 func NewWorkQueue() *WorkQueue {
104         nextItem := make(chan interface{})
105         reportDone := make(chan struct{})
106         newList := make(chan *list.List)
107         b := WorkQueue{
108                 countQueued:      make(chan int),
109                 countInProgress:  make(chan int),
110                 countOutstanding: make(chan int),
111                 newlist:          newList,
112                 NextItem:         nextItem,
113                 ReportDone:       reportDone,
114         }
115         go func() {
116                 // Read new work lists from the newlist channel.
117                 // Reply to "length" and "get next item" queries by
118                 // sending to the countQueued and nextItem channels
119                 // respectively. Return when the newlist channel
120                 // closes.
121
122                 todo := &list.List{}
123                 countInProgress := 0
124
125                 // When we're done, close the output channel; workers will
126                 // shut down next time they ask for new work.
127                 defer close(nextItem)
128                 defer close(b.countInProgress)
129                 defer close(b.countOutstanding)
130                 defer close(b.countQueued)
131
132                 var nextChan chan interface{}
133                 var nextVal interface{}
134                 for newList != nil || countInProgress > 0 {
135                         select {
136                         case p, ok := <-newList:
137                                 if !ok {
138                                         // Closed, stop receiving
139                                         newList = nil
140                                 }
141                                 todo = p
142                                 if todo == nil {
143                                         todo = &list.List{}
144                                 }
145                                 if todo.Len() == 0 {
146                                         // Stop sending work
147                                         nextChan = nil
148                                         nextVal = nil
149                                 } else {
150                                         nextChan = nextItem
151                                         nextVal = todo.Front().Value
152                                 }
153                         case nextChan <- nextVal:
154                                 countInProgress++
155                                 todo.Remove(todo.Front())
156                                 if todo.Len() == 0 {
157                                         // Stop sending work
158                                         nextChan = nil
159                                         nextVal = nil
160                                 } else {
161                                         nextVal = todo.Front().Value
162                                 }
163                         case <-reportDone:
164                                 countInProgress--
165                         case b.countInProgress <- countInProgress:
166                         case b.countOutstanding <- todo.Len() + countInProgress:
167                         case b.countQueued <- todo.Len():
168                         }
169                 }
170         }()
171         return &b
172 }
173
174 // ReplaceQueue abandons any work items left in the existing queue,
175 // and starts giving workers items from the given list. After giving
176 // it to ReplaceQueue, the caller must not read or write the given
177 // list.
178 //
179 func (b *WorkQueue) ReplaceQueue(list *list.List) {
180         b.newlist <- list
181 }
182
183 // Close shuts down the manager and terminates the goroutine, which
184 // abandons any pending requests, but allows any pull request already
185 // in progress to continue.
186 //
187 // After Close, CountX methods will return correct values, NextItem
188 // will be closed, and ReplaceQueue will panic.
189 //
190 func (b *WorkQueue) Close() {
191         close(b.newlist)
192 }
193
194 // CountOutstanding returns the number of items in the queue or in
195 // progress. A return value of 0 guarantees all existing work (work
196 // that was sent to ReplaceQueue before CountOutstanding was called)
197 // has completed.
198 //
199 func (b *WorkQueue) CountOutstanding() int {
200         // If the channel is closed, we get zero, which is correct.
201         return <-b.countOutstanding
202 }
203
204 // CountQueued returns the number of items in the current queue.
205 //
206 func (b *WorkQueue) CountQueued() int {
207         return <-b.countQueued
208 }
209
210 // Len returns the number of items in progress.
211 //
212 func (b *WorkQueue) CountInProgress() int {
213         return <-b.countInProgress
214 }