Merge branch 'master' into 3889-functional-testing
[arvados.git] / services / keepstore / work_queue.go
1 package main
2
3 /* A WorkQueue is an asynchronous thread-safe queue manager.  It
4    provides a channel from which items can be read off the queue, and
5    permits replacing the contents of the queue at any time.
6
7    The overall work flow for a WorkQueue is as follows:
8
9      1. A WorkQueue is created with NewWorkQueue().  This
10         function instantiates a new WorkQueue and starts a manager
11         goroutine.  The manager listens on an input channel
12         (manager.newlist) and an output channel (manager.NextItem).
13
14      2. The manager first waits for a new list of requests on the
15         newlist channel.  When another goroutine calls
16         manager.ReplaceQueue(lst), it sends lst over the newlist
17         channel to the manager.  The manager goroutine now has
18         ownership of the list.
19
20      3. Once the manager has this initial list, it listens on both the
21         input and output channels for one of the following to happen:
22
23           a. A worker attempts to read an item from the NextItem
24              channel.  The manager sends the next item from the list
25              over this channel to the worker, and loops.
26
27           b. New data is sent to the manager on the newlist channel.
28              This happens when another goroutine calls
29              manager.ReplaceItem() with a new list.  The manager
30              discards the current list, replaces it with the new one,
31              and begins looping again.
32
33           c. The input channel is closed.  The manager closes its
34              output channel (signalling any workers to quit) and
35              terminates.
36
37    Tasks currently handled by WorkQueue:
38      * the pull list
39      * the trash list
40
41    Example usage:
42
43         // Any kind of user-defined type can be used with the
44         // WorkQueue.
45                 type FrobRequest struct {
46                         frob string
47                 }
48
49                 // Make a work list.
50                 froblist := NewWorkQueue()
51
52                 // Start a concurrent worker to read items from the NextItem
53                 // channel until it is closed, deleting each one.
54                 go func(list WorkQueue) {
55                         for i := range list.NextItem {
56                                 req := i.(FrobRequest)
57                                 frob.Run(req)
58                         }
59                 }(froblist)
60
61                 // Set up a HTTP handler for PUT /frob
62                 router.HandleFunc(`/frob`,
63                         func(w http.ResponseWriter, req *http.Request) {
64                                 // Parse the request body into a list.List
65                                 // of FrobRequests, and give this list to the
66                                 // frob manager.
67                                 newfrobs := parseBody(req.Body)
68                                 froblist.ReplaceQueue(newfrobs)
69                         }).Methods("PUT")
70
71    Methods available on a WorkQueue:
72
73                 ReplaceQueue(list)
74                         Replaces the current item list with a new one.  The list
75             manager discards any unprocessed items on the existing
76             list and replaces it with the new one. If the worker is
77             processing a list item when ReplaceQueue is called, it
78             finishes processing before receiving items from the new
79             list.
80                 Close()
81                         Shuts down the manager goroutine. When Close is called,
82                         the manager closes the NextItem channel.
83 */
84
85 import "container/list"
86
87 type WorkQueue struct {
88         newlist  chan *list.List
89         NextItem chan interface{}
90 }
91
92 // NewWorkQueue returns a new worklist, and launches a listener
93 // goroutine that waits for work and farms it out to workers.
94 //
95 func NewWorkQueue() *WorkQueue {
96         b := WorkQueue{
97                 newlist:  make(chan *list.List),
98                 NextItem: make(chan interface{}),
99         }
100         go b.listen()
101         return &b
102 }
103
104 // ReplaceQueue sends a new list of pull requests to the manager goroutine.
105 // The manager will discard any outstanding pull list and begin
106 // working on the new list.
107 //
108 func (b *WorkQueue) ReplaceQueue(list *list.List) {
109         b.newlist <- list
110 }
111
112 // Close shuts down the manager and terminates the goroutine, which
113 // completes any pull request in progress and abandons any pending
114 // requests.
115 //
116 func (b *WorkQueue) Close() {
117         close(b.newlist)
118 }
119
120 // listen is run in a goroutine. It reads new pull lists from its
121 // input queue until the queue is closed.
122 // listen takes ownership of the list that is passed to it.
123 //
124 // Note that the routine does not ever need to access the list
125 // itself once the current_item has been initialized, so we do
126 // not bother to keep a pointer to the list. Because it is a
127 // doubly linked list, holding on to the current item will keep
128 // it from garbage collection.
129 //
130 func (b *WorkQueue) listen() {
131         var current_item *list.Element
132
133         // When we're done, close the output channel to shut down any
134         // workers.
135         defer close(b.NextItem)
136
137         for {
138                 // If the current list is empty, wait for a new list before
139                 // even checking if workers are ready.
140                 if current_item == nil {
141                         if p, ok := <-b.newlist; ok {
142                                 current_item = p.Front()
143                         } else {
144                                 // The channel was closed; shut down.
145                                 return
146                         }
147                 }
148                 select {
149                 case p, ok := <-b.newlist:
150                         if ok {
151                                 current_item = p.Front()
152                         } else {
153                                 // The input channel is closed; time to shut down
154                                 return
155                         }
156                 case b.NextItem <- current_item.Value:
157                         current_item = current_item.Next()
158                 }
159         }
160 }