7490: added several error condition check tests for datamanager/keep package; increas...
[arvados.git] / services / keepstore / work_queue.go
1 package main
2
3 /* A WorkQueue is an asynchronous thread-safe queue manager.  It
4    provides a channel from which items can be read off the queue, and
5    permits replacing the contents of the queue at any time.
6
7    The overall work flow for a WorkQueue is as follows:
8
9      1. A WorkQueue is created with NewWorkQueue().  This
10         function instantiates a new WorkQueue and starts a manager
11         goroutine.  The manager listens on an input channel
12         (manager.newlist) and an output channel (manager.NextItem).
13
14      2. The manager first waits for a new list of requests on the
15         newlist channel.  When another goroutine calls
16         manager.ReplaceQueue(lst), it sends lst over the newlist
17         channel to the manager.  The manager goroutine now has
18         ownership of the list.
19
20      3. Once the manager has this initial list, it listens on both the
21         input and output channels for one of the following to happen:
22
23           a. A worker attempts to read an item from the NextItem
24              channel.  The manager sends the next item from the list
25              over this channel to the worker, and loops.
26
27           b. New data is sent to the manager on the newlist channel.
28              This happens when another goroutine calls
29              manager.ReplaceItem() with a new list.  The manager
30              discards the current list, replaces it with the new one,
31              and begins looping again.
32
33           c. The input channel is closed.  The manager closes its
34              output channel (signalling any workers to quit) and
35              terminates.
36
37    Tasks currently handled by WorkQueue:
38      * the pull list
39      * the trash list
40
41    Example usage:
42
43         // Any kind of user-defined type can be used with the
44         // WorkQueue.
45                 type FrobRequest struct {
46                         frob string
47                 }
48
49                 // Make a work list.
50                 froblist := NewWorkQueue()
51
52                 // Start a concurrent worker to read items from the NextItem
53                 // channel until it is closed, deleting each one.
54                 go func(list WorkQueue) {
55                         for i := range list.NextItem {
56                                 req := i.(FrobRequest)
57                                 frob.Run(req)
58                         }
59                 }(froblist)
60
61                 // Set up a HTTP handler for PUT /frob
62                 router.HandleFunc(`/frob`,
63                         func(w http.ResponseWriter, req *http.Request) {
64                                 // Parse the request body into a list.List
65                                 // of FrobRequests, and give this list to the
66                                 // frob manager.
67                                 newfrobs := parseBody(req.Body)
68                                 froblist.ReplaceQueue(newfrobs)
69                         }).Methods("PUT")
70
71    Methods available on a WorkQueue:
72
73                 ReplaceQueue(list)
74                         Replaces the current item list with a new one.  The list
75             manager discards any unprocessed items on the existing
76             list and replaces it with the new one. If the worker is
77             processing a list item when ReplaceQueue is called, it
78             finishes processing before receiving items from the new
79             list.
80                 Close()
81                         Shuts down the manager goroutine. When Close is called,
82                         the manager closes the NextItem channel.
83 */
84
85 import "container/list"
86
87 // WorkQueue definition
88 type WorkQueue struct {
89         getStatus chan WorkQueueStatus
90         newlist   chan *list.List
91         // Workers get work items by reading from this channel.
92         NextItem <-chan interface{}
93         // Each worker must send struct{}{} to DoneItem exactly once
94         // for each work item received from NextItem, when it stops
95         // working on that item (regardless of whether the work was
96         // successful).
97         DoneItem chan<- struct{}
98 }
99
100 // WorkQueueStatus reflects the queue status.
101 type WorkQueueStatus struct {
102         InProgress int
103         Queued     int
104 }
105
106 // NewWorkQueue returns a new empty WorkQueue.
107 //
108 func NewWorkQueue() *WorkQueue {
109         nextItem := make(chan interface{})
110         reportDone := make(chan struct{})
111         newList := make(chan *list.List)
112         b := WorkQueue{
113                 getStatus: make(chan WorkQueueStatus),
114                 newlist:   newList,
115                 NextItem:  nextItem,
116                 DoneItem:  reportDone,
117         }
118         go func() {
119                 // Read new work lists from the newlist channel.
120                 // Reply to "status" and "get next item" queries by
121                 // sending to the getStatus and nextItem channels
122                 // respectively. Return when the newlist channel
123                 // closes.
124
125                 todo := &list.List{}
126                 status := WorkQueueStatus{}
127
128                 // When we're done, close the output channel; workers will
129                 // shut down next time they ask for new work.
130                 defer close(nextItem)
131                 defer close(b.getStatus)
132
133                 // nextChan and nextVal are both nil when we have
134                 // nothing to send; otherwise they are, respectively,
135                 // the nextItem channel and the next work item to send
136                 // to it.
137                 var nextChan chan interface{}
138                 var nextVal interface{}
139
140                 for newList != nil || status.InProgress > 0 {
141                         select {
142                         case p, ok := <-newList:
143                                 if !ok {
144                                         // Closed, stop receiving
145                                         newList = nil
146                                 }
147                                 todo = p
148                                 if todo == nil {
149                                         todo = &list.List{}
150                                 }
151                                 status.Queued = todo.Len()
152                                 if status.Queued == 0 {
153                                         // Stop sending work
154                                         nextChan = nil
155                                         nextVal = nil
156                                 } else {
157                                         nextChan = nextItem
158                                         nextVal = todo.Front().Value
159                                 }
160                         case nextChan <- nextVal:
161                                 todo.Remove(todo.Front())
162                                 status.InProgress++
163                                 status.Queued--
164                                 if status.Queued == 0 {
165                                         // Stop sending work
166                                         nextChan = nil
167                                         nextVal = nil
168                                 } else {
169                                         nextVal = todo.Front().Value
170                                 }
171                         case <-reportDone:
172                                 status.InProgress--
173                         case b.getStatus <- status:
174                         }
175                 }
176         }()
177         return &b
178 }
179
180 // ReplaceQueue abandons any work items left in the existing queue,
181 // and starts giving workers items from the given list. After giving
182 // it to ReplaceQueue, the caller must not read or write the given
183 // list.
184 //
185 func (b *WorkQueue) ReplaceQueue(list *list.List) {
186         b.newlist <- list
187 }
188
189 // Close shuts down the manager and terminates the goroutine, which
190 // abandons any pending requests, but allows any pull request already
191 // in progress to continue.
192 //
193 // After Close, Status will return correct values, NextItem will be
194 // closed, and ReplaceQueue will panic.
195 //
196 func (b *WorkQueue) Close() {
197         close(b.newlist)
198 }
199
200 // Status returns an up-to-date WorkQueueStatus reflecting the current
201 // queue status.
202 //
203 func (b *WorkQueue) Status() WorkQueueStatus {
204         // If the channel is closed, we get the nil value of
205         // WorkQueueStatus, which is an accurate description of a
206         // finished queue.
207         return <-b.getStatus
208 }