Merge branch '8784-dir-listings'
[arvados.git] / sdk / go / dispatch / dispatch.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 // Package dispatch is a helper library for building Arvados container
6 // dispatchers.
7 package dispatch
8
9 import (
10         "context"
11         "fmt"
12         "log"
13         "sync"
14         "time"
15
16         "git.curoverse.com/arvados.git/sdk/go/arvados"
17         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
18 )
19
20 const (
21         Queued    = arvados.ContainerStateQueued
22         Locked    = arvados.ContainerStateLocked
23         Running   = arvados.ContainerStateRunning
24         Complete  = arvados.ContainerStateComplete
25         Cancelled = arvados.ContainerStateCancelled
26 )
27
28 // Dispatcher struct
29 type Dispatcher struct {
30         Arv *arvadosclient.ArvadosClient
31
32         // Queue polling frequency
33         PollPeriod time.Duration
34
35         // Time to wait between successive attempts to run the same container
36         MinRetryPeriod time.Duration
37
38         // Func that implements the container lifecycle. Must be set
39         // to a non-nil DispatchFunc before calling Run().
40         RunContainer DispatchFunc
41
42         auth     arvados.APIClientAuthorization
43         mtx      sync.Mutex
44         trackers map[string]*runTracker
45         throttle throttle
46 }
47
48 // A DispatchFunc executes a container (if the container record is
49 // Locked) or resume monitoring an already-running container, and wait
50 // until that container exits.
51 //
52 // While the container runs, the DispatchFunc should listen for
53 // updated container records on the provided channel. When the channel
54 // closes, the DispatchFunc should stop the container if it's still
55 // running, and return.
56 //
57 // The DispatchFunc should not return until the container is finished.
58 type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
59
60 // Run watches the API server's queue for containers that are either
61 // ready to run and available to lock, or are already locked by this
62 // dispatcher's token. When a new one appears, Run calls RunContainer
63 // in a new goroutine.
64 func (d *Dispatcher) Run(ctx context.Context) error {
65         err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
66         if err != nil {
67                 return fmt.Errorf("error getting my token UUID: %v", err)
68         }
69
70         d.throttle.hold = d.MinRetryPeriod
71
72         poll := time.NewTicker(d.PollPeriod)
73         defer poll.Stop()
74
75         for {
76                 tracked := d.trackedUUIDs()
77                 d.checkForUpdates([][]interface{}{
78                         {"uuid", "in", tracked}})
79                 d.checkForUpdates([][]interface{}{
80                         {"locked_by_uuid", "=", d.auth.UUID},
81                         {"uuid", "not in", tracked}})
82                 d.checkForUpdates([][]interface{}{
83                         {"state", "=", Queued},
84                         {"priority", ">", "0"},
85                         {"uuid", "not in", tracked}})
86                 select {
87                 case <-poll.C:
88                         continue
89                 case <-ctx.Done():
90                         return ctx.Err()
91                 }
92         }
93 }
94
95 func (d *Dispatcher) trackedUUIDs() []string {
96         d.mtx.Lock()
97         defer d.mtx.Unlock()
98         if len(d.trackers) == 0 {
99                 // API bug: ["uuid", "not in", []] does not work as
100                 // expected, but this does:
101                 return []string{"this-uuid-does-not-exist"}
102         }
103         uuids := make([]string, 0, len(d.trackers))
104         for x := range d.trackers {
105                 uuids = append(uuids, x)
106         }
107         return uuids
108 }
109
110 // Start a runner in a new goroutine, and send the initial container
111 // record to its updates channel.
112 func (d *Dispatcher) start(c arvados.Container) *runTracker {
113         tracker := &runTracker{updates: make(chan arvados.Container, 1)}
114         tracker.updates <- c
115         go func() {
116                 d.RunContainer(d, c, tracker.updates)
117
118                 d.mtx.Lock()
119                 delete(d.trackers, c.UUID)
120                 d.mtx.Unlock()
121         }()
122         return tracker
123 }
124
125 func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
126         params := arvadosclient.Dict{
127                 "filters": filters,
128                 "order":   []string{"priority desc"}}
129
130         var list arvados.ContainerList
131         for offset, more := 0, true; more; offset += len(list.Items) {
132                 params["offset"] = offset
133                 err := d.Arv.List("containers", params, &list)
134                 if err != nil {
135                         log.Printf("Error getting list of containers: %q", err)
136                         return
137                 }
138                 more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
139                 d.checkListForUpdates(list.Items)
140         }
141 }
142
143 func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
144         d.mtx.Lock()
145         defer d.mtx.Unlock()
146         if d.trackers == nil {
147                 d.trackers = make(map[string]*runTracker)
148         }
149
150         for _, c := range containers {
151                 tracker, alreadyTracking := d.trackers[c.UUID]
152                 if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
153                         log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
154                 } else if alreadyTracking {
155                         switch c.State {
156                         case Queued:
157                                 tracker.close()
158                         case Locked, Running:
159                                 tracker.update(c)
160                         case Cancelled, Complete:
161                                 tracker.close()
162                         }
163                 } else {
164                         switch c.State {
165                         case Queued:
166                                 if !d.throttle.Check(c.UUID) {
167                                         break
168                                 }
169                                 err := d.lock(c.UUID)
170                                 if err != nil {
171                                         log.Printf("debug: error locking container %s: %s", c.UUID, err)
172                                         break
173                                 }
174                                 c.State = Locked
175                                 d.trackers[c.UUID] = d.start(c)
176                         case Locked, Running:
177                                 if !d.throttle.Check(c.UUID) {
178                                         break
179                                 }
180                                 d.trackers[c.UUID] = d.start(c)
181                         case Cancelled, Complete:
182                                 // no-op (we already stopped monitoring)
183                         }
184                 }
185         }
186 }
187
188 // UpdateState makes an API call to change the state of a container.
189 func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
190         err := d.Arv.Update("containers", uuid,
191                 arvadosclient.Dict{
192                         "container": arvadosclient.Dict{"state": state},
193                 }, nil)
194         if err != nil {
195                 log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
196         }
197         return err
198 }
199
200 // Lock makes the lock API call which updates the state of a container to Locked.
201 func (d *Dispatcher) lock(uuid string) error {
202         return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
203 }
204
205 // Unlock makes the unlock API call which updates the state of a container to Queued.
206 func (d *Dispatcher) Unlock(uuid string) error {
207         return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
208 }
209
210 // TrackContainer ensures a tracker is running for the given UUID,
211 // regardless of the current state of the container (except: if the
212 // container is locked by a different dispatcher, a tracker will not
213 // be started). If the container is not in Locked or Running state,
214 // the new tracker will close down immediately.
215 //
216 // This allows the dispatcher to put its own RunContainer func into a
217 // cleanup phase (for example, to kill local processes created by a
218 // prevous dispatch process that are still running even though the
219 // container state is final) without the risk of having multiple
220 // goroutines monitoring the same UUID.
221 func (d *Dispatcher) TrackContainer(uuid string) error {
222         var cntr arvados.Container
223         err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
224         if err != nil {
225                 return err
226         }
227         if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
228                 return nil
229         }
230
231         d.mtx.Lock()
232         defer d.mtx.Unlock()
233         if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
234                 return nil
235         }
236         if d.trackers == nil {
237                 d.trackers = make(map[string]*runTracker)
238         }
239         d.trackers[uuid] = d.start(cntr)
240         switch cntr.State {
241         case Queued, Cancelled, Complete:
242                 d.trackers[uuid].close()
243         }
244         return nil
245 }
246
247 type runTracker struct {
248         closing bool
249         updates chan arvados.Container
250 }
251
252 func (tracker *runTracker) close() {
253         if !tracker.closing {
254                 close(tracker.updates)
255         }
256         tracker.closing = true
257 }
258
259 func (tracker *runTracker) update(c arvados.Container) {
260         if tracker.closing {
261                 return
262         }
263         select {
264         case <-tracker.updates:
265                 log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
266         default:
267         }
268         tracker.updates <- c
269 }