3 /* A BlockWorkList concurrently processes blocks needing attention.
5 Tasks currently handled by BlockWorkList:
9 A BlockWorkList is instantiated with NewBlockWorkList(), which
10 launches a manager in a goroutine. The manager listens on a
11 channel for data to be assigned to it via the ReplaceList() method.
13 A worker gets items to process from a BlockWorkList by reading the
14 NextItem channel. The list manager continuously writes items to this channel.
17 Example (simplified) implementation of a trash collector:
19 type DeleteRequest struct {
25 trashList := NewBlockWorkList()
27 // Start a concurrent worker to read items from the NextItem
28 // channel until it is closed, deleting each one.
29 go func(list BlockWorkList) {
30 for i := range list.NextItem {
31 req := i.Value.(DeleteRequest)
32 if time.Now().After(req.age) {
38 // Set up an HTTP handler for PUT /trash
39 router.HandleFunc(`/trash`,
40 func(w http.ResponseWriter, req *http.Request) {
41 // Parse the request body into a list.List
42 // of DeleteRequests, and give this list to trashList.
44 trash := parseBody(req.Body)
45 trashList.ReplaceList(trash)
48 Methods available on a BlockWorkList:
51 Replaces the current item list with a new one. The list
52 manager discards any unprocessed items on the existing
53 list and replaces it with the new one. If the worker is
54 processing a list item when ReplaceList is called, it
55 finishes processing before receiving items from the new list.
58 Shuts down the manager and the worker cleanly.
61 import "container/list"
63 type BlockWorkList struct {
65 newlist chan *list.List
66 NextItem chan *list.Element
69 // NewBlockWorkList returns a new worklist, and launches a listener
70 // goroutine that waits for work and farms it out to workers.
72 func NewBlockWorkList() *BlockWorkList {
75 newlist: make(chan *list.List),
76 NextItem: make(chan *list.Element),
82 // ReplaceList sends a new list of work items to the manager goroutine.
83 // The manager will discard any outstanding list and begin
84 // working on the new list.
86 func (b *BlockWorkList) ReplaceList(list *list.List) {
90 // Close shuts down the manager and terminates the goroutine, which
91 // completes any request in progress and abandons any pending requests.
94 func (b *BlockWorkList) Close() {
98 // listen is run in a goroutine. It reads new work lists from its
99 // input queue until the queue is closed.
100 func (b *BlockWorkList) listen() {
102 current_list *list.List
103 current_item *list.Element
106 // When we're done, close the output channel to shut down any workers.
108 defer close(b.NextItem)
111 // If the current list is empty, wait for a new list before
112 // even checking if workers are ready.
113 if current_item == nil {
114 if p, ok := <-b.newlist; ok {
117 // The channel was closed; shut down.
120 current_item = current_list.Front()
123 case p, ok := <-b.newlist:
126 current_item = current_list.Front()
128 // The input channel is closed; time to shut down
131 case b.NextItem <- current_item:
132 current_item = current_item.Next()