1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
5 // Package dispatch is a helper library for building Arvados container
16 "git.curoverse.com/arvados.git/sdk/go/arvados"
17 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
// Shorthand aliases for the arvados container states; this package's
// queue filters and state switches are written in terms of these names.
21 Queued = arvados.ContainerStateQueued
22 Locked = arvados.ContainerStateLocked
23 Running = arvados.ContainerStateRunning
24 Complete = arvados.ContainerStateComplete
25 Cancelled = arvados.ContainerStateCancelled
// Dispatcher polls the Arvados API server's container queue and runs
// RunContainer, in its own goroutine, for each container it locks or finds
// already locked by its own token.
29 type Dispatcher struct {
// Arv is the API client used for the dispatcher's API requests.
30 Arv *arvadosclient.ArvadosClient
32 // PollPeriod is the interval between Run's queries of the container queue.
33 PollPeriod time.Duration
35 // Time to wait between successive attempts to run the same container
// (Run uses it as the retry throttle's hold time).
36 MinRetryPeriod time.Duration
38 // Func that implements the container lifecycle. Must be set
39 // to a non-nil DispatchFunc before calling Run().
40 RunContainer DispatchFunc
// auth is the current token's API client authorization, fetched by Run;
// its UUID is compared with each container's locked_by_uuid to recognize
// containers locked by this dispatcher.
42 auth arvados.APIClientAuthorization
// trackers maps container UUID to the runTracker feeding updates to that
// container's RunContainer goroutine. It is created lazily when nil.
44 trackers map[string]*runTracker
48 // A DispatchFunc executes a container (if the container record is
49 // Locked) or resume monitoring an already-running container, and wait
50 // until that container exits.
52 // While the container runs, the DispatchFunc should listen for
53 // updated container records on the provided channel. When the channel
54 // closes, the DispatchFunc should stop the container if it's still
55 // running, and return.
57 // The DispatchFunc should not return until the container is finished.
58 type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
60 // Run watches the API server's queue for containers that are either
61 // ready to run and available to lock, or are already locked by this
62 // dispatcher's token. When a new one appears, Run calls RunContainer
63 // in a new goroutine.
//
// Before polling, Run fetches the current token's API client authorization
// into d.auth and returns an error if that request fails. Polling is driven
// by a ticker firing every PollPeriod.
64 func (d *Dispatcher) Run(ctx context.Context) error {
65 err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
// NOTE(review): wrapping with %w instead of %v would let callers inspect
// the underlying API error with errors.Is / errors.As.
67 return fmt.Errorf("error getting my token UUID: %v", err)
// Configure the per-container retry throttle from MinRetryPeriod.
70 d.throttle.hold = d.MinRetryPeriod
72 poll := time.NewTicker(d.PollPeriod)
// todo starts as a snapshot of every tracked container and is passed to
// each query below; presumably checkListForUpdates removes the containers
// it sees, so whatever remains was not returned by any query.
83 todo := make(map[string]*runTracker)
85 // Make a copy of trackers
86 for uuid, tracker := range d.trackers {
91 // Containers I currently own (Locked/Running)
92 querySuccess := d.checkForUpdates([][]interface{}{
93 {"locked_by_uuid", "=", d.auth.UUID}}, todo)
95 // Containers I should try to dispatch
96 querySuccess = d.checkForUpdates([][]interface{}{
97 {"state", "=", Queued},
98 {"priority", ">", "0"}}, todo) && querySuccess
101 // There was an error in one of the previous queries,
102 // we probably didn't get updates for all the
103 // containers we should have. Don't check them
104 // individually because it may be expensive.
108 // Containers I know about but didn't fall into the
109 // above two categories (probably Complete/Cancelled)
111 for uuid := range todo {
112 missed = append(missed, uuid)
// Re-query the leftover containers by UUID in batches (the visible size
// check suggests at most 20 UUIDs per request) so their latest records
// still reach checkListForUpdates.
115 for len(missed) > 0 {
117 if len(missed) > 20 {
124 querySuccess = d.checkForUpdates([][]interface{}{
125 {"uuid", "in", batch}}, todo) && querySuccess
129 // There was an error in one of the previous queries, we probably
130 // didn't see all the containers we should have, so don't shut down
131 // the missed containers.
135 // Containers that I know about that didn't show up in any
136 // query should be let go.
137 for uuid, tracker := range todo {
138 log.Printf("Container %q not returned by any query, stopping tracking.", uuid)
145 // start runs d.RunContainer for c in a new goroutine and returns the
146 // runTracker whose updates channel feeds it, after sending the initial
// container record to that channel.
147 func (d *Dispatcher) start(c arvados.Container) *runTracker {
// Capacity 1 lets the initial record be queued without waiting for
// RunContainer to start reading.
148 tracker := &runTracker{updates: make(chan arvados.Container, 1)}
151 d.RunContainer(d, c, tracker.updates)
152 // RunContainer blocks for the lifetime of the container. When
153 // it returns, the tracker should delete itself.
// NOTE(review): this delete runs in the RunContainer goroutine while the
// polling path (Run, checkListForUpdates) also reads and writes d.trackers;
// confirm it is guarded by the dispatcher's lock (the surrounding lines
// are not visible here).
155 delete(d.trackers, c.UUID)
// checkForUpdates lists the containers selected by filters (presumably
// passed as a "filters" entry of params on a line not visible here),
// highest priority first. It reads one page at a time, advancing "offset"
// by the size of each page, and hands every page to checkListForUpdates
// together with todo. Run treats the bool result as "query succeeded";
// presumably a List error makes it return false (only the logging of that
// error is visible here).
161 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
162 params := arvadosclient.Dict{
164 "order": []string{"priority desc"}}
// NOTE(review): list is declared once and decoded into on every page. If
// d.Arv.List decodes with encoding/json, the elements already in the reused
// Items slice are decoded into in place. A field that the new JSON omits,
// or sets to null (e.g. locked_by_uuid on a string field), can therefore
// keep the previous page's value. Consider declaring a fresh list inside
// the loop and carrying the page length in a separate variable for the
// offset update.
166 var list arvados.ContainerList
167 for offset, more := 0, true; more; offset += len(list.Items) {
168 params["offset"] = offset
169 err := d.Arv.List("containers", params, &list)
171 log.Printf("Error getting list of containers: %q", err)
// Keep paging while the last page was non-empty and the server reports
// more items than have been seen so far.
174 more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
175 d.checkListForUpdates(list.Items, todo)
// checkListForUpdates reconciles d.trackers with one page of container
// records:
//   - containers locked by another token are ignored;
//   - containers that already have a tracker are handled according to their
//     state;
//   - untracked containers may be locked and/or started, subject to
//     d.throttle.
//
// NOTE(review): todo is not referenced on any visible line. Run's logic
// implies that containers seen here are removed from it — confirm.
180 func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
// Create the tracker map lazily on first use.
183 if d.trackers == nil {
184 d.trackers = make(map[string]*runTracker)
187 for _, c := range containers {
188 tracker, alreadyTracking := d.trackers[c.UUID]
// Only act on containers that are unlocked or locked by this dispatcher's
// own token.
191 if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
192 log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
193 } else if alreadyTracking {
// Already tracked: presumably Locked/Running records are forwarded to the
// tracker and final states close it (those case bodies are not visible).
197 case Locked, Running:
199 case Cancelled, Complete:
// Not yet tracked (this branch appears to be the Queued case): respect the
// retry throttle, then lock the container before starting a tracker; a
// lock failure is logged at debug level.
205 if !d.throttle.Check(c.UUID) {
208 err := d.lock(c.UUID)
210 log.Printf("debug: error locking container %s: %s", c.UUID, err)
214 d.trackers[c.UUID] = d.start(c)
// Not yet tracked but already Locked/Running (and not held by another
// token): start a tracker, subject to the retry throttle.
215 case Locked, Running:
216 if !d.throttle.Check(c.UUID) {
219 d.trackers[c.UUID] = d.start(c)
220 case Cancelled, Complete:
221 // no-op (we already stopped monitoring)
227 // UpdateState makes an API call to change the state of a container.
// It updates the container identified by uuid with a "container" entry
// whose "state" is the given state, and logs the error if the update fails.
// NOTE(review): whether that error is also returned is decided on lines
// not visible here.
228 func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
229 err := d.Arv.Update("containers", uuid,
231 "container": arvadosclient.Dict{"state": state},
234 log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
239 // Lock makes the lock API call which updates the state of a container to Locked.
240 func (d *Dispatcher) lock(uuid string) error {
241 return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
244 // Unlock makes the unlock API call which updates the state of a container to Queued.
245 func (d *Dispatcher) Unlock(uuid string) error {
246 return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
249 // TrackContainer ensures a tracker is running for the given UUID,
250 // regardless of the current state of the container (except: if the
251 // container is locked by a different dispatcher, a tracker will not
252 // be started). If the container is not in Locked or Running state,
253 // the new tracker will close down immediately.
//
255 // This allows the dispatcher to put its own RunContainer func into a
256 // cleanup phase (for example, to kill local processes created by a
257 // previous dispatch process that are still running even though the
258 // container state is final) without the risk of having multiple
259 // goroutines monitoring the same UUID.
260 func (d *Dispatcher) TrackContainer(uuid string) error {
// Fetch the current container record (error handling is on lines not
// visible here).
261 var cntr arvados.Container
262 err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
// Leave containers locked by another dispatcher alone.
266 if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
// Never start a second tracker for a UUID that is already tracked.
272 if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
275 if d.trackers == nil {
276 d.trackers = make(map[string]*runTracker)
// Start the tracker unconditionally so RunContainer gets a chance to run
// its cleanup; for Queued/Cancelled/Complete it is closed right away.
278 d.trackers[uuid] = d.start(cntr)
280 case Queued, Cancelled, Complete:
281 d.trackers[uuid].close()
// runTracker is the dispatcher's handle on one container's RunContainer
// goroutine.
286 type runTracker struct {
// updates delivers container records to RunContainer. start creates it
// with capacity 1, and close() closes it to tell RunContainer to stop.
288 updates chan arvados.Container
291 func (tracker *runTracker) close() {
292 if !tracker.closing {
293 close(tracker.updates)
295 tracker.closing = true
298 func (tracker *runTracker) update(c arvados.Container) {
303 case <-tracker.updates:
304 log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)