Merge branch 'master' into 9318-dashboard-uses-work-units
[arvados.git] / sdk / go / dispatch / dispatch.go
1 // Framework for monitoring the Arvados container Queue, Locks container
2 // records, and runs goroutine callbacks which implement execution and
3 // monitoring of the containers.
4 package dispatch
5
6 import (
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "log"
9         "os"
10         "os/signal"
11         "sync"
12         "syscall"
13         "time"
14 )
15
16 // Constants for container states
17 const (
18         Queued    = "Queued"
19         Locked    = "Locked"
20         Running   = "Running"
21         Complete  = "Complete"
22         Cancelled = "Cancelled"
23 )
24
25 type apiClientAuthorization struct {
26         UUID     string `json:"uuid"`
27         APIToken string `json:"api_token"`
28 }
29
30 type apiClientAuthorizationList struct {
31         Items []apiClientAuthorization `json:"items"`
32 }
33
34 // Represents an Arvados container record
35 type Container struct {
36         UUID               string           `json:"uuid"`
37         State              string           `json:"state"`
38         Priority           int              `json:"priority"`
39         RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
40         LockedByUUID       string           `json:"locked_by_uuid"`
41 }
42
43 // ContainerList is a list of the containers from api
44 type ContainerList struct {
45         Items          []Container `json:"items"`
46         ItemsAvailable int         `json:"items_available"`
47 }
48
49 // Dispatcher holds the state of the dispatcher
50 type Dispatcher struct {
51         // The Arvados client
52         Arv arvadosclient.ArvadosClient
53
54         // When a new queued container appears and is either already owned by
55         // this dispatcher or is successfully locked, the dispatcher will call
56         // go RunContainer().  The RunContainer() goroutine gets a channel over
57         // which it will receive updates to the container state.  The
58         // RunContainer() goroutine should only assume status updates come when
59         // the container record changes on the API server; if it needs to
60         // monitor the job submission to the underlying slurm/grid engine/etc
61         // queue it should spin up its own polling goroutines.  When the
62         // channel is closed, that means the container is no longer being
63         // handled by this dispatcher and the goroutine should terminate.  The
64         // goroutine is responsible for draining the 'status' channel, failure
65         // to do so may deadlock the dispatcher.
66         RunContainer func(*Dispatcher, Container, chan Container)
67
68         // Amount of time to wait between polling for updates.
69         PollInterval time.Duration
70
71         // Channel used to signal that RunDispatcher loop should exit.
72         DoneProcessing chan struct{}
73
74         mineMutex  sync.Mutex
75         mineMap    map[string]chan Container
76         Auth       apiClientAuthorization
77         containers chan Container
78 }
79
80 // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
81 // for which this process is actively starting/monitoring.  Returns channel to
82 // be used to send container status updates.
83 func (dispatcher *Dispatcher) setMine(uuid string) chan Container {
84         dispatcher.mineMutex.Lock()
85         defer dispatcher.mineMutex.Unlock()
86         if ch, ok := dispatcher.mineMap[uuid]; ok {
87                 return ch
88         }
89
90         ch := make(chan Container)
91         dispatcher.mineMap[uuid] = ch
92         return ch
93 }
94
95 // Release a container which is no longer being monitored.
96 func (dispatcher *Dispatcher) notMine(uuid string) {
97         dispatcher.mineMutex.Lock()
98         defer dispatcher.mineMutex.Unlock()
99         if ch, ok := dispatcher.mineMap[uuid]; ok {
100                 close(ch)
101                 delete(dispatcher.mineMap, uuid)
102         }
103 }
104
105 // checkMine returns true/false if there is a channel for updates associated
106 // with container c.  If update is true, also send the container record on
107 // the channel.
108 func (dispatcher *Dispatcher) checkMine(c Container, update bool) bool {
109         dispatcher.mineMutex.Lock()
110         defer dispatcher.mineMutex.Unlock()
111         ch, ok := dispatcher.mineMap[c.UUID]
112         if ok {
113                 if update {
114                         ch <- c
115                 }
116                 return true
117         }
118         return false
119 }
120
121 func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
122         var containers ContainerList
123         err := dispatcher.Arv.List("containers", params, &containers)
124         if err != nil {
125                 log.Printf("Error getting list of containers: %q", err)
126                 return
127         }
128
129         if containers.ItemsAvailable > len(containers.Items) {
130                 // TODO: support paging
131                 log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
132                         containers.ItemsAvailable,
133                         len(containers.Items))
134         }
135         for _, container := range containers.Items {
136                 touched[container.UUID] = true
137                 dispatcher.containers <- container
138         }
139 }
140
141 func (dispatcher *Dispatcher) pollContainers() {
142         ticker := time.NewTicker(dispatcher.PollInterval)
143
144         paramsQ := arvadosclient.Dict{
145                 "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
146                 "order":   []string{"priority desc"},
147                 "limit":   "1000"}
148         paramsP := arvadosclient.Dict{
149                 "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
150                 "limit":   "1000"}
151
152         for {
153                 select {
154                 case <-ticker.C:
155                         touched := make(map[string]bool)
156                         dispatcher.getContainers(paramsQ, touched)
157                         dispatcher.getContainers(paramsP, touched)
158                         dispatcher.mineMutex.Lock()
159                         var monitored []string
160                         for k := range dispatcher.mineMap {
161                                 if _, ok := touched[k]; !ok {
162                                         monitored = append(monitored, k)
163                                 }
164                         }
165                         dispatcher.mineMutex.Unlock()
166                         if monitored != nil {
167                                 dispatcher.getContainers(arvadosclient.Dict{
168                                         "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
169                         }
170                 case <-dispatcher.DoneProcessing:
171                         close(dispatcher.containers)
172                         ticker.Stop()
173                         return
174                 }
175         }
176 }
177
178 func (dispatcher *Dispatcher) handleUpdate(container Container) {
179         if container.State == Queued && dispatcher.checkMine(container, false) {
180                 // If we previously started the job, something failed, and it
181                 // was re-queued, this dispatcher might still be monitoring it.
182                 // Stop the existing monitor, then try to lock and run it
183                 // again.
184                 dispatcher.notMine(container.UUID)
185         }
186
187         if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
188                 // If container is Complete, Cancelled, or Queued, LockedByUUID
189                 // will be nil.  If the container was formerly Locked, moved
190                 // back to Queued and then locked by another dispatcher,
191                 // LockedByUUID will be different.  In either case, we want
192                 // to stop monitoring it.
193                 log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
194                 dispatcher.notMine(container.UUID)
195                 return
196         }
197
198         if dispatcher.checkMine(container, true) {
199                 // Already monitored, sent status update
200                 return
201         }
202
203         if container.State == Queued && container.Priority > 0 {
204                 // Try to take the lock
205                 if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
206                         return
207                 }
208                 container.State = Locked
209         }
210
211         if container.State == Locked || container.State == Running {
212                 // Not currently monitored but in Locked or Running state and
213                 // owned by this dispatcher, so start monitoring.
214                 go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
215         }
216 }
217
218 // UpdateState makes an API call to change the state of a container.
219 func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
220         err := dispatcher.Arv.Update("containers", uuid,
221                 arvadosclient.Dict{
222                         "container": arvadosclient.Dict{"state": newState}},
223                 nil)
224         if err != nil {
225                 log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
226         }
227         return err
228 }
229
230 // RunDispatcher runs the main loop of the dispatcher until receiving a message
231 // on the dispatcher.DoneProcessing channel.  It also installs a signal handler
232 // to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
233 func (dispatcher *Dispatcher) RunDispatcher() (err error) {
234         err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
235         if err != nil {
236                 log.Printf("Error getting my token UUID: %v", err)
237                 return
238         }
239
240         dispatcher.mineMap = make(map[string]chan Container)
241         dispatcher.containers = make(chan Container)
242
243         // Graceful shutdown on signal
244         sigChan := make(chan os.Signal)
245         signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
246
247         go func(sig <-chan os.Signal) {
248                 for sig := range sig {
249                         log.Printf("Caught signal: %v", sig)
250                         dispatcher.DoneProcessing <- struct{}{}
251                 }
252         }(sigChan)
253
254         defer close(sigChan)
255         defer signal.Stop(sigChan)
256
257         go dispatcher.pollContainers()
258         for container := range dispatcher.containers {
259                 dispatcher.handleUpdate(container)
260         }
261
262         return nil
263 }