1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
5 // Package dispatch is a helper library for building Arvados container
15 "git.arvados.org/arvados.git/sdk/go/arvados"
16 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
17 "github.com/sirupsen/logrus"
// Short aliases for the arvados.ContainerState* constants; the state
// switches and query filters in this file use these names.
21 Queued = arvados.ContainerStateQueued
22 Locked = arvados.ContainerStateLocked
23 Running = arvados.ContainerStateRunning
24 Complete = arvados.ContainerStateComplete
25 Cancelled = arvados.ContainerStateCancelled
// Logger is the logging interface a Dispatcher uses for its own
// messages. Every method takes a format string followed by the values
// to format. Run assigns logrus.StandardLogger() to Dispatcher.Logger
// (the condition guarding that assignment is not visible in this
// listing).
28 type Logger interface {
29 Printf(string, ...interface{})
30 Warnf(string, ...interface{})
31 Debugf(string, ...interface{})
// Dispatcher watches the API server's container queue and calls
// RunContainer for the containers this process should handle (see
// Run).
35 type Dispatcher struct {
// Arvados API client used for all API calls made in this file.
36 Arv *arvadosclient.ArvadosClient
40 // Batch size for container queries
// NOTE(review): the field this comment documents is not visible in
// this listing; checkForUpdates passes d.BatchSize as the "limit"
// query parameter.
43 // Queue polling frequency
44 PollPeriod time.Duration
46 // Time to wait between successive attempts to run the same container
47 MinRetryPeriod time.Duration
49 // Func that implements the container lifecycle. Must be set
50 // to a non-nil DispatchFunc before calling Run().
51 RunContainer DispatchFunc
// API client authorization record loaded by Run. Its UUID is used to
// query for, and recognize, containers locked by this dispatcher.
53 auth arvados.APIClientAuthorization
// Trackers for started RunContainer calls, keyed by container UUID.
55 trackers map[string]*runTracker
59 // A DispatchFunc executes a container (if the container record is
60 // Locked) or resumes monitoring an already-running container, and
61 // waits until that container exits.
//
63 // While the container runs, the DispatchFunc should listen for
64 // updated container records on the provided channel. When the channel
65 // closes, the DispatchFunc should stop the container if it's still
66 // running, and return.
//
68 // The DispatchFunc should not return until the container is finished.
//
// It is called with the Dispatcher itself, the container record that
// was current when tracking started, and the channel of updates.
69 type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
71 // Run watches the API server's queue for containers that are either
72 // ready to run and available to lock, or are already locked by this
73 // dispatcher's token. When a new one appears, Run calls RunContainer
74 // in a new goroutine.
//
// The visible error return wraps a failure to fetch the authorization
// record for the current token. NOTE(review): other return paths, and
// how ctx is used, are not visible in this listing.
75 func (d *Dispatcher) Run(ctx context.Context) error {
77 d.Logger = logrus.StandardLogger()
// Fetch the authorization record for the current token; d.auth.UUID
// is what identifies containers locked by this dispatcher.
80 err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
82 return fmt.Errorf("error getting my token UUID: %v", err)
// Use MinRetryPeriod as the throttle's hold time; the throttle is
// consulted per container UUID before a tracker is started.
85 d.throttle.hold = d.MinRetryPeriod
// One polling round is driven per PollPeriod tick (presumably; the
// loop around the code below is not visible here).
87 poll := time.NewTicker(d.PollPeriod)
// todo starts as a copy of the current trackers. The code below treats
// whatever is left in it after the queries as "not returned by any
// query" (presumably entries are removed as containers are seen; the
// removal is not visible in this listing).
102 todo := make(map[string]*runTracker)
104 // Make a copy of trackers
105 for uuid, tracker := range d.trackers {
110 // Containers I currently own (Locked/Running)
111 querySuccess := d.checkForUpdates([][]interface{}{
112 {"locked_by_uuid", "=", d.auth.UUID}}, todo)
114 // Containers I should try to dispatch
// checkForUpdates is on the left of && so the query always runs, even
// when the previous query already failed.
115 querySuccess = d.checkForUpdates([][]interface{}{
116 {"state", "=", Queued},
117 {"priority", ">", "0"}}, todo) && querySuccess
120 // There was an error in one of the previous queries,
121 // we probably didn't get updates for all the
122 // containers we should have. Don't check them
123 // individually because it may be expensive.
127 // Containers I know about but didn't fall into the
128 // above two categories (probably Complete/Cancelled)
130 for uuid := range todo {
131 missed = append(missed, uuid)
// Look up the remaining UUIDs by "uuid in batch" queries; 20 is the
// visible batch threshold (the slicing of missed into batch is not
// visible in this listing).
134 for len(missed) > 0 {
136 if len(missed) > 20 {
143 querySuccess = d.checkForUpdates([][]interface{}{
144 {"uuid", "in", batch}}, todo) && querySuccess
148 // There was an error in one of the previous queries, we probably
149 // didn't see all the containers we should have, so don't shut down
150 // the missed containers.
154 // Containers that I know about that didn't show up in any
155 // query should be let go.
156 for uuid, tracker := range todo {
157 d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid)
164 // start starts a runner in a new goroutine, and sends the initial
165 // container record to its updates channel. It returns the new tracker.
166 func (d *Dispatcher) start(c arvados.Container) *runTracker {
167 tracker := &runTracker{
// Buffered channel with room for one pending container record.
168 updates: make(chan arvados.Container, 1),
173 d.RunContainer(d, c, tracker.updates)
174 // RunContainer blocks for the lifetime of the container. When
175 // it returns, the tracker should delete itself.
// NOTE(review): d.trackers is also read and written by other methods
// in this file; any locking around this delete is not visible in this
// listing, so confirm it.
177 delete(d.trackers, c.UUID)
// checkForUpdates lists the containers matching filters, one page at a
// time, and hands each page to checkListForUpdates together with todo.
//
// Run treats the returned bool as "the query succeeded". Presumably it
// is false when one of the List calls fails; the return statements are
// not visible in this listing.
183 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
// First request: only its ItemsAvailable count is used below.
184 var countList arvados.ContainerList
185 params := arvadosclient.Dict{
189 "order": []string{"priority desc"}}
190 err := d.Arv.List("containers", params, &countList)
192 d.Logger.Warnf("error getting count of containers: %q", err)
195 itemsAvailable := countList.ItemsAvailable
// Paged requests: at most d.BatchSize items per page, in the same
// "priority desc" order as the count request.
196 params = arvadosclient.Dict{
199 "limit": d.BatchSize,
200 "order": []string{"priority desc"}}
203 params["offset"] = offset
205 // This list variable must be a new one declared
206 // inside the loop: otherwise, items in the API
207 // response would get deep-merged into the items
208 // loaded in previous iterations.
209 var list arvados.ContainerList
211 err := d.Arv.List("containers", params, &list)
213 d.Logger.Warnf("error getting list of containers: %q", err)
216 d.checkListForUpdates(list.Items, todo)
217 offset += len(list.Items)
// Paging ends on an empty page, or once offset has reached the number
// of items reported as available.
218 if len(list.Items) == 0 || itemsAvailable <= offset {
// checkListForUpdates examines each container record in containers
// and decides, from its state and from whether d.trackers already has
// an entry for its UUID, whether a new tracker must be started.
// Containers locked by a different token are ignored.
//
// NOTE(review): the handling of todo, and of containers that are
// already tracked, is not visible in this listing.
224 func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
// Allocate the trackers map on first use.
227 if d.trackers == nil {
228 d.trackers = make(map[string]*runTracker)
231 for _, c := range containers {
232 tracker, alreadyTracking := d.trackers[c.UUID]
235 if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
236 d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
237 } else if alreadyTracking {
241 case Locked, Running:
243 case Cancelled, Complete:
// Not tracked yet. Presumably this is the Queued case (its label is
// not visible in this listing): consult the throttle, take the lock,
// then start a tracker.
249 if !d.throttle.Check(c.UUID) {
252 err := d.lock(c.UUID)
254 d.Logger.Warnf("error locking container %s: %s", c.UUID, err)
258 d.trackers[c.UUID] = d.start(c)
// Not tracked yet and not locked by another token: consult the
// throttle, then start a tracker. No lock call is made on this path.
259 case Locked, Running:
260 if !d.throttle.Check(c.UUID) {
263 d.trackers[c.UUID] = d.start(c)
264 case Cancelled, Complete:
265 // no-op (we already stopped monitoring)
271 // UpdateState makes an API call to change the state of a container.
//
// The new state is sent as the "state" attribute of a "container"
// dict. A failed update is logged with Warnf (presumably the error is
// also returned; the return statement is not visible in this listing).
272 func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
273 err := d.Arv.Update("containers", uuid,
275 "container": arvadosclient.Dict{"state": state},
278 d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err)
283 // lock makes the lock API call which updates the state of a container to Locked.
// It returns the error from the API call, if any.
284 func (d *Dispatcher) lock(uuid string) error {
285 return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
288 // Unlock makes the unlock API call which updates the state of a container to Queued.
// It returns the error from the API call, if any.
289 func (d *Dispatcher) Unlock(uuid string) error {
290 return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
293 // TrackContainer ensures a tracker is running for the given UUID,
294 // regardless of the current state of the container (except: if the
295 // container is locked by a different dispatcher, a tracker will not
296 // be started). If the container is not in Locked or Running state,
297 // the new tracker will close down immediately.
//
299 // This allows the dispatcher to put its own RunContainer func into a
300 // cleanup phase (for example, to kill local processes created by a
301 // previous dispatch process that are still running even though the
302 // container state is final) without the risk of having multiple
303 // goroutines monitoring the same UUID.
304 func (d *Dispatcher) TrackContainer(uuid string) error {
// Fetch the current container record from the API server.
305 var cntr arvados.Container
306 err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
// Locked by a different token: no tracker is started.
310 if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
// A tracker already exists for this UUID: do not start a second one.
316 if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
319 if d.trackers == nil {
320 d.trackers = make(map[string]*runTracker)
322 d.trackers[uuid] = d.start(cntr)
// Not Locked/Running: close the new tracker right away, so the
// RunContainer call sees its updates channel close.
324 case Queued, Cancelled, Complete:
325 d.trackers[uuid].close()
// runTracker is the dispatcher's handle on one started RunContainer
// call.
// NOTE(review): methods in this file also use closing and logger
// fields whose declarations are not visible in this listing.
330 type runTracker struct {
// Channel on which container records are delivered to RunContainer.
332 updates chan arvados.Container
// close closes the tracker's updates channel, which is the signal for
// the DispatchFunc to stop the container and return. The closing flag
// ensures the channel is closed only once.
336 func (tracker *runTracker) close() {
337 if !tracker.closing {
338 close(tracker.updates)
340 tracker.closing = true
// update passes a new container record to the tracker. If an earlier
// record is still waiting unread in the updates channel, it is taken
// out and discarded, and a debug message is logged.
// NOTE(review): the send of c on the channel, and any check of the
// closing flag, are not visible in this listing.
343 func (tracker *runTracker) update(c arvados.Container) {
348 case <-tracker.updates:
349 tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID)