1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
5 // Package dispatch is a helper library for building Arvados container
16 "git.arvados.org/arvados.git/lib/dispatchcloud"
17 "git.arvados.org/arvados.git/sdk/go/arvados"
18 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
19 "github.com/sirupsen/logrus"
// Short aliases for the arvados container states, used throughout this
// package.
23 Queued = arvados.ContainerStateQueued
24 Locked = arvados.ContainerStateLocked
25 Running = arvados.ContainerStateRunning
26 Complete = arvados.ContainerStateComplete
27 Cancelled = arvados.ContainerStateCancelled
// Logger is the set of logging methods a Dispatcher uses: Printf for
// ordinary messages, Warnf for failed API calls and other problems, and
// Debugf for per-container detail.
type Logger interface {
	Printf(format string, args ...interface{})
	Warnf(format string, args ...interface{})
	Debugf(format string, args ...interface{})
}
// Dispatcher polls the API server's container queue and runs
// RunContainer in its own goroutine for each container it dispatches or
// is already responsible for.
37 type Dispatcher struct {
// Arv is the API client used for container, log and token requests.
38 Arv *arvadosclient.ArvadosClient
42 // Batch size for container queries
45 // Queue polling frequency
46 PollPeriod time.Duration
48 // Time to wait between successive attempts to run the same container
49 MinRetryPeriod time.Duration
51 // Func that implements the container lifecycle. Must be set
52 // to a non-nil DispatchFunc before calling Run().
53 RunContainer DispatchFunc
// auth is this dispatcher's own token record, fetched by Run. Its UUID
// is compared against containers' locked_by_uuid.
55 auth arvados.APIClientAuthorization
// trackers maps a container UUID to the tracker of the goroutine
// currently handling that container.
57 trackers map[string]*runTracker
61 // A DispatchFunc executes a container (if the container record is
62 // Locked) or resume monitoring an already-running container, and wait
63 // until that container exits.
65 // While the container runs, the DispatchFunc should listen for
66 // updated container records on the provided channel. When the channel
67 // closes, the DispatchFunc should stop the container if it's still
68 // running, and return.
70 // The DispatchFunc should not return until the container is finished.
71 type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) error
73 // Run watches the API server's queue for containers that are either
74 // ready to run and available to lock, or are already locked by this
75 // dispatcher's token. When a new one appears, Run calls RunContainer
76 // in a new goroutine.
//
// Run returns an error if it cannot fetch the dispatcher's own token
// record at startup; its other return paths are not visible in this
// excerpt.
77 func (d *Dispatcher) Run(ctx context.Context) error {
// NOTE(review): the enclosing condition is not visible here; presumably
// this default applies only when the caller left Logger unset — confirm.
79 d.Logger = logrus.StandardLogger()
// Fetch this dispatcher's own token record. Its UUID is what the
// "Containers I currently own" query below matches in locked_by_uuid.
82 err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
84 return fmt.Errorf("error getting my token UUID: %v", err)
// The throttle enforces MinRetryPeriod between attempts to run the same
// container.
87 d.throttle.hold = d.MinRetryPeriod
// Poll the queue every PollPeriod.
89 poll := time.NewTicker(d.PollPeriod)
// NOTE(review): loop body not visible in this excerpt; presumably this
// shuts down every remaining tracker when Run is stopping — confirm.
103 for _, tracker := range d.trackers {
// todo starts as a copy of d.trackers. Whatever is still in it after the
// queries below is a container that no query returned; its tracker is
// let go at the end of this pass.
109 todo := make(map[string]*runTracker)
111 // Make a copy of trackers
112 for uuid, tracker := range d.trackers {
117 // Containers I currently own (Locked/Running)
118 querySuccess := d.checkForUpdates([][]interface{}{
119 {"locked_by_uuid", "=", d.auth.UUID}}, todo)
121 // Containers I should try to dispatch
122 querySuccess = d.checkForUpdates([][]interface{}{
123 {"state", "=", Queued},
124 {"priority", ">", "0"}}, todo) && querySuccess
127 // There was an error in one of the previous queries,
128 // we probably didn't get updates for all the
129 // containers we should have. Don't check them
130 // individually because it may be expensive.
134 // Containers I know about but didn't fall into the
135 // above two categories (probably Complete/Cancelled)
137 for uuid := range todo {
138 missed = append(missed, uuid)
// Re-query the leftover UUIDs directly by uuid, at most 20 per request.
141 for len(missed) > 0 {
143 if len(missed) > 20 {
150 querySuccess = d.checkForUpdates([][]interface{}{
151 {"uuid", "in", batch}}, todo) && querySuccess
155 // There was an error in one of the previous queries, we probably
156 // didn't see all the containers we should have, so don't shut down
157 // the missed containers.
161 // Containers that I know about that didn't show up in any
162 // query should be let go.
163 for uuid, tracker := range todo {
164 d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid)
171 // Start a runner in a new goroutine, and send the initial container
172 // record to its updates channel.
//
// If RunContainer returns an error, the error is logged through d.Logger
// and as a "dispatch" log record on the container. The goroutine then
// keeps draining the updates channel. If the container is still Locked or
// Running, it asks UpdateState to move the container to fallbackState:
// Queued by default, or Cancelled when the error is a
// dispatchcloud.ConstraintsNotSatisfiableError.
173 func (d *Dispatcher) start(c arvados.Container) *runTracker {
174 tracker := &runTracker{
// Room for one pending container record.
175 updates: make(chan arvados.Container, 1),
180 fallbackState := Queued
181 err := d.RunContainer(d, c, tracker.updates)
183 text := fmt.Sprintf("Error running container %s: %s", c.UUID, err)
// NOTE(review): a plain type assertion does not match a wrapped
// ConstraintsNotSatisfiableError; errors.As would.
184 if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
// Constraint errors fall back to Cancelled instead of Queued
// (presumably because re-queueing would fail the same way).
185 fallbackState = Cancelled
// Build a longer log message that lists the instance types that were
// available.
186 var logBuf bytes.Buffer
187 fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err)
188 if len(err.AvailableTypes) == 0 {
189 fmt.Fprint(&logBuf, "No instance types are configured.\n")
191 fmt.Fprint(&logBuf, "Available instance types:\n")
192 for _, t := range err.AvailableTypes {
194 "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
195 t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price)
198 text = logBuf.String()
200 d.Logger.Printf("%s", text)
201 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
202 "object_uuid": c.UUID,
203 "event_type": "dispatch",
204 "properties": map[string]string{"text": text}}}
// NOTE(review): the error from Create is discarded, so this log record is
// best-effort.
205 d.Arv.Create("logs", lr, nil)
207 // If checkListForUpdates() doesn't close the tracker
208 // after 2 queue updates, try to move the container to
209 // the fallback state, which should eventually work
210 // and cause the tracker to close.
212 for upd := range tracker.updates {
214 if upd.State == Locked || upd.State == Running {
215 // Tracker didn't clean up before
216 // returning -- or this is the first
217 // update and it contains stale
218 // information from before
219 // RunContainer() returned.
221 // Avoid generating confusing
222 // logs / API calls in the
226 d.Logger.Printf("container %s state is still %s, changing to %s", c.UUID, upd.State, fallbackState)
227 d.UpdateState(c.UUID, fallbackState)
// checkForUpdates pages through containers, highest priority first, and
// passes each page to checkListForUpdates along with todo.
//
// A first request obtains the number of matching containers
// (ItemsAvailable). Pages of up to d.BatchSize are then requested until a
// page comes back empty or that count has been reached.
//
// Callers treat a false result as "a query failed"; API errors are logged
// here as warnings.
//
// NOTE(review): the lines that put filters (and any count/limit options)
// into params are not visible in this excerpt.
234 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
235 var countList arvados.ContainerList
236 params := arvadosclient.Dict{
240 "order": []string{"priority desc"}}
241 err := d.Arv.List("containers", params, &countList)
243 d.Logger.Warnf("error getting count of containers: %q", err)
246 itemsAvailable := countList.ItemsAvailable
247 params = arvadosclient.Dict{
250 "limit": d.BatchSize,
251 "order": []string{"priority desc"}}
254 params["offset"] = offset
256 // This list variable must be a new one declared
257 // inside the loop: otherwise, items in the API
258 // response would get deep-merged into the items
259 // loaded in previous iterations.
260 var list arvados.ContainerList
262 err := d.Arv.List("containers", params, &list)
264 d.Logger.Warnf("error getting list of containers: %q", err)
267 d.checkListForUpdates(list.Items, todo)
// Advance by the number of items actually returned, not by BatchSize.
268 offset += len(list.Items)
269 if len(list.Items) == 0 || itemsAvailable <= offset {
// checkListForUpdates reconciles one page of container records with
// d.trackers:
//   - containers locked by another dispatcher are ignored;
//   - for an already-tracked container, a Queued/Cancelled/Complete record
//     closes its tracker and removes it from d.trackers, and a
//     Locked/Running record is handed to its tracker;
//   - an untracked container is, subject to the retry throttle, locked
//     and started if Queued, or started directly if Locked/Running;
//     Cancelled/Complete records are ignored.
//
// NOTE(review): the switch headers, the lines that remove seen UUIDs
// from todo, and any locking around d.trackers are not visible in this
// excerpt — confirm.
275 func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
// d.trackers is allocated lazily.
278 if d.trackers == nil {
279 d.trackers = make(map[string]*runTracker)
282 for _, c := range containers {
283 tracker, alreadyTracking := d.trackers[c.UUID]
// Another dispatcher owns this container.
286 if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
287 d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
288 } else if alreadyTracking {
290 case Queued, Cancelled, Complete:
291 d.Logger.Debugf("update has %s in state %s, closing tracker", c.UUID, c.State)
293 delete(d.trackers, c.UUID)
294 case Locked, Running:
295 d.Logger.Debugf("update has %s in state %s, updating tracker", c.UUID, c.State)
// Not yet tracked. The throttle limits how often the same container is
// attempted (see MinRetryPeriod).
301 if !d.throttle.Check(c.UUID) {
304 err := d.lock(c.UUID)
306 d.Logger.Warnf("error locking container %s: %s", c.UUID, err)
310 d.trackers[c.UUID] = d.start(c)
311 case Locked, Running:
312 if !d.throttle.Check(c.UUID) {
315 d.trackers[c.UUID] = d.start(c)
316 case Cancelled, Complete:
317 // no-op (we already stopped monitoring)
323 // UpdateState makes an API call to change the state of a container.
// Failures are logged as warnings. NOTE(review): the return statement is
// not visible in this excerpt; presumably err is returned — confirm.
324 func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
325 err := d.Arv.Update("containers", uuid,
327 "container": arvadosclient.Dict{"state": state},
330 d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err)
335 // Lock makes the lock API call which updates the state of a container to Locked.
336 func (d *Dispatcher) lock(uuid string) error {
337 return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
340 // Unlock makes the unlock API call which updates the state of a container to Queued.
341 func (d *Dispatcher) Unlock(uuid string) error {
342 return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
345 // TrackContainer ensures a tracker is running for the given UUID,
346 // regardless of the current state of the container (except: if the
347 // container is locked by a different dispatcher, a tracker will not
348 // be started). If the container is not in Locked or Running state,
349 // the new tracker will close down immediately.
351 // This allows the dispatcher to put its own RunContainer func into a
352 // cleanup phase (for example, to kill local processes created by a
353 // previous dispatch process that are still running even though the
354 // container state is final) without the risk of having multiple
355 // goroutines monitoring the same UUID.
356 func (d *Dispatcher) TrackContainer(uuid string) error {
357 var cntr arvados.Container
// Fetch the current container record.
358 err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
// Leave containers locked by another dispatcher alone.
362 if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
// Never start a second tracker for a UUID that is already tracked.
368 if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
// d.trackers is allocated lazily.
371 if d.trackers == nil {
372 d.trackers = make(map[string]*runTracker)
374 d.trackers[uuid] = d.start(cntr)
// A container in a non-running state still gets a tracker, so that
// RunContainer can do its cleanup, but the updates channel is closed
// right away.
376 case Queued, Cancelled, Complete:
377 d.trackers[uuid].close()
// runTracker is the dispatcher's handle on one RunContainer goroutine.
382 type runTracker struct {
// updates delivers container records to RunContainer. Closing it tells
// RunContainer to stop the container (if still running) and return.
384 updates chan arvados.Container
388 func (tracker *runTracker) close() {
389 if !tracker.closing {
390 close(tracker.updates)
392 tracker.closing = true
395 func (tracker *runTracker) update(c arvados.Container) {
400 case <-tracker.updates:
401 tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID)