704a9ca35b8f06800130a7612db5b5f65d6bb75a
[arvados.git] / services / keepstore / block_work_list.go
1 package main
2
3 /* A BlockWorkList concurrently processes blocks needing attention.
4
5    Tasks currently handled by BlockWorkList:
6      * the pull list
7      * the trash list
8
9    A BlockWorkList is instantiated with NewBlockWorkList(), which
10    launches a manager in a goroutine.  The manager listens on a
11    channel for data to be assigned to it via the ReplaceList() method.
12
13    A worker gets items to process from a BlockWorkList by reading the
14    NextItem channel.  The list manager continuously writes items to
15    this channel.
16
17    Example (simplified) implementation of a trash collector:
18
19                 type DeleteRequest struct {
20                         hash string
21                         age time.Time
22                 }
23
24                 // Make a work list.
25                 trashList := NewBlockWorkList()
26
27                 // Start a concurrent worker to read items from the NextItem
28                 // channel until it is closed, deleting each one.
29                 go func(list BlockWorkList) {
30                         for i := range list.NextItem {
31                                 req := i.(DeleteRequest)
32                                 if time.Now() > req.age {
33                                         deleteBlock(req.hash)
34                                 }
35                         }
36                 }(trashList)
37
38                 // Set up a HTTP handler for PUT /trash
39                 router.HandleFunc(`/trash`,
40                         func(w http.ResponseWriter, req *http.Request) {
41                                 // Parse the request body into a list.List
42                                 // of DeleteRequests, and give this list to the
43                                 // trash collector.
44                                 trash := parseBody(req.Body)
45                                 trashList.ReplaceList(trash)
46                         }).Methods("PUT")
47
48    Methods available on a BlockWorkList:
49
50                 ReplaceList(list)
51                         Replaces the current item list with a new one.  The list
52             manager discards any unprocessed items on the existing
53             list and replaces it with the new one. If the worker is
54             processing a list item when ReplaceList is called, it
55             finishes processing before receiving items from the new
56             list.
57                 Close()
58                         Shuts down the manager and the worker cleanly.
59 */
60
61 import "container/list"
62
63 type BlockWorkList struct {
64         items    *list.List
65         newlist  chan *list.List
66         NextItem chan *list.Element
67 }
68
69 // NewBlockWorkList returns a new worklist, and launches a listener
70 // goroutine that waits for work and farms it out to workers.
71 //
72 func NewBlockWorkList() *BlockWorkList {
73         b := BlockWorkList{
74                 items:    nil,
75                 newlist:  make(chan *list.List),
76                 NextItem: make(chan *list.Element),
77         }
78         go b.listen()
79         return &b
80 }
81
82 // ReplaceList sends a new list of pull requests to the manager goroutine.
83 // The manager will discard any outstanding pull list and begin
84 // working on the new list.
85 //
86 func (b *BlockWorkList) ReplaceList(list *list.List) {
87         b.newlist <- list
88 }
89
90 // Close shuts down the manager and terminates the goroutine, which
91 // completes any pull request in progress and abandons any pending
92 // requests.
93 //
94 func (b *BlockWorkList) Close() {
95         close(b.newlist)
96 }
97
98 // listen is run in a goroutine. It reads new pull lists from its
99 // input queue until the queue is closed.
100 func (b *BlockWorkList) listen() {
101         var (
102                 current_list *list.List
103                 current_item *list.Element
104         )
105
106         // When we're done, close the output channel to shut down any
107         // workers.
108         defer close(b.NextItem)
109
110         for {
111                 // If the current list is empty, wait for a new list before
112                 // even checking if workers are ready.
113                 if current_item == nil {
114                         if p, ok := <-b.newlist; ok {
115                                 current_list = p
116                         } else {
117                                 // The channel was closed; shut down.
118                                 return
119                         }
120                         current_item = current_list.Front()
121                 }
122                 select {
123                 case p, ok := <-b.newlist:
124                         if ok {
125                                 current_list = p
126                                 current_item = current_list.Front()
127                         } else {
128                                 // The input channel is closed; time to shut down
129                                 return
130                         }
131                 case b.NextItem <- current_item:
132                         current_item = current_item.Next()
133                 }
134         }
135 }