Merge branch 'master' into 8650-container-work-unit
[arvados.git] / sdk / go / dispatch / dispatch.go
1 // Framework for monitoring the Arvados container Queue, Locks container
2 // records, and runs goroutine callbacks which implement execution and
3 // monitoring of the containers.
4 package dispatch
5
6 import (
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "log"
9         "os"
10         "os/signal"
11         "sync"
12         "syscall"
13         "time"
14 )
15
16 // Constants for container states
17 const (
18         Queued    = "Queued"
19         Locked    = "Locked"
20         Running   = "Running"
21         Complete  = "Complete"
22         Cancelled = "Cancelled"
23 )
24
25 type apiClientAuthorization struct {
26         UUID     string `json:"uuid"`
27         APIToken string `json:"api_token"`
28 }
29
30 type apiClientAuthorizationList struct {
31         Items []apiClientAuthorization `json:"items"`
32 }
33
34 // Represents an Arvados container record
35 type Container struct {
36         UUID               string           `json:"uuid"`
37         State              string           `json:"state"`
38         Priority           int              `json:"priority"`
39         RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
40         LockedByUUID       string           `json:"locked_by_uuid"`
41 }
42
43 // ContainerList is a list of the containers from api
44 type ContainerList struct {
45         Items          []Container `json:"items"`
46         ItemsAvailable int         `json:"items_available"`
47 }
48
49 // Dispatcher holds the state of the dispatcher
50 type Dispatcher struct {
51         // The Arvados client
52         Arv arvadosclient.ArvadosClient
53
54         // When a new queued container appears and is either already owned by
55         // this dispatcher or is successfully locked, the dispatcher will call
56         // go RunContainer().  The RunContainer() goroutine gets a channel over
57         // which it will receive updates to the container state.  The
58         // RunContainer() goroutine should only assume status updates come when
59         // the container record changes on the API server; if it needs to
60         // monitor the job submission to the underlying slurm/grid engine/etc
61         // queue it should spin up its own polling goroutines.  When the
62         // channel is closed, that means the container is no longer being
63         // handled by this dispatcher and the goroutine should terminate.  The
64         // goroutine is responsible for draining the 'status' channel, failure
65         // to do so may deadlock the dispatcher.
66         RunContainer func(*Dispatcher, Container, chan Container)
67
68         // Amount of time to wait between polling for updates.
69         PollInterval time.Duration
70
71         // Channel used to signal that RunDispatcher loop should exit.
72         DoneProcessing chan struct{}
73
74         mineMutex  sync.Mutex
75         mineMap    map[string]chan Container
76         Auth       apiClientAuthorization
77         containers chan Container
78 }
79
80 // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
81 // for which this process is actively starting/monitoring.  Returns channel to
82 // be used to send container status updates.
83 func (dispatcher *Dispatcher) setMine(uuid string) chan Container {
84         dispatcher.mineMutex.Lock()
85         defer dispatcher.mineMutex.Unlock()
86         if ch, ok := dispatcher.mineMap[uuid]; ok {
87                 return ch
88         }
89
90         ch := make(chan Container)
91         dispatcher.mineMap[uuid] = ch
92         return ch
93 }
94
95 // Release a container which is no longer being monitored.
96 func (dispatcher *Dispatcher) notMine(uuid string) {
97         dispatcher.mineMutex.Lock()
98         defer dispatcher.mineMutex.Unlock()
99         if ch, ok := dispatcher.mineMap[uuid]; ok {
100                 close(ch)
101                 delete(dispatcher.mineMap, uuid)
102         }
103 }
104
105 // Check if there is a channel for updates associated with this container.  If
106 // so send the container record on the channel and return true, if not return
107 // false.
108 func (dispatcher *Dispatcher) updateMine(c Container) bool {
109         dispatcher.mineMutex.Lock()
110         defer dispatcher.mineMutex.Unlock()
111         ch, ok := dispatcher.mineMap[c.UUID]
112         if ok {
113                 ch <- c
114                 return true
115         }
116         return false
117 }
118
119 func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
120         var containers ContainerList
121         err := dispatcher.Arv.List("containers", params, &containers)
122         if err != nil {
123                 log.Printf("Error getting list of containers: %q", err)
124                 return
125         }
126
127         if containers.ItemsAvailable > len(containers.Items) {
128                 // TODO: support paging
129                 log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
130                         containers.ItemsAvailable,
131                         len(containers.Items))
132         }
133         for _, container := range containers.Items {
134                 touched[container.UUID] = true
135                 dispatcher.containers <- container
136         }
137 }
138
139 func (dispatcher *Dispatcher) pollContainers() {
140         ticker := time.NewTicker(dispatcher.PollInterval)
141
142         paramsQ := arvadosclient.Dict{
143                 "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
144                 "order":   []string{"priority desc"},
145                 "limit":   "1000"}
146         paramsP := arvadosclient.Dict{
147                 "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
148                 "limit":   "1000"}
149
150         for {
151                 select {
152                 case <-ticker.C:
153                         touched := make(map[string]bool)
154                         dispatcher.getContainers(paramsQ, touched)
155                         dispatcher.getContainers(paramsP, touched)
156                         dispatcher.mineMutex.Lock()
157                         var monitored []string
158                         for k := range dispatcher.mineMap {
159                                 if _, ok := touched[k]; !ok {
160                                         monitored = append(monitored, k)
161                                 }
162                         }
163                         dispatcher.mineMutex.Unlock()
164                         if monitored != nil {
165                                 dispatcher.getContainers(arvadosclient.Dict{
166                                         "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
167                         }
168                 case <-dispatcher.DoneProcessing:
169                         close(dispatcher.containers)
170                         ticker.Stop()
171                         return
172                 }
173         }
174 }
175
176 func (dispatcher *Dispatcher) handleUpdate(container Container) {
177         if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
178                 // If container is Complete, Cancelled, or Queued, LockedByUUID
179                 // will be nil.  If the container was formerly Locked, moved
180                 // back to Queued and then locked by another dispatcher,
181                 // LockedByUUID will be different.  In either case, we want
182                 // to stop monitoring it.
183                 log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
184                 dispatcher.notMine(container.UUID)
185                 return
186         }
187
188         if dispatcher.updateMine(container) {
189                 // Already monitored, sent status update
190                 return
191         }
192
193         if container.State == Queued {
194                 // Try to take the lock
195                 if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
196                         return
197                 }
198                 container.State = Locked
199         }
200
201         if container.State == Locked || container.State == Running {
202                 // Not currently monitored but in Locked or Running state and
203                 // owned by this dispatcher, so start monitoring.
204                 go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
205         }
206 }
207
208 // UpdateState makes an API call to change the state of a container.
209 func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
210         err := dispatcher.Arv.Update("containers", uuid,
211                 arvadosclient.Dict{
212                         "container": arvadosclient.Dict{"state": newState}},
213                 nil)
214         if err != nil {
215                 log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
216         }
217         return err
218 }
219
220 // RunDispatcher runs the main loop of the dispatcher until receiving a message
221 // on the dispatcher.DoneProcessing channel.  It also installs a signal handler
222 // to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
223 func (dispatcher *Dispatcher) RunDispatcher() (err error) {
224         err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
225         if err != nil {
226                 log.Printf("Error getting my token UUID: %v", err)
227                 return
228         }
229
230         dispatcher.mineMap = make(map[string]chan Container)
231         dispatcher.containers = make(chan Container)
232
233         // Graceful shutdown on signal
234         sigChan := make(chan os.Signal)
235         signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
236
237         go func(sig <-chan os.Signal) {
238                 for sig := range sig {
239                         log.Printf("Caught signal: %v", sig)
240                         dispatcher.DoneProcessing <- struct{}{}
241                 }
242         }(sigChan)
243
244         defer close(sigChan)
245         defer signal.Stop(sigChan)
246
247         go dispatcher.pollContainers()
248         for container := range dispatcher.containers {
249                 dispatcher.handleUpdate(container)
250         }
251
252         return nil
253 }