-// Framework for monitoring the Arvados container Queue, Locks container
-// records, and runs goroutine callbacks which implement execution and
-// monitoring of the containers.
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+// Package dispatch is a helper library for building Arvados container
+// dispatchers.
package dispatch
import (
"context"
"fmt"
- "log"
"sync"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "github.com/Sirupsen/logrus"
)
const (
Cancelled = arvados.ContainerStateCancelled
)
-type runner struct {
- closing bool
- updates chan arvados.Container
+// A Logger accepts log messages from a Dispatcher. It is satisfied
+// by, e.g., *logrus.Logger (the default installed by Run).
+type Logger interface {
+ Printf(string, ...interface{})
+ Warnf(string, ...interface{})
+ Debugf(string, ...interface{})
}
-func (ex *runner) close() {
-	if !ex.closing {
-		close(ex.updates)
-	}
-	ex.closing = true
-}
-func (ex *runner) update(c arvados.Container) {
-	if ex.closing {
-		return
-	}
-	select {
-	case <-ex.updates:
-		log.Print("debug: executor is handling updates slowly, discarded previous update for %s", c.UUID)
-	default:
-	}
-	ex.updates <- c
-}
-type Dispatcher struct {
-	Arv *arvadosclient.ArvadosClient
-	PollPeriod time.Duration
+// A Dispatcher watches the API server's container queue, and starts
+// and tracks a RunContainer goroutine for each container it is
+// responsible for.
+type Dispatcher struct {
+	Arv *arvadosclient.ArvadosClient
+
+	Logger Logger
+
+	// Queue polling frequency
+	PollPeriod time.Duration
+
+ // Time to wait between successive attempts to run the same container
MinRetryPeriod time.Duration
- RunContainer Runner
+
+ // Func that implements the container lifecycle. Must be set
+ // to a non-nil DispatchFunc before calling Run().
+ RunContainer DispatchFunc
auth arvados.APIClientAuthorization
mtx sync.Mutex
- running map[string]*runner
+ trackers map[string]*runTracker
throttle throttle
}
-// A Runner executes a container. If it starts any goroutines, it must
-// not return until it can guarantee that none of those goroutines
-// will do anything with this container.
-type Runner func(*Dispatcher, arvados.Container, <-chan arvados.Container)
+// A DispatchFunc executes a container (if the container record is
+// Locked) or resumes monitoring an already-running container, and
+// waits until that container exits.
+//
+// While the container runs, the DispatchFunc should listen for
+// updated container records on the provided channel. When the channel
+// closes, the DispatchFunc should stop the container if it's still
+// running, and return.
+//
+// The DispatchFunc should not return until the container is finished.
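+//
+// For illustration only, a rough sketch of a DispatchFunc: sampleRun
+// is a hypothetical name, error handling is elided, and the code that
+// would actually start, check, and stop a local process is omitted.
+//
+//	func sampleRun(d *Dispatcher, c arvados.Container, status <-chan arvados.Container) {
+//		// If c.State == Locked, start the container and
+//		// report that it is running.
+//		d.UpdateState(c.UUID, Running)
+//		for upd := range status {
+//			if upd.Priority == 0 {
+//				// Cancellation requested.
+//				d.UpdateState(upd.UUID, Cancelled)
+//			}
+//		}
+//		// The channel closed: the container reached a final
+//		// state (or this dispatcher lost it), so stop any
+//		// local process and return.
+//	}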
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
+
+// Run watches the API server's queue for containers that are either
+// ready to run and available to lock, or are already locked by this
+// dispatcher's token. When a new one appears, Run calls RunContainer
+// in a new goroutine.
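+//
+// A typical dispatcher program might wire things up like this (sketch
+// only; sampleRun as above, configuration and error handling elided):
+//
+//	arv, err := arvadosclient.MakeArvadosClient()
+//	if err != nil {
+//		log.Fatal(err)
+//	}
+//	d := &dispatch.Dispatcher{
+//		Arv:          arv,
+//		RunContainer: sampleRun,
+//		PollPeriod:   10 * time.Second,
+//	}
+//	log.Fatal(d.Run(context.Background()))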
func (d *Dispatcher) Run(ctx context.Context) error {
+ if d.Logger == nil {
+ d.Logger = logrus.StandardLogger()
+ }
+
err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
if err != nil {
return fmt.Errorf("error getting my token UUID: %v", err)
}
+ d.throttle.hold = d.MinRetryPeriod
+
poll := time.NewTicker(d.PollPeriod)
defer poll.Stop()
for {
- running := make([]string, 0, len(d.running))
+		select {
+		case <-poll.C:
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+
+ todo := make(map[string]*runTracker)
d.mtx.Lock()
- for uuid := range d.running {
- running = append(running, uuid)
+		// Copy the tracker map. Entries still left in todo
+		// after the queries below are containers that no
+		// query returned.
+ for uuid, tracker := range d.trackers {
+ todo[uuid] = tracker
}
d.mtx.Unlock()
- if len(running) == 0 {
- // API bug: ["uuid", "not in", []] does not match everything
- running = []string{"X"}
- }
- d.checkForUpdates([][]interface{}{
- {"uuid", "in", running}})
- d.checkForUpdates([][]interface{}{
+
+ // Containers I currently own (Locked/Running)
+ querySuccess := d.checkForUpdates([][]interface{}{
+ {"locked_by_uuid", "=", d.auth.UUID}}, todo)
+
+ // Containers I should try to dispatch
+ querySuccess = d.checkForUpdates([][]interface{}{
{"state", "=", Queued},
- {"priority", ">", "0"},
- {"uuid", "not in", running}})
- d.checkForUpdates([][]interface{}{
- {"locked_by_uuid", "=", d.auth.UUID},
- {"uuid", "not in", running}})
- select {
- case <-poll.C:
+ {"priority", ">", "0"}}, todo) && querySuccess
+
+ if !querySuccess {
+			// There was an error in one of the previous queries;
+ // we probably didn't get updates for all the
+ // containers we should have. Don't check them
+ // individually because it may be expensive.
continue
- case <-ctx.Done():
- return ctx.Err()
}
+
+ // Containers I know about but didn't fall into the
+ // above two categories (probably Complete/Cancelled)
+ var missed []string
+ for uuid := range todo {
+ missed = append(missed, uuid)
+ }
+
+ for len(missed) > 0 {
+ var batch []string
+ if len(missed) > 20 {
+ batch = missed[0:20]
+ missed = missed[20:]
+ } else {
+ batch = missed
+ missed = missed[0:0]
+ }
+ querySuccess = d.checkForUpdates([][]interface{}{
+ {"uuid", "in", batch}}, todo) && querySuccess
+ }
+
+ if !querySuccess {
+			// There was an error in one of the previous queries; we probably
+ // didn't see all the containers we should have, so don't shut down
+ // the missed containers.
+ continue
+ }
+
+		// Containers I know about that didn't show up in any
+		// query should be let go.
+ for uuid, tracker := range todo {
+ d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid)
+ tracker.close()
+ }
+
}
}
-func (d *Dispatcher) start(c arvados.Container) *runner {
- ex := &runner{
+// Start a RunContainer goroutine for the given container, and send
+// the initial container record to its updates channel.
+func (d *Dispatcher) start(c arvados.Container) *runTracker {
+ tracker := &runTracker{
updates: make(chan arvados.Container, 1),
+ logger: d.Logger,
}
- if d.running == nil {
- d.running = make(map[string]*runner)
- }
- d.running[c.UUID] = ex
+ tracker.updates <- c
go func() {
- d.RunContainer(d, c, ex.updates)
+ d.RunContainer(d, c, tracker.updates)
+		// RunContainer blocks for the lifetime of the container.
+		// When it returns, the container is finished (or this
+		// dispatcher has lost it), so stop tracking it.
d.mtx.Lock()
- delete(d.running, c.UUID)
+ delete(d.trackers, c.UUID)
d.mtx.Unlock()
}()
- return ex
+ return tracker
}
-func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
+func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
params := arvadosclient.Dict{
"filters": filters,
- "order": []string{"priority desc"},
- "limit": "1000"}
+ "order": []string{"priority desc"}}
+ offset := 0
+ for {
+ params["offset"] = offset
- var list arvados.ContainerList
- err := d.Arv.List("containers", params, &list)
- if err != nil {
- log.Printf("Error getting list of containers: %q", err)
- return
- }
+ // This list variable must be a new one declared
+ // inside the loop: otherwise, items in the API
+ // response would get deep-merged into the items
+ // loaded in previous iterations.
+ var list arvados.ContainerList
- if list.ItemsAvailable > len(list.Items) {
- // TODO: support paging
- log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
- list.ItemsAvailable,
- len(list.Items))
+ err := d.Arv.List("containers", params, &list)
+ if err != nil {
+ d.Logger.Warnf("error getting list of containers: %q", err)
+ return false
+ }
+ d.checkListForUpdates(list.Items, todo)
+ offset += len(list.Items)
+ if len(list.Items) == 0 || list.ItemsAvailable <= offset {
+ return true
+ }
}
+}
+func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
d.mtx.Lock()
defer d.mtx.Unlock()
- for _, c := range list.Items {
- ex, running := d.running[c.UUID]
+ if d.trackers == nil {
+ d.trackers = make(map[string]*runTracker)
+ }
+
+ for _, c := range containers {
+ tracker, alreadyTracking := d.trackers[c.UUID]
+ delete(todo, c.UUID)
+
if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
- log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
- } else if running {
+ d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
+ } else if alreadyTracking {
switch c.State {
case Queued:
- ex.close()
+ tracker.close()
case Locked, Running:
- ex.update(c)
+ tracker.update(c)
case Cancelled, Complete:
- ex.close()
+ tracker.close()
}
} else {
switch c.State {
case Queued:
- if err := d.lock(c.UUID); err != nil {
- log.Printf("Error locking container %s: %s", c.UUID, err)
- } else {
- c.State = Locked
- d.start(c).update(c)
+ if !d.throttle.Check(c.UUID) {
+ break
}
+ err := d.lock(c.UUID)
+ if err != nil {
+ d.Logger.Warnf("error locking container %s: %s", c.UUID, err)
+ break
+ }
+ c.State = Locked
+ d.trackers[c.UUID] = d.start(c)
case Locked, Running:
- d.start(c).update(c)
+ if !d.throttle.Check(c.UUID) {
+ break
+ }
+ d.trackers[c.UUID] = d.start(c)
case Cancelled, Complete:
- ex.close()
+				// No-op: we're not monitoring this container,
+				// and a finished container needs no monitor.
}
}
}
"container": arvadosclient.Dict{"state": state},
}, nil)
if err != nil {
- log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
+ d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err)
}
return err
}
func (d *Dispatcher) Unlock(uuid string) error {
return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
}
+
+// TrackContainer ensures a tracker is running for the given UUID,
+// regardless of the current state of the container (except: if the
+// container is locked by a different dispatcher, a tracker will not
+// be started). If the container is not in Locked or Running state,
+// the new tracker will close down immediately.
+//
+// This allows the dispatcher to put its own RunContainer func into a
+// cleanup phase (for example, to kill local processes created by a
+// previous dispatch process that are still running even though the
+// container state is final) without the risk of having multiple
+// goroutines monitoring the same UUID.
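+//
+// For example, a dispatcher that finds local processes left over from
+// a previous dispatch process might do something like this (sketch;
+// localUUIDs is a hypothetical helper that lists those containers):
+//
+//	for _, uuid := range localUUIDs() {
+//		if err := d.TrackContainer(uuid); err != nil {
+//			d.Logger.Warnf("cannot track %s: %s", uuid, err)
+//		}
+//	}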
+func (d *Dispatcher) TrackContainer(uuid string) error {
+ var cntr arvados.Container
+ err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
+ if err != nil {
+ return err
+ }
+ if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
+ return nil
+ }
+
+ d.mtx.Lock()
+ defer d.mtx.Unlock()
+ if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
+ return nil
+ }
+ if d.trackers == nil {
+ d.trackers = make(map[string]*runTracker)
+ }
+ d.trackers[uuid] = d.start(cntr)
+ switch cntr.State {
+ case Queued, Cancelled, Complete:
+ d.trackers[uuid].close()
+ }
+ return nil
+}
+
+// A runTracker sends container record updates to a single
+// RunContainer goroutine.
+type runTracker struct {
+ closing bool
+ updates chan arvados.Container
+ logger Logger
+}
+
+func (tracker *runTracker) close() {
+ if !tracker.closing {
+ close(tracker.updates)
+ }
+ tracker.closing = true
+}
+
+func (tracker *runTracker) update(c arvados.Container) {
+ if tracker.closing {
+ return
+ }
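+	// Non-blocking receive: discard any update the RunContainer
+	// goroutine hasn't consumed yet, so the send below can never
+	// block the dispatcher's poll loop.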
+ select {
+ case <-tracker.updates:
+ tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID)
+ default:
+ }
+ tracker.updates <- c
+}