-// Framework for monitoring the Arvados container Queue, Locks container
-// records, and runs goroutine callbacks which implement execution and
-// monitoring of the containers.
+// Package dispatch is a helper library for building Arvados container
+// dispatchers.
package dispatch
import (
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "context"
+ "fmt"
"log"
- "os"
- "os/signal"
"sync"
- "syscall"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
)
const (
Cancelled = arvados.ContainerStateCancelled
)
-// Dispatcher holds the state of the dispatcher
+// Dispatcher polls the API server's container queue (see Run) and calls RunContainer for each container it starts tracking.
type Dispatcher struct {
- // The Arvados client
Arv *arvadosclient.ArvadosClient
- // When a new queued container appears and is either already owned by
- // this dispatcher or is successfully locked, the dispatcher will call
- // go RunContainer(). The RunContainer() goroutine gets a channel over
- // which it will receive updates to the container state. The
- // RunContainer() goroutine should only assume status updates come when
- // the container record changes on the API server; if it needs to
- // monitor the job submission to the underlying slurm/grid engine/etc
- // queue it should spin up its own polling goroutines. When the
- // channel is closed, that means the container is no longer being
- // handled by this dispatcher and the goroutine should terminate. The
- // goroutine is responsible for draining the 'status' channel, failure
- // to do so may deadlock the dispatcher.
- RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
-
- // Amount of time to wait between polling for updates.
- PollInterval time.Duration
+ // Time between successive polls of the container queue. Run passes it to time.NewTicker, so it must be positive.
+ PollPeriod time.Duration
- // Channel used to signal that RunDispatcher loop should exit.
- DoneProcessing chan struct{}
+ // Time to wait between successive attempts to run the same container (Run copies it into throttle.hold)
+ MinRetryPeriod time.Duration
- mineMutex sync.Mutex
- mineMap map[string]chan arvados.Container
- Auth arvados.APIClientAuthorization
- containers chan arvados.Container
-}
+ // Func that implements the container lifecycle. Must be set
+ // to a non-nil DispatchFunc before calling Run().
+ RunContainer DispatchFunc
-// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
-// for which this process is actively starting/monitoring. Returns channel to
-// be used to send container status updates.
-func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
- dispatcher.mineMutex.Lock()
- defer dispatcher.mineMutex.Unlock()
- if ch, ok := dispatcher.mineMap[uuid]; ok {
- return ch
- }
-
- ch := make(chan arvados.Container)
- dispatcher.mineMap[uuid] = ch
- return ch
-}
-
-// Release a container which is no longer being monitored.
-func (dispatcher *Dispatcher) notMine(uuid string) {
- dispatcher.mineMutex.Lock()
- defer dispatcher.mineMutex.Unlock()
- if ch, ok := dispatcher.mineMap[uuid]; ok {
- close(ch)
- delete(dispatcher.mineMap, uuid)
- }
-}
-
-// checkMine returns true if there is a channel for updates associated
-// with container c. If update is true, also send the container record on
-// the channel.
-func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
- dispatcher.mineMutex.Lock()
- defer dispatcher.mineMutex.Unlock()
- ch, ok := dispatcher.mineMap[c.UUID]
- if ok {
- if update {
- ch <- c
- }
- return true
- }
- return false
+ auth arvados.APIClientAuthorization // this dispatcher's own token record, fetched by Run; auth.UUID identifies containers locked by us
+ mtx sync.Mutex // guards trackers
+ trackers map[string]*runTracker // one entry per container currently being run/monitored, keyed by container UUID
+ throttle throttle // consulted via Check(uuid) before starting a tracker; its hold is set from MinRetryPeriod
}
-func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
- var containers arvados.ContainerList
- err := dispatcher.Arv.List("containers", params, &containers)
+// A DispatchFunc executes a container (if the container record is
+// Locked) or resumes monitoring an already-running container, and waits
+// until that container exits.
+//
+// While the container runs, the DispatchFunc should listen for
+// updated container records on the provided channel; an update that
+// has not been received yet may be replaced by a newer one. When the
+// channel closes, the DispatchFunc should stop the container if it's still running, and return.
+//
+// The DispatchFunc should not return until the container is finished.
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
+
+// Run polls the API server's container queue, immediately and then once
+// per PollPeriod, for containers that are either Queued and available to
+// lock, or already locked by this dispatcher's token; each newly tracked one gets RunContainer in a new goroutine.
+// Run returns an error if the token lookup fails, and otherwise returns ctx.Err() once ctx is done.
+func (d *Dispatcher) Run(ctx context.Context) error {
+ err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth) // look up our own token record; d.auth.UUID is what "locked by us" means below
if err != nil {
- log.Printf("Error getting list of containers: %q", err)
- return
- }
-
- if containers.ItemsAvailable > len(containers.Items) {
- // TODO: support paging
- log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
- containers.ItemsAvailable,
- len(containers.Items))
+ return fmt.Errorf("error getting my token UUID: %v", err)
}
- for _, container := range containers.Items {
- touched[container.UUID] = true
- dispatcher.containers <- container
- }
-}
-func (dispatcher *Dispatcher) pollContainers() {
- ticker := time.NewTicker(dispatcher.PollInterval)
+ d.throttle.hold = d.MinRetryPeriod
- paramsQ := arvadosclient.Dict{
- "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
- "order": []string{"priority desc"},
- "limit": "1000"}
- paramsP := arvadosclient.Dict{
- "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
- "limit": "1000"}
+ poll := time.NewTicker(d.PollPeriod) // NOTE: time.NewTicker panics if PollPeriod is not positive
+ defer poll.Stop()
for {
+ tracked := d.trackedUUIDs() // snapshot taken once per pass; all three queries below use it
+ d.checkForUpdates([][]interface{}{ // 1. refresh the containers we already track
+ {"uuid", "in", tracked}})
+ d.checkForUpdates([][]interface{}{ // 2. pick up containers locked by our token that we are not tracking yet
+ {"locked_by_uuid", "=", d.auth.UUID},
+ {"uuid", "not in", tracked}})
+ d.checkForUpdates([][]interface{}{ // 3. look for untracked Queued containers with priority > 0 to lock and run
+ {"state", "=", Queued},
+ {"priority", ">", "0"},
+ {"uuid", "not in", tracked}})
select {
- case <-ticker.C:
- touched := make(map[string]bool)
- dispatcher.getContainers(paramsQ, touched)
- dispatcher.getContainers(paramsP, touched)
- dispatcher.mineMutex.Lock()
- var monitored []string
- for k := range dispatcher.mineMap {
- if _, ok := touched[k]; !ok {
- monitored = append(monitored, k)
- }
- }
- dispatcher.mineMutex.Unlock()
- if monitored != nil {
- dispatcher.getContainers(arvadosclient.Dict{
- "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
- }
- case <-dispatcher.DoneProcessing:
- close(dispatcher.containers)
- ticker.Stop()
- return
+ case <-poll.C:
+ continue // start the next polling pass
+ case <-ctx.Done():
+ return ctx.Err()
}
}
}
-func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
- if container.State == Queued && dispatcher.checkMine(container, false) {
- // If we previously started the job, something failed, and it
- // was re-queued, this dispatcher might still be monitoring it.
- // Stop the existing monitor, then try to lock and run it
- // again.
- dispatcher.notMine(container.UUID)
+func (d *Dispatcher) trackedUUIDs() []string { // returns the UUIDs of all containers that currently have a tracker, for use in "uuid in" / "uuid not in" API filters
+ d.mtx.Lock()
+ defer d.mtx.Unlock()
+ if len(d.trackers) == 0 {
+ // API bug workaround: ["uuid", "not in", []] does not work as
+ // expected with an empty list, but a list holding one placeholder value does:
+ return []string{"this-uuid-does-not-exist"}
}
-
- if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
- // If container is Complete, Cancelled, or Queued, LockedByUUID
- // will be nil. If the container was formerly Locked, moved
- // back to Queued and then locked by another dispatcher,
- // LockedByUUID will be different. In either case, we want
- // to stop monitoring it.
- log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
- dispatcher.notMine(container.UUID)
- return
+ uuids := make([]string, 0, len(d.trackers))
+ for x := range d.trackers {
+ uuids = append(uuids, x)
}
+ return uuids
+}
- if dispatcher.checkMine(container, true) {
- // Already monitored, sent status update
- return
- }
+// start launches d.RunContainer for c in a new goroutine and returns
+// the tracker whose updates channel feeds it. The channel has room for
+// one record and is pre-loaded with c, so the runner's first receive
+// yields the record it was started with. When RunContainer returns,
+// the container's entry is removed from d.trackers.
+//
+// The caller is responsible for storing the returned tracker in
+// d.trackers.
+func (d *Dispatcher) start(c arvados.Container) *runTracker {
+	updates := make(chan arvados.Container, 1)
+	updates <- c
+	tracker := &runTracker{updates: updates}
+	go func() {
+		d.RunContainer(d, c, tracker.updates)
+
+		// The runner is done: stop tracking this container.
+		d.mtx.Lock()
+		defer d.mtx.Unlock()
+		delete(d.trackers, c.UUID)
+	}()
+	return tracker
+}
- if container.State == Queued && container.Priority > 0 {
- // Try to take the lock
- if err := dispatcher.Lock(container.UUID); err != nil {
+// checkForUpdates lists the containers matching filters, highest
+// priority first, one page at a time, and hands each page to
+// checkListForUpdates. Paging stops when a page comes back empty or
+// when the offset reaches the reported ItemsAvailable. If a list
+// request fails, the error is logged and the remaining pages are
+// skipped until the next call.
+func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
+	params := arvadosclient.Dict{
+		"filters": filters,
+		"order":   []string{"priority desc"}}
+
+	for offset := 0; ; {
+		params["offset"] = offset
+
+		// Use a fresh list for every page. Decoding JSON into a
+		// struct that still holds the previous page reuses the old
+		// slice elements, so maps and omitted fields from earlier
+		// items could leak into the items of this page.
+		// NOTE(review): assumes d.Arv.List decodes with
+		// encoding/json -- confirm against arvadosclient.
+		var list arvados.ContainerList
+		err := d.Arv.List("containers", params, &list)
+		if err != nil {
+			log.Printf("Error getting list of containers: %q", err)
return
}
- container.State = Locked
+		d.checkListForUpdates(list.Items)
+		offset += len(list.Items)
+		if len(list.Items) == 0 || list.ItemsAvailable <= offset {
+			return
+		}
}
+}
- if container.State == Locked || container.State == Running {
- // Not currently monitored but in Locked or Running state and
- // owned by this dispatcher, so start monitoring.
- go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
+// checkListForUpdates reconciles one page of container records with
+// the set of running trackers, holding d.mtx throughout:
+//
+//   - records locked by a different token are logged and ignored;
+//   - for a tracked container, Locked/Running records are forwarded to
+//     its tracker, and Queued/Cancelled/Complete records close it;
+//   - for an untracked container, a Queued record is locked and then
+//     started, and a Locked/Running record is started directly -- in
+//     both cases only if d.throttle.Check allows it.
+func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
+	d.mtx.Lock()
+	defer d.mtx.Unlock()
+	if d.trackers == nil {
+		d.trackers = make(map[string]*runTracker)
+	}
+
+	for _, c := range containers {
+		if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
+			log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
+			continue
+		}
+
+		if tracker, alreadyTracking := d.trackers[c.UUID]; alreadyTracking {
+			switch c.State {
+			case Locked, Running:
+				tracker.update(c)
+			case Queued, Cancelled, Complete:
+				tracker.close()
+			}
+			continue
+		}
+
+		switch c.State {
+		case Queued:
+			if !d.throttle.Check(c.UUID) {
+				continue
+			}
+			if err := d.lock(c.UUID); err != nil {
+				log.Printf("debug: error locking container %s: %s", c.UUID, err)
+				continue
+			}
+			// The lock call succeeded, so hand the runner a record
+			// that already says Locked.
+			c.State = Locked
+			d.trackers[c.UUID] = d.start(c)
+		case Locked, Running:
+			if d.throttle.Check(c.UUID) {
+				d.trackers[c.UUID] = d.start(c)
+			}
+		}
+		// Cancelled/Complete and untracked: nothing to do, we
+		// already stopped monitoring.
+	}
}
// UpdateState makes an API call to change the state of a container.
-func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
- err := dispatcher.Arv.Update("containers", uuid,
+func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error { // uuid: container to update; state: the state to set; returns the error from the API call, if any
+ err := d.Arv.Update("containers", uuid,
arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": newState}},
- nil)
+ "container": arvadosclient.Dict{"state": state},
+ }, nil)
if err != nil {
- log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
+ log.Printf("Error updating container %s to state %q: %s", uuid, state, err) // the error is logged here and also returned to the caller
}
return err
}
// Lock makes the lock API call which updates the state of a container to Locked.
-func (dispatcher *Dispatcher) Lock(uuid string) error {
- err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
- if err != nil {
- log.Printf("Error locking container %s: %q", uuid, err)
- }
- return err
+func (d *Dispatcher) lock(uuid string) error { // unexported; checkListForUpdates calls it before starting a Queued container
+ return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil) // the error is returned without being logged here; the caller logs it
}
// Unlock makes the unlock API call which updates the state of a container to Queued.
-func (dispatcher *Dispatcher) Unlock(uuid string) error {
- err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
- if err != nil {
- log.Printf("Error unlocking container %s: %q", uuid, err)
- }
- return err
+func (d *Dispatcher) Unlock(uuid string) error { // uuid: the container to unlock; returns the error from the API call, if any
+ return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil) // the error is returned to the caller without being logged here
}
-// RunDispatcher runs the main loop of the dispatcher until receiving a message
-// on the dispatcher.DoneProcessing channel. It also installs a signal handler
-// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
-func (dispatcher *Dispatcher) RunDispatcher() (err error) {
- err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
- if err != nil {
- log.Printf("Error getting my token UUID: %v", err)
- return
+// TrackContainer starts a tracker for the container with the given
+// UUID if one is not already running, regardless of the container's
+// current state. It fetches the container record from the API server;
+// if that fails, the error is logged and no tracker is started.
+//
+// d.mtx is held for the whole call, including the API request.
+func (d *Dispatcher) TrackContainer(uuid string) {
+	d.mtx.Lock()
+	defer d.mtx.Unlock()
+
+	if d.trackers == nil {
+		d.trackers = make(map[string]*runTracker)
}
- dispatcher.mineMap = make(map[string]chan arvados.Container)
- dispatcher.containers = make(chan arvados.Container)
+	if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
+		return
+	}
- // Graceful shutdown on signal
- sigChan := make(chan os.Signal)
- signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+	var cntr arvados.Container
+	err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
+	if err != nil {
+		log.Printf("Error getting container %s: %s", uuid, err)
+		return
+	}
- go func(sig <-chan os.Signal) {
- for sig := range sig {
- log.Printf("Caught signal: %v", sig)
- dispatcher.DoneProcessing <- struct{}{}
- }
- }(sigChan)
+	// Start the runner with the record just fetched. This used to pass
+	// an undeclared identifier "c", which does not compile.
+	d.trackers[uuid] = d.start(cntr)
+}
- defer close(sigChan)
- defer signal.Stop(sigChan)
+// runTracker is the dispatcher-side handle for one running
+// DispatchFunc goroutine.
+type runTracker struct {
+	// updates carries container records to the runner. It is created
+	// with a buffer of one by Dispatcher.start and closed by close().
+	updates chan arvados.Container
+	// closing is set once close() has been called; after that,
+	// update() is a no-op.
+	closing bool
+}
- go dispatcher.pollContainers()
- for container := range dispatcher.containers {
- dispatcher.handleUpdate(container)
+// close closes the updates channel, which tells the runner to stop.
+// Calling it again is a no-op, so the channel is never closed twice.
+func (tracker *runTracker) close() {
+	if tracker.closing {
+		return
}
+	tracker.closing = true
+	close(tracker.updates)
+}
- return nil
+// update delivers the latest container record c to the runner. It
+// does nothing once the tracker is closing. If the previous record is
+// still sitting unread in the one-slot updates channel, it is dropped
+// (with a debug log line) so that the send of c does not wait for it
+// to be received.
+func (tracker *runTracker) update(c arvados.Container) {
+	if !tracker.closing {
+		select {
+		case <-tracker.updates:
+			log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
+		default:
+			// Nothing pending; the slot is free.
+		}
+		tracker.updates <- c
+	}
}