13497: Merge branch 'master' into 13497-controller
[arvados.git] / services / keepstore / work_queue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 /* A WorkQueue is an asynchronous thread-safe queue manager.  It
8    provides a channel from which items can be read off the queue, and
9    permits replacing the contents of the queue at any time.
10
11    The overall work flow for a WorkQueue is as follows:
12
13      1. A WorkQueue is created with NewWorkQueue().  This
14         function instantiates a new WorkQueue and starts a manager
15         goroutine.  The manager listens on an input channel
16         (manager.newlist) and an output channel (manager.NextItem).
17
18      2. The manager first waits for a new list of requests on the
19         newlist channel.  When another goroutine calls
20         manager.ReplaceQueue(lst), it sends lst over the newlist
21         channel to the manager.  The manager goroutine now has
22         ownership of the list.
23
24      3. Once the manager has this initial list, it listens on both the
25         input and output channels for one of the following to happen:
26
27           a. A worker attempts to read an item from the NextItem
28              channel.  The manager sends the next item from the list
29              over this channel to the worker, and loops.
30
31           b. New data is sent to the manager on the newlist channel.
32              This happens when another goroutine calls
33              manager.ReplaceItem() with a new list.  The manager
34              discards the current list, replaces it with the new one,
35              and begins looping again.
36
37           c. The input channel is closed.  The manager closes its
38              output channel (signalling any workers to quit) and
39              terminates.
40
41    Tasks currently handled by WorkQueue:
42      * the pull list
43      * the trash list
44
45    Example usage:
46
47         // Any kind of user-defined type can be used with the
48         // WorkQueue.
49                 type FrobRequest struct {
50                         frob string
51                 }
52
53                 // Make a work list.
54                 froblist := NewWorkQueue()
55
56                 // Start a concurrent worker to read items from the NextItem
57                 // channel until it is closed, deleting each one.
58                 go func(list WorkQueue) {
59                         for i := range list.NextItem {
60                                 req := i.(FrobRequest)
61                                 frob.Run(req)
62                         }
63                 }(froblist)
64
65                 // Set up a HTTP handler for PUT /frob
66                 router.HandleFunc(`/frob`,
67                         func(w http.ResponseWriter, req *http.Request) {
68                                 // Parse the request body into a list.List
69                                 // of FrobRequests, and give this list to the
70                                 // frob manager.
71                                 newfrobs := parseBody(req.Body)
72                                 froblist.ReplaceQueue(newfrobs)
73                         }).Methods("PUT")
74
75    Methods available on a WorkQueue:
76
77                 ReplaceQueue(list)
78                         Replaces the current item list with a new one.  The list
79             manager discards any unprocessed items on the existing
80             list and replaces it with the new one. If the worker is
81             processing a list item when ReplaceQueue is called, it
82             finishes processing before receiving items from the new
83             list.
84                 Close()
85                         Shuts down the manager goroutine. When Close is called,
86                         the manager closes the NextItem channel.
87 */
88
89 import "container/list"
90
91 // WorkQueue definition
92 type WorkQueue struct {
93         getStatus chan WorkQueueStatus
94         newlist   chan *list.List
95         // Workers get work items by reading from this channel.
96         NextItem <-chan interface{}
97         // Each worker must send struct{}{} to DoneItem exactly once
98         // for each work item received from NextItem, when it stops
99         // working on that item (regardless of whether the work was
100         // successful).
101         DoneItem chan<- struct{}
102 }
103
104 // WorkQueueStatus reflects the queue status.
105 type WorkQueueStatus struct {
106         InProgress int
107         Queued     int
108 }
109
110 // NewWorkQueue returns a new empty WorkQueue.
111 //
112 func NewWorkQueue() *WorkQueue {
113         nextItem := make(chan interface{})
114         reportDone := make(chan struct{})
115         newList := make(chan *list.List)
116         b := WorkQueue{
117                 getStatus: make(chan WorkQueueStatus),
118                 newlist:   newList,
119                 NextItem:  nextItem,
120                 DoneItem:  reportDone,
121         }
122         go func() {
123                 // Read new work lists from the newlist channel.
124                 // Reply to "status" and "get next item" queries by
125                 // sending to the getStatus and nextItem channels
126                 // respectively. Return when the newlist channel
127                 // closes.
128
129                 todo := &list.List{}
130                 status := WorkQueueStatus{}
131
132                 // When we're done, close the output channel; workers will
133                 // shut down next time they ask for new work.
134                 defer close(nextItem)
135                 defer close(b.getStatus)
136
137                 // nextChan and nextVal are both nil when we have
138                 // nothing to send; otherwise they are, respectively,
139                 // the nextItem channel and the next work item to send
140                 // to it.
141                 var nextChan chan interface{}
142                 var nextVal interface{}
143
144                 for newList != nil || status.InProgress > 0 {
145                         select {
146                         case p, ok := <-newList:
147                                 if !ok {
148                                         // Closed, stop receiving
149                                         newList = nil
150                                 }
151                                 todo = p
152                                 if todo == nil {
153                                         todo = &list.List{}
154                                 }
155                                 status.Queued = todo.Len()
156                                 if status.Queued == 0 {
157                                         // Stop sending work
158                                         nextChan = nil
159                                         nextVal = nil
160                                 } else {
161                                         nextChan = nextItem
162                                         nextVal = todo.Front().Value
163                                 }
164                         case nextChan <- nextVal:
165                                 todo.Remove(todo.Front())
166                                 status.InProgress++
167                                 status.Queued--
168                                 if status.Queued == 0 {
169                                         // Stop sending work
170                                         nextChan = nil
171                                         nextVal = nil
172                                 } else {
173                                         nextVal = todo.Front().Value
174                                 }
175                         case <-reportDone:
176                                 status.InProgress--
177                         case b.getStatus <- status:
178                         }
179                 }
180         }()
181         return &b
182 }
183
184 // ReplaceQueue abandons any work items left in the existing queue,
185 // and starts giving workers items from the given list. After giving
186 // it to ReplaceQueue, the caller must not read or write the given
187 // list.
188 //
189 func (b *WorkQueue) ReplaceQueue(list *list.List) {
190         b.newlist <- list
191 }
192
193 // Close shuts down the manager and terminates the goroutine, which
194 // abandons any pending requests, but allows any pull request already
195 // in progress to continue.
196 //
197 // After Close, Status will return correct values, NextItem will be
198 // closed, and ReplaceQueue will panic.
199 //
200 func (b *WorkQueue) Close() {
201         close(b.newlist)
202 }
203
204 // Status returns an up-to-date WorkQueueStatus reflecting the current
205 // queue status.
206 //
207 func (b *WorkQueue) Status() WorkQueueStatus {
208         // If the channel is closed, we get the nil value of
209         // WorkQueueStatus, which is an accurate description of a
210         // finished queue.
211         return <-b.getStatus
212 }