Merge branch 'master' into 3586-job-priority closes #3586
[arvados.git] / services / keepstore / pull_list / pull_list.go
1 package pull_list
2
3 /* The pull_list package manages a list of pull requests sent
4    by Data Manager.
5
6    The interface is:
7
8    pull_list.NewManager() creates and returns a pull_list.Manager. A
9    listener runs in a goroutine, waiting for new requests on its input
10    channels.
11
12    pull_list.SetList() assigns a new pull list to the manager. Any
13    existing list is discarded.
14
15    pull_list.GetList() reports the manager's current pull list.
16
17    pull_list.Close() shuts down the pull list manager.
18 */
19
20 type PullRequest struct {
21         Locator string
22         Servers []string
23 }
24
25 type Manager struct {
26         setlist chan []PullRequest // input channel for setting new lists
27         getlist chan []PullRequest // output channel for getting existing list
28 }
29
30 // NewManager returns a new Manager object.  It launches a goroutine that
31 // waits for pull requests.
32 //
33 func NewManager() *Manager {
34         r := Manager{
35                 make(chan []PullRequest),
36                 make(chan []PullRequest),
37         }
38         go r.listen()
39         return &r
40 }
41
42 // SetList sends a new list of pull requests to the manager goroutine.
43 // The manager will discard any outstanding pull list and begin
44 // working on the new list.
45 //
46 func (r *Manager) SetList(pr []PullRequest) {
47         r.setlist <- pr
48 }
49
50 // GetList reports the contents of the current pull list.
51 func (r *Manager) GetList() []PullRequest {
52         return <-r.getlist
53 }
54
55 // Close shuts down the manager and terminates the goroutine, which
56 // completes any pull request in progress and abandons any pending
57 // requests.
58 //
59 func (r *Manager) Close() {
60         close(r.setlist)
61 }
62
63 // listen is run in a goroutine. It reads new pull lists from its
64 // input queue until the queue is closed.
65 func (r *Manager) listen() {
66         var current []PullRequest
67         for {
68                 select {
69                 case p, ok := <-r.setlist:
70                         if ok {
71                                 current = p
72                         } else {
73                                 // The input channel is closed; time to shut down
74                                 close(r.getlist)
75                                 return
76                         }
77                 case r.getlist <- current:
78                         // no-op
79                 }
80         }
81 }