Merge branch 'master' into 9397-prepopulate-output-directory
[arvados.git] / sdk / go / dispatch / dispatch.go
1 // Framework for monitoring the Arvados container Queue, Locks container
2 // records, and runs goroutine callbacks which implement execution and
3 // monitoring of the containers.
4 package dispatch
5
6 import (
7         "git.curoverse.com/arvados.git/sdk/go/arvados"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "log"
10         "sync"
11         "time"
12 )
13
14 const (
15         Queued    = arvados.ContainerStateQueued
16         Locked    = arvados.ContainerStateLocked
17         Running   = arvados.ContainerStateRunning
18         Complete  = arvados.ContainerStateComplete
19         Cancelled = arvados.ContainerStateCancelled
20 )
21
22 // Dispatcher holds the state of the dispatcher
23 type Dispatcher struct {
24         // The Arvados client
25         Arv *arvadosclient.ArvadosClient
26
27         // When a new queued container appears and is either already owned by
28         // this dispatcher or is successfully locked, the dispatcher will call
29         // go RunContainer().  The RunContainer() goroutine gets a channel over
30         // which it will receive updates to the container state.  The
31         // RunContainer() goroutine should only assume status updates come when
32         // the container record changes on the API server; if it needs to
33         // monitor the job submission to the underlying slurm/grid engine/etc
34         // queue it should spin up its own polling goroutines.  When the
35         // channel is closed, that means the container is no longer being
36         // handled by this dispatcher and the goroutine should terminate.  The
37         // goroutine is responsible for draining the 'status' channel, failure
38         // to do so may deadlock the dispatcher.
39         RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
40
41         // Amount of time to wait between polling for updates.
42         PollPeriod time.Duration
43
44         // Minimum time between two attempts to run the same container
45         MinRetryPeriod time.Duration
46
47         mineMutex sync.Mutex
48         mineMap   map[string]chan arvados.Container
49         Auth      arvados.APIClientAuthorization
50
51         throttle throttle
52
53         stop chan struct{}
54 }
55
56 // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
57 // for which this process is actively starting/monitoring.  Returns channel to
58 // be used to send container status updates.
59 func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
60         dispatcher.mineMutex.Lock()
61         defer dispatcher.mineMutex.Unlock()
62         if ch, ok := dispatcher.mineMap[uuid]; ok {
63                 return ch
64         }
65
66         ch := make(chan arvados.Container)
67         dispatcher.mineMap[uuid] = ch
68         return ch
69 }
70
71 // Release a container which is no longer being monitored.
72 func (dispatcher *Dispatcher) notMine(uuid string) {
73         dispatcher.mineMutex.Lock()
74         defer dispatcher.mineMutex.Unlock()
75         if ch, ok := dispatcher.mineMap[uuid]; ok {
76                 close(ch)
77                 delete(dispatcher.mineMap, uuid)
78         }
79 }
80
81 // checkMine returns true if there is a channel for updates associated
82 // with container c.  If update is true, also send the container record on
83 // the channel.
84 func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
85         dispatcher.mineMutex.Lock()
86         defer dispatcher.mineMutex.Unlock()
87         ch, ok := dispatcher.mineMap[c.UUID]
88         if ok {
89                 if update {
90                         ch <- c
91                 }
92                 return true
93         }
94         return false
95 }
96
97 func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
98         var containers arvados.ContainerList
99         err := dispatcher.Arv.List("containers", params, &containers)
100         if err != nil {
101                 log.Printf("Error getting list of containers: %q", err)
102                 return
103         }
104
105         if containers.ItemsAvailable > len(containers.Items) {
106                 // TODO: support paging
107                 log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
108                         containers.ItemsAvailable,
109                         len(containers.Items))
110         }
111         for _, container := range containers.Items {
112                 touched[container.UUID] = true
113                 dispatcher.handleUpdate(container)
114         }
115 }
116
117 func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) {
118         ticker := time.NewTicker(dispatcher.PollPeriod)
119         defer ticker.Stop()
120
121         paramsQ := arvadosclient.Dict{
122                 "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
123                 "order":   []string{"priority desc"},
124                 "limit":   "1000"}
125         paramsP := arvadosclient.Dict{
126                 "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
127                 "limit":   "1000"}
128
129         for {
130                 touched := make(map[string]bool)
131                 dispatcher.getContainers(paramsQ, touched)
132                 dispatcher.getContainers(paramsP, touched)
133                 dispatcher.mineMutex.Lock()
134                 var monitored []string
135                 for k := range dispatcher.mineMap {
136                         if _, ok := touched[k]; !ok {
137                                 monitored = append(monitored, k)
138                         }
139                 }
140                 dispatcher.mineMutex.Unlock()
141                 if monitored != nil {
142                         dispatcher.getContainers(arvadosclient.Dict{
143                                 "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
144                 }
145                 select {
146                 case <-ticker.C:
147                 case <-stop:
148                         return
149                 }
150         }
151 }
152
153 func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
154         if container.State == Queued && dispatcher.checkMine(container, false) {
155                 // If we previously started the job, something failed, and it
156                 // was re-queued, this dispatcher might still be monitoring it.
157                 // Stop the existing monitor, then try to lock and run it
158                 // again.
159                 dispatcher.notMine(container.UUID)
160         }
161
162         if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
163                 // If container is Complete, Cancelled, or Queued, LockedByUUID
164                 // will be nil.  If the container was formerly Locked, moved
165                 // back to Queued and then locked by another dispatcher,
166                 // LockedByUUID will be different.  In either case, we want
167                 // to stop monitoring it.
168                 log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
169                 dispatcher.notMine(container.UUID)
170                 return
171         }
172
173         if dispatcher.checkMine(container, true) {
174                 // Already monitored, sent status update
175                 return
176         }
177
178         if container.State == Queued && container.Priority > 0 {
179                 if !dispatcher.throttle.Check(container.UUID) {
180                         return
181                 }
182                 // Try to take the lock
183                 if err := dispatcher.Lock(container.UUID); err != nil {
184                         return
185                 }
186                 container.State = Locked
187         }
188
189         if container.State == Locked || container.State == Running {
190                 // Not currently monitored but in Locked or Running state and
191                 // owned by this dispatcher, so start monitoring.
192                 go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
193         }
194 }
195
196 // UpdateState makes an API call to change the state of a container.
197 func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
198         err := dispatcher.Arv.Update("containers", uuid,
199                 arvadosclient.Dict{
200                         "container": arvadosclient.Dict{"state": newState}},
201                 nil)
202         if err != nil {
203                 log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
204         }
205         return err
206 }
207
208 // Lock makes the lock API call which updates the state of a container to Locked.
209 func (dispatcher *Dispatcher) Lock(uuid string) error {
210         err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
211         if err != nil {
212                 log.Printf("Error locking container %s: %q", uuid, err)
213         }
214         return err
215 }
216
217 // Unlock makes the unlock API call which updates the state of a container to Queued.
218 func (dispatcher *Dispatcher) Unlock(uuid string) error {
219         err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
220         if err != nil {
221                 log.Printf("Error unlocking container %s: %q", uuid, err)
222         }
223         return err
224 }
225
226 // Stop causes Run to return after the current polling cycle.
227 func (dispatcher *Dispatcher) Stop() {
228         if dispatcher.stop == nil {
229                 // already stopped
230                 return
231         }
232         close(dispatcher.stop)
233         dispatcher.stop = nil
234 }
235
236 // Run runs the main loop of the dispatcher.
237 func (dispatcher *Dispatcher) Run() (err error) {
238         err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
239         if err != nil {
240                 log.Printf("Error getting my token UUID: %v", err)
241                 return
242         }
243
244         dispatcher.mineMap = make(map[string]chan arvados.Container)
245         dispatcher.stop = make(chan struct{})
246         dispatcher.throttle.hold = dispatcher.MinRetryPeriod
247         dispatcher.pollContainers(dispatcher.stop)
248         return nil
249 }