Merge branch 'master' into 9766-register-workflow
[arvados.git] / sdk / go / dispatch / dispatch.go
1 // Framework for monitoring the Arvados container Queue, Locks container
2 // records, and runs goroutine callbacks which implement execution and
3 // monitoring of the containers.
4 package dispatch
5
6 import (
7         "git.curoverse.com/arvados.git/sdk/go/arvados"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "log"
10         "os"
11         "os/signal"
12         "sync"
13         "syscall"
14         "time"
15 )
16
17 const (
18         Queued    = arvados.ContainerStateQueued
19         Locked    = arvados.ContainerStateLocked
20         Running   = arvados.ContainerStateRunning
21         Complete  = arvados.ContainerStateComplete
22         Cancelled = arvados.ContainerStateCancelled
23 )
24
25 // Dispatcher holds the state of the dispatcher
26 type Dispatcher struct {
27         // The Arvados client
28         Arv arvadosclient.ArvadosClient
29
30         // When a new queued container appears and is either already owned by
31         // this dispatcher or is successfully locked, the dispatcher will call
32         // go RunContainer().  The RunContainer() goroutine gets a channel over
33         // which it will receive updates to the container state.  The
34         // RunContainer() goroutine should only assume status updates come when
35         // the container record changes on the API server; if it needs to
36         // monitor the job submission to the underlying slurm/grid engine/etc
37         // queue it should spin up its own polling goroutines.  When the
38         // channel is closed, that means the container is no longer being
39         // handled by this dispatcher and the goroutine should terminate.  The
40         // goroutine is responsible for draining the 'status' channel, failure
41         // to do so may deadlock the dispatcher.
42         RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
43
44         // Amount of time to wait between polling for updates.
45         PollInterval time.Duration
46
47         // Channel used to signal that RunDispatcher loop should exit.
48         DoneProcessing chan struct{}
49
50         mineMutex  sync.Mutex
51         mineMap    map[string]chan arvados.Container
52         Auth       arvados.APIClientAuthorization
53         containers chan arvados.Container
54 }
55
56 // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
57 // for which this process is actively starting/monitoring.  Returns channel to
58 // be used to send container status updates.
59 func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
60         dispatcher.mineMutex.Lock()
61         defer dispatcher.mineMutex.Unlock()
62         if ch, ok := dispatcher.mineMap[uuid]; ok {
63                 return ch
64         }
65
66         ch := make(chan arvados.Container)
67         dispatcher.mineMap[uuid] = ch
68         return ch
69 }
70
71 // Release a container which is no longer being monitored.
72 func (dispatcher *Dispatcher) notMine(uuid string) {
73         dispatcher.mineMutex.Lock()
74         defer dispatcher.mineMutex.Unlock()
75         if ch, ok := dispatcher.mineMap[uuid]; ok {
76                 close(ch)
77                 delete(dispatcher.mineMap, uuid)
78         }
79 }
80
81 // checkMine returns true if there is a channel for updates associated
82 // with container c.  If update is true, also send the container record on
83 // the channel.
84 func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
85         dispatcher.mineMutex.Lock()
86         defer dispatcher.mineMutex.Unlock()
87         ch, ok := dispatcher.mineMap[c.UUID]
88         if ok {
89                 if update {
90                         ch <- c
91                 }
92                 return true
93         }
94         return false
95 }
96
97 func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
98         var containers arvados.ContainerList
99         err := dispatcher.Arv.List("containers", params, &containers)
100         if err != nil {
101                 log.Printf("Error getting list of containers: %q", err)
102                 return
103         }
104
105         if containers.ItemsAvailable > len(containers.Items) {
106                 // TODO: support paging
107                 log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
108                         containers.ItemsAvailable,
109                         len(containers.Items))
110         }
111         for _, container := range containers.Items {
112                 touched[container.UUID] = true
113                 dispatcher.containers <- container
114         }
115 }
116
117 func (dispatcher *Dispatcher) pollContainers() {
118         ticker := time.NewTicker(dispatcher.PollInterval)
119
120         paramsQ := arvadosclient.Dict{
121                 "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
122                 "order":   []string{"priority desc"},
123                 "limit":   "1000"}
124         paramsP := arvadosclient.Dict{
125                 "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
126                 "limit":   "1000"}
127
128         for {
129                 select {
130                 case <-ticker.C:
131                         touched := make(map[string]bool)
132                         dispatcher.getContainers(paramsQ, touched)
133                         dispatcher.getContainers(paramsP, touched)
134                         dispatcher.mineMutex.Lock()
135                         var monitored []string
136                         for k := range dispatcher.mineMap {
137                                 if _, ok := touched[k]; !ok {
138                                         monitored = append(monitored, k)
139                                 }
140                         }
141                         dispatcher.mineMutex.Unlock()
142                         if monitored != nil {
143                                 dispatcher.getContainers(arvadosclient.Dict{
144                                         "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
145                         }
146                 case <-dispatcher.DoneProcessing:
147                         close(dispatcher.containers)
148                         ticker.Stop()
149                         return
150                 }
151         }
152 }
153
154 func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
155         if container.State == Queued && dispatcher.checkMine(container, false) {
156                 // If we previously started the job, something failed, and it
157                 // was re-queued, this dispatcher might still be monitoring it.
158                 // Stop the existing monitor, then try to lock and run it
159                 // again.
160                 dispatcher.notMine(container.UUID)
161         }
162
163         if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
164                 // If container is Complete, Cancelled, or Queued, LockedByUUID
165                 // will be nil.  If the container was formerly Locked, moved
166                 // back to Queued and then locked by another dispatcher,
167                 // LockedByUUID will be different.  In either case, we want
168                 // to stop monitoring it.
169                 log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
170                 dispatcher.notMine(container.UUID)
171                 return
172         }
173
174         if dispatcher.checkMine(container, true) {
175                 // Already monitored, sent status update
176                 return
177         }
178
179         if container.State == Queued && container.Priority > 0 {
180                 // Try to take the lock
181                 if err := dispatcher.Lock(container.UUID); err != nil {
182                         return
183                 }
184                 container.State = Locked
185         }
186
187         if container.State == Locked || container.State == Running {
188                 // Not currently monitored but in Locked or Running state and
189                 // owned by this dispatcher, so start monitoring.
190                 go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
191         }
192 }
193
194 // UpdateState makes an API call to change the state of a container.
195 func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
196         err := dispatcher.Arv.Update("containers", uuid,
197                 arvadosclient.Dict{
198                         "container": arvadosclient.Dict{"state": newState}},
199                 nil)
200         if err != nil {
201                 log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
202         }
203         return err
204 }
205
206 // Lock makes the lock API call which updates the state of a container to Locked.
207 func (dispatcher *Dispatcher) Lock(uuid string) error {
208         err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
209         if err != nil {
210                 log.Printf("Error locking container %s: %q", uuid, err)
211         }
212         return err
213 }
214
215 // Unlock makes the unlock API call which updates the state of a container to Queued.
216 func (dispatcher *Dispatcher) Unlock(uuid string) error {
217         err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
218         if err != nil {
219                 log.Printf("Error unlocking container %s: %q", uuid, err)
220         }
221         return err
222 }
223
224 // RunDispatcher runs the main loop of the dispatcher until receiving a message
225 // on the dispatcher.DoneProcessing channel.  It also installs a signal handler
226 // to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
227 func (dispatcher *Dispatcher) RunDispatcher() (err error) {
228         err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
229         if err != nil {
230                 log.Printf("Error getting my token UUID: %v", err)
231                 return
232         }
233
234         dispatcher.mineMap = make(map[string]chan arvados.Container)
235         dispatcher.containers = make(chan arvados.Container)
236
237         // Graceful shutdown on signal
238         sigChan := make(chan os.Signal)
239         signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
240
241         go func(sig <-chan os.Signal) {
242                 for sig := range sig {
243                         log.Printf("Caught signal: %v", sig)
244                         dispatcher.DoneProcessing <- struct{}{}
245                 }
246         }(sigChan)
247
248         defer close(sigChan)
249         defer signal.Stop(sigChan)
250
251         go dispatcher.pollContainers()
252         for container := range dispatcher.containers {
253                 dispatcher.handleUpdate(container)
254         }
255
256         return nil
257 }