9374: Consolidate various Container structs as arvados.Container.
[arvados.git] / sdk / go / dispatch / dispatch.go
1 // Framework for monitoring the Arvados container Queue, Locks container
2 // records, and runs goroutine callbacks which implement execution and
3 // monitoring of the containers.
4 package dispatch
5
6 import (
7         "git.curoverse.com/arvados.git/sdk/go/arvados"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "log"
10         "os"
11         "os/signal"
12         "sync"
13         "syscall"
14         "time"
15 )
16
17 const (
18         Queued    = arvados.ContainerStateQueued
19         Locked    = arvados.ContainerStateLocked
20         Running   = arvados.ContainerStateRunning
21         Complete  = arvados.ContainerStateComplete
22         Cancelled = arvados.ContainerStateCancelled
23 )
24
25 type apiClientAuthorization struct {
26         UUID     string `json:"uuid"`
27         APIToken string `json:"api_token"`
28 }
29
30 type apiClientAuthorizationList struct {
31         Items []apiClientAuthorization `json:"items"`
32 }
33
34 // Dispatcher holds the state of the dispatcher
35 type Dispatcher struct {
36         // The Arvados client
37         Arv arvadosclient.ArvadosClient
38
39         // When a new queued container appears and is either already owned by
40         // this dispatcher or is successfully locked, the dispatcher will call
41         // go RunContainer().  The RunContainer() goroutine gets a channel over
42         // which it will receive updates to the container state.  The
43         // RunContainer() goroutine should only assume status updates come when
44         // the container record changes on the API server; if it needs to
45         // monitor the job submission to the underlying slurm/grid engine/etc
46         // queue it should spin up its own polling goroutines.  When the
47         // channel is closed, that means the container is no longer being
48         // handled by this dispatcher and the goroutine should terminate.  The
49         // goroutine is responsible for draining the 'status' channel, failure
50         // to do so may deadlock the dispatcher.
51         RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
52
53         // Amount of time to wait between polling for updates.
54         PollInterval time.Duration
55
56         // Channel used to signal that RunDispatcher loop should exit.
57         DoneProcessing chan struct{}
58
59         mineMutex  sync.Mutex
60         mineMap    map[string]chan arvados.Container
61         Auth       apiClientAuthorization
62         containers chan arvados.Container
63 }
64
65 // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
66 // for which this process is actively starting/monitoring.  Returns channel to
67 // be used to send container status updates.
68 func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
69         dispatcher.mineMutex.Lock()
70         defer dispatcher.mineMutex.Unlock()
71         if ch, ok := dispatcher.mineMap[uuid]; ok {
72                 return ch
73         }
74
75         ch := make(chan arvados.Container)
76         dispatcher.mineMap[uuid] = ch
77         return ch
78 }
79
80 // Release a container which is no longer being monitored.
81 func (dispatcher *Dispatcher) notMine(uuid string) {
82         dispatcher.mineMutex.Lock()
83         defer dispatcher.mineMutex.Unlock()
84         if ch, ok := dispatcher.mineMap[uuid]; ok {
85                 close(ch)
86                 delete(dispatcher.mineMap, uuid)
87         }
88 }
89
90 // checkMine returns true if there is a channel for updates associated
91 // with container c.  If update is true, also send the container record on
92 // the channel.
93 func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
94         dispatcher.mineMutex.Lock()
95         defer dispatcher.mineMutex.Unlock()
96         ch, ok := dispatcher.mineMap[c.UUID]
97         if ok {
98                 if update {
99                         ch <- c
100                 }
101                 return true
102         }
103         return false
104 }
105
106 func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
107         var containers arvados.ContainerList
108         err := dispatcher.Arv.List("containers", params, &containers)
109         if err != nil {
110                 log.Printf("Error getting list of containers: %q", err)
111                 return
112         }
113
114         if containers.ItemsAvailable > len(containers.Items) {
115                 // TODO: support paging
116                 log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
117                         containers.ItemsAvailable,
118                         len(containers.Items))
119         }
120         for _, container := range containers.Items {
121                 touched[container.UUID] = true
122                 dispatcher.containers <- container
123         }
124 }
125
126 func (dispatcher *Dispatcher) pollContainers() {
127         ticker := time.NewTicker(dispatcher.PollInterval)
128
129         paramsQ := arvadosclient.Dict{
130                 "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
131                 "order":   []string{"priority desc"},
132                 "limit":   "1000"}
133         paramsP := arvadosclient.Dict{
134                 "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
135                 "limit":   "1000"}
136
137         for {
138                 select {
139                 case <-ticker.C:
140                         touched := make(map[string]bool)
141                         dispatcher.getContainers(paramsQ, touched)
142                         dispatcher.getContainers(paramsP, touched)
143                         dispatcher.mineMutex.Lock()
144                         var monitored []string
145                         for k := range dispatcher.mineMap {
146                                 if _, ok := touched[k]; !ok {
147                                         monitored = append(monitored, k)
148                                 }
149                         }
150                         dispatcher.mineMutex.Unlock()
151                         if monitored != nil {
152                                 dispatcher.getContainers(arvadosclient.Dict{
153                                         "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
154                         }
155                 case <-dispatcher.DoneProcessing:
156                         close(dispatcher.containers)
157                         ticker.Stop()
158                         return
159                 }
160         }
161 }
162
163 func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
164         if container.State == Queued && dispatcher.checkMine(container, false) {
165                 // If we previously started the job, something failed, and it
166                 // was re-queued, this dispatcher might still be monitoring it.
167                 // Stop the existing monitor, then try to lock and run it
168                 // again.
169                 dispatcher.notMine(container.UUID)
170         }
171
172         if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
173                 // If container is Complete, Cancelled, or Queued, LockedByUUID
174                 // will be nil.  If the container was formerly Locked, moved
175                 // back to Queued and then locked by another dispatcher,
176                 // LockedByUUID will be different.  In either case, we want
177                 // to stop monitoring it.
178                 log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
179                 dispatcher.notMine(container.UUID)
180                 return
181         }
182
183         if dispatcher.checkMine(container, true) {
184                 // Already monitored, sent status update
185                 return
186         }
187
188         if container.State == Queued && container.Priority > 0 {
189                 // Try to take the lock
190                 if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
191                         return
192                 }
193                 container.State = Locked
194         }
195
196         if container.State == Locked || container.State == Running {
197                 // Not currently monitored but in Locked or Running state and
198                 // owned by this dispatcher, so start monitoring.
199                 go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
200         }
201 }
202
203 // UpdateState makes an API call to change the state of a container.
204 func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
205         err := dispatcher.Arv.Update("containers", uuid,
206                 arvadosclient.Dict{
207                         "container": arvadosclient.Dict{"state": newState}},
208                 nil)
209         if err != nil {
210                 log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
211         }
212         return err
213 }
214
215 // RunDispatcher runs the main loop of the dispatcher until receiving a message
216 // on the dispatcher.DoneProcessing channel.  It also installs a signal handler
217 // to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
218 func (dispatcher *Dispatcher) RunDispatcher() (err error) {
219         err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
220         if err != nil {
221                 log.Printf("Error getting my token UUID: %v", err)
222                 return
223         }
224
225         dispatcher.mineMap = make(map[string]chan arvados.Container)
226         dispatcher.containers = make(chan arvados.Container)
227
228         // Graceful shutdown on signal
229         sigChan := make(chan os.Signal)
230         signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
231
232         go func(sig <-chan os.Signal) {
233                 for sig := range sig {
234                         log.Printf("Caught signal: %v", sig)
235                         dispatcher.DoneProcessing <- struct{}{}
236                 }
237         }(sigChan)
238
239         defer close(sigChan)
240         defer signal.Stop(sigChan)
241
242         go dispatcher.pollContainers()
243         for container := range dispatcher.containers {
244                 dispatcher.handleUpdate(container)
245         }
246
247         return nil
248 }