-// Framework for monitoring the Arvados container Queue, Locks container
-// records, and runs goroutine callbacks which implement execution and
-// monitoring of the containers.
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+// Package dispatch is a helper library for building Arvados container
+// dispatchers.
package dispatch
import (
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "context"
+ "fmt"
"log"
"sync"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
)
const (
	Queued    = arvados.ContainerStateQueued
	Locked    = arvados.ContainerStateLocked
	Running   = arvados.ContainerStateRunning
	Complete  = arvados.ContainerStateComplete
	Cancelled = arvados.ContainerStateCancelled
)
-// Dispatcher holds the state of the dispatcher
+// A Dispatcher polls the API server's container queue and dispatches
+// each runnable container to its RunContainer func.
type Dispatcher struct {
- // The Arvados client
Arv *arvadosclient.ArvadosClient
- // When a new queued container appears and is either already owned by
- // this dispatcher or is successfully locked, the dispatcher will call
- // go RunContainer(). The RunContainer() goroutine gets a channel over
- // which it will receive updates to the container state. The
- // RunContainer() goroutine should only assume status updates come when
- // the container record changes on the API server; if it needs to
- // monitor the job submission to the underlying slurm/grid engine/etc
- // queue it should spin up its own polling goroutines. When the
- // channel is closed, that means the container is no longer being
- // handled by this dispatcher and the goroutine should terminate. The
- // goroutine is responsible for draining the 'status' channel, failure
- // to do so may deadlock the dispatcher.
- RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
-
- // Amount of time to wait between polling for updates.
- PollInterval time.Duration
-
- mineMutex sync.Mutex
- mineMap map[string]chan arvados.Container
- Auth arvados.APIClientAuthorization
-
- stop chan struct{}
-}
+ // Queue polling frequency
+ PollPeriod time.Duration
-// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
-// for which this process is actively starting/monitoring. Returns channel to
-// be used to send container status updates.
-func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
- dispatcher.mineMutex.Lock()
- defer dispatcher.mineMutex.Unlock()
- if ch, ok := dispatcher.mineMap[uuid]; ok {
- return ch
- }
+ // Time to wait between successive attempts to run the same container
+ MinRetryPeriod time.Duration
- ch := make(chan arvados.Container)
- dispatcher.mineMap[uuid] = ch
- return ch
-}
+ // Func that implements the container lifecycle. Must be set
+ // to a non-nil DispatchFunc before calling Run().
+ RunContainer DispatchFunc
-// Release a container which is no longer being monitored.
-func (dispatcher *Dispatcher) notMine(uuid string) {
- dispatcher.mineMutex.Lock()
- defer dispatcher.mineMutex.Unlock()
- if ch, ok := dispatcher.mineMap[uuid]; ok {
- close(ch)
- delete(dispatcher.mineMap, uuid)
- }
+ auth arvados.APIClientAuthorization
+ mtx sync.Mutex
+ trackers map[string]*runTracker
+ throttle throttle
}
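+
+// A typical caller configures these fields once and then calls Run.
+// A minimal usage sketch (illustrative only, imports elided; the
+// client setup and the "run" DispatchFunc are placeholders, not part
+// of this package):
+//
+//	arv, err := arvadosclient.MakeArvadosClient()
+//	if err != nil {
+//		log.Fatal(err)
+//	}
+//	d := &dispatch.Dispatcher{
+//		Arv:            arv,
+//		PollPeriod:     time.Second,
+//		MinRetryPeriod: 30 * time.Second,
+//		RunContainer:   run, // a DispatchFunc, see below
+//	}
+//	log.Fatal(d.Run(context.Background()))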
-// checkMine returns true if there is a channel for updates associated
-// with container c. If update is true, also send the container record on
-// the channel.
-func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
- dispatcher.mineMutex.Lock()
- defer dispatcher.mineMutex.Unlock()
- ch, ok := dispatcher.mineMap[c.UUID]
- if ok {
- if update {
- ch <- c
- }
- return true
- }
- return false
-}
+// A DispatchFunc executes a container (if the container record is
+// Locked) or resumes monitoring an already-running container, and
+// waits until that container exits.
+//
+// While the container runs, the DispatchFunc should listen for
+// updated container records on the provided channel. When the channel
+// closes, the DispatchFunc should stop the container if it's still
+// running, and return.
+//
+// The DispatchFunc should not return until the container is finished.
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
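+
+// An illustrative DispatchFunc skeleton (a sketch only; "submit" and
+// "stop" stand in for dispatcher-specific logic such as submitting to
+// slurm or starting a local process):
+//
+//	func run(d *dispatch.Dispatcher, c arvados.Container, status <-chan arvados.Container) {
+//		if c.State == dispatch.Locked {
+//			submit(d, c) // hand the container to the local queue/runner
+//		}
+//		for latest := range status {
+//			if latest.State == dispatch.Cancelled || latest.Priority == 0 {
+//				stop(latest) // cancelled or deprioritized: stop it, keep draining
+//			}
+//		}
+//		// Channel closed: this dispatcher no longer owns the
+//		// container. Stop it if it is still running, then return.
+//		stop(c)
+//	}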
-func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
- var containers arvados.ContainerList
- err := dispatcher.Arv.List("containers", params, &containers)
+// Run watches the API server's queue for containers that are either
+// ready to run and available to lock, or are already locked by this
+// dispatcher's token. When a new one appears, Run calls RunContainer
+// in a new goroutine.
+func (d *Dispatcher) Run(ctx context.Context) error {
+ err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
if err != nil {
- log.Printf("Error getting list of containers: %q", err)
- return
- }
-
- if containers.ItemsAvailable > len(containers.Items) {
- // TODO: support paging
- log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
- containers.ItemsAvailable,
- len(containers.Items))
+ return fmt.Errorf("error getting my token UUID: %v", err)
}
- for _, container := range containers.Items {
- touched[container.UUID] = true
- dispatcher.handleUpdate(container)
- }
-}
-func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) {
- ticker := time.NewTicker(dispatcher.PollInterval)
- defer ticker.Stop()
+ d.throttle.hold = d.MinRetryPeriod
- paramsQ := arvadosclient.Dict{
- "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
- "order": []string{"priority desc"},
- "limit": "1000"}
- paramsP := arvadosclient.Dict{
- "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
- "limit": "1000"}
+ poll := time.NewTicker(d.PollPeriod)
+ defer poll.Stop()
for {
- touched := make(map[string]bool)
- dispatcher.getContainers(paramsQ, touched)
- dispatcher.getContainers(paramsP, touched)
- dispatcher.mineMutex.Lock()
- var monitored []string
- for k := range dispatcher.mineMap {
- if _, ok := touched[k]; !ok {
- monitored = append(monitored, k)
+ select {
+ case <-poll.C:
+ break
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+
+ todo := make(map[string]*runTracker)
+ d.mtx.Lock()
+ // Make a copy of trackers
+ for uuid, tracker := range d.trackers {
+ todo[uuid] = tracker
+ }
+ d.mtx.Unlock()
+
+ // Containers I currently own (Locked/Running)
+ querySuccess := d.checkForUpdates([][]interface{}{
+ {"locked_by_uuid", "=", d.auth.UUID}}, todo)
+
+ // Containers I should try to dispatch
+ querySuccess = d.checkForUpdates([][]interface{}{
+ {"state", "=", Queued},
+ {"priority", ">", "0"}}, todo) && querySuccess
+
+ if !querySuccess {
+			// There was an error in one of the previous queries, so
+			// we probably didn't get updates for all the containers
+			// we should have. Don't check them individually because
+			// that may be expensive.
+ continue
+ }
+
+ // Containers I know about but didn't fall into the
+ // above two categories (probably Complete/Cancelled)
+ var missed []string
+ for uuid := range todo {
+ missed = append(missed, uuid)
+ }
+
+ for len(missed) > 0 {
+ var batch []string
+ if len(missed) > 20 {
+ batch = missed[0:20]
+ missed = missed[20:]
+ } else {
+ batch = missed
+ missed = missed[0:0]
}
+ querySuccess = d.checkForUpdates([][]interface{}{
+ {"uuid", "in", batch}}, todo) && querySuccess
}
- dispatcher.mineMutex.Unlock()
- if monitored != nil {
- dispatcher.getContainers(arvadosclient.Dict{
- "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
+
+ if !querySuccess {
+			// There was an error in one of the previous queries, so we
+			// probably didn't see all the containers we should have.
+			// Don't shut down the trackers for containers we missed.
+ continue
}
- select {
- case <-ticker.C:
- case <-stop:
- return
+
+		// Any containers I'm still tracking that didn't show up in
+		// the queries above should be let go.
+ for uuid, tracker := range todo {
+ log.Printf("Container %q not returned by any query, stopping tracking.", uuid)
+ tracker.close()
}
+
}
}
-func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
- if container.State == Queued && dispatcher.checkMine(container, false) {
- // If we previously started the job, something failed, and it
- // was re-queued, this dispatcher might still be monitoring it.
- // Stop the existing monitor, then try to lock and run it
- // again.
- dispatcher.notMine(container.UUID)
- }
+// Start a runner in a new goroutine, and send the initial container
+// record to its updates channel.
+func (d *Dispatcher) start(c arvados.Container) *runTracker {
+ tracker := &runTracker{updates: make(chan arvados.Container, 1)}
+ tracker.updates <- c
+ go func() {
+ d.RunContainer(d, c, tracker.updates)
+ // RunContainer blocks for the lifetime of the container. When
+ // it returns, the tracker should delete itself.
+ d.mtx.Lock()
+ delete(d.trackers, c.UUID)
+ d.mtx.Unlock()
+ }()
+ return tracker
+}
- if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
- // If container is Complete, Cancelled, or Queued, LockedByUUID
- // will be nil. If the container was formerly Locked, moved
- // back to Queued and then locked by another dispatcher,
- // LockedByUUID will be different. In either case, we want
- // to stop monitoring it.
- log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
- dispatcher.notMine(container.UUID)
- return
- }
+func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
+ params := arvadosclient.Dict{
+ "filters": filters,
+ "order": []string{"priority desc"}}
- if dispatcher.checkMine(container, true) {
- // Already monitored, sent status update
- return
+ var list arvados.ContainerList
+ for offset, more := 0, true; more; offset += len(list.Items) {
+ params["offset"] = offset
+ err := d.Arv.List("containers", params, &list)
+ if err != nil {
+ log.Printf("Error getting list of containers: %q", err)
+ return false
+ }
+ more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
+ d.checkListForUpdates(list.Items, todo)
}
+ return true
+}
- if container.State == Queued && container.Priority > 0 {
- // Try to take the lock
- if err := dispatcher.Lock(container.UUID); err != nil {
- return
- }
- container.State = Locked
+func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
+ d.mtx.Lock()
+ defer d.mtx.Unlock()
+ if d.trackers == nil {
+ d.trackers = make(map[string]*runTracker)
}
- if container.State == Locked || container.State == Running {
- // Not currently monitored but in Locked or Running state and
- // owned by this dispatcher, so start monitoring.
- go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
+ for _, c := range containers {
+ tracker, alreadyTracking := d.trackers[c.UUID]
+ delete(todo, c.UUID)
+
+ if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
+ log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
+ } else if alreadyTracking {
+ switch c.State {
+ case Queued:
+ tracker.close()
+ case Locked, Running:
+ tracker.update(c)
+ case Cancelled, Complete:
+ tracker.close()
+ }
+ } else {
+ switch c.State {
+ case Queued:
+ if !d.throttle.Check(c.UUID) {
+ break
+ }
+ err := d.lock(c.UUID)
+ if err != nil {
+ log.Printf("debug: error locking container %s: %s", c.UUID, err)
+ break
+ }
+ c.State = Locked
+ d.trackers[c.UUID] = d.start(c)
+ case Locked, Running:
+ if !d.throttle.Check(c.UUID) {
+ break
+ }
+ d.trackers[c.UUID] = d.start(c)
+ case Cancelled, Complete:
+				// Not tracking this container, and it has
+				// already finished, so there is nothing to do.
+ }
+ }
}
}
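+
+// The throttle type is defined elsewhere in this package. A minimal
+// sketch consistent with how it is used above (a hold duration set
+// from MinRetryPeriod, and a Check method that reports whether a
+// container UUID may be attempted again yet) could look like:
+//
+//	type throttle struct {
+//		hold time.Duration
+//		mtx  sync.Mutex
+//		seen map[string]time.Time
+//	}
+//
+//	func (t *throttle) Check(uuid string) bool {
+//		if t.hold == 0 {
+//			return true
+//		}
+//		t.mtx.Lock()
+//		defer t.mtx.Unlock()
+//		if t.seen == nil {
+//			t.seen = map[string]time.Time{}
+//		}
+//		if time.Since(t.seen[uuid]) < t.hold {
+//			return false
+//		}
+//		t.seen[uuid] = time.Now()
+//		return true
+//	}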
// UpdateState makes an API call to change the state of a container.
-func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
- err := dispatcher.Arv.Update("containers", uuid,
+func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
+ err := d.Arv.Update("containers", uuid,
arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": newState}},
- nil)
+ "container": arvadosclient.Dict{"state": state},
+ }, nil)
if err != nil {
- log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
+ log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
}
return err
}
-// Lock makes the lock API call which updates the state of a container to Locked.
+// lock makes the lock API call, which updates the container state to Locked.
-func (dispatcher *Dispatcher) Lock(uuid string) error {
- err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
- if err != nil {
- log.Printf("Error locking container %s: %q", uuid, err)
- }
- return err
+func (d *Dispatcher) lock(uuid string) error {
+ return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
}
// Unlock makes the unlock API call which updates the state of a container to Queued.
-func (dispatcher *Dispatcher) Unlock(uuid string) error {
- err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
+func (d *Dispatcher) Unlock(uuid string) error {
+ return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
+}
+
+// TrackContainer ensures a tracker is running for the given UUID,
+// regardless of the current state of the container (except: if the
+// container is locked by a different dispatcher, a tracker will not
+// be started). If the container is not in Locked or Running state,
+// the new tracker will close down immediately.
+//
+// This allows the dispatcher to put its own RunContainer func into a
+// cleanup phase (for example, to kill local processes created by a
+// previous dispatch process that are still running even though the
+// container state is final) without the risk of having multiple
+// goroutines monitoring the same UUID.
+func (d *Dispatcher) TrackContainer(uuid string) error {
+ var cntr arvados.Container
+ err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
if err != nil {
- log.Printf("Error unlocking container %s: %q", uuid, err)
+ return err
}
- return err
+ if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
+ return nil
+ }
+
+ d.mtx.Lock()
+ defer d.mtx.Unlock()
+ if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
+ return nil
+ }
+ if d.trackers == nil {
+ d.trackers = make(map[string]*runTracker)
+ }
+ d.trackers[uuid] = d.start(cntr)
+ switch cntr.State {
+ case Queued, Cancelled, Complete:
+ d.trackers[uuid].close()
+ }
+ return nil
}
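+
+// Illustrative use at dispatcher startup (a sketch; "localUUIDs" is an
+// assumed helper listing containers a previous dispatch process left
+// running on this node):
+//
+//	for _, uuid := range localUUIDs() {
+//		if err := d.TrackContainer(uuid); err != nil {
+//			log.Printf("error tracking %s: %s", uuid, err)
+//		}
+//	}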
-// Stop causes Run to return after the current polling cycle.
-func (dispatcher *Dispatcher) Stop() {
- if dispatcher.stop == nil {
- // already stopped
- return
+type runTracker struct {
+ closing bool
+ updates chan arvados.Container
+}
+
+func (tracker *runTracker) close() {
+ if !tracker.closing {
+ close(tracker.updates)
}
- close(dispatcher.stop)
- dispatcher.stop = nil
+ tracker.closing = true
}
-// Run runs the main loop of the dispatcher.
-func (dispatcher *Dispatcher) Run() (err error) {
- err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
- if err != nil {
- log.Printf("Error getting my token UUID: %v", err)
+func (tracker *runTracker) update(c arvados.Container) {
+ if tracker.closing {
return
}
-
- dispatcher.mineMap = make(map[string]chan arvados.Container)
- dispatcher.stop = make(chan struct{})
- dispatcher.pollContainers(dispatcher.stop)
- return nil
+ select {
+ case <-tracker.updates:
+ log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
+ default:
+ }
+ tracker.updates <- c
}