-// Framework for monitoring the Arvados container Queue, Locks container
-// records, and runs goroutine callbacks which implement execution and
-// monitoring of the containers.
+// Package dispatch is a helper library for building Arvados container
+// dispatchers.
package dispatch
import (
)
type Dispatcher struct {
- Arv *arvadosclient.ArvadosClient
- PollPeriod time.Duration
+ Arv *arvadosclient.ArvadosClient
+
+ // Interval between successive polls of the container queue
+ PollPeriod time.Duration
+
+ // Time to wait between successive attempts to run the same container
MinRetryPeriod time.Duration
- RunContainer Runner
+
+ // Func that implements the container lifecycle. Must be set
+ // to a non-nil DispatchFunc before calling Run().
+ RunContainer DispatchFunc
auth arvados.APIClientAuthorization
mtx sync.Mutex
- running map[string]*runTracker
+ trackers map[string]*runTracker
throttle throttle
}
-// A Runner executes a container. If it starts any goroutines, it must
-// not return until it can guarantee that none of those goroutines
-// will do anything with this container.
-type Runner func(*Dispatcher, arvados.Container, <-chan arvados.Container)
-
+// A DispatchFunc executes a container (if the container record is
+// Locked) or resumes monitoring an already-running container, and
+// waits until that container exits.
+//
+// While the container runs, the DispatchFunc should listen for
+// updated container records on the provided channel. When the channel
+// closes, the DispatchFunc should stop the container if it's still
+// running, and return.
+//
+// The DispatchFunc should not return until the container is finished.
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
+
+// Run watches the API server's queue for containers that are either
+// ready to run and available to lock, or are already locked by this
+// dispatcher's token. When a new one appears, Run calls RunContainer
+// in a new goroutine.
func (d *Dispatcher) Run(ctx context.Context) error {
err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
if err != nil {
return fmt.Errorf("error getting my token UUID: %v", err)
}
+ d.throttle.hold = d.MinRetryPeriod
+
poll := time.NewTicker(d.PollPeriod)
defer poll.Stop()
for {
+ tracked := d.trackedUUIDs()
d.checkForUpdates([][]interface{}{
- {"uuid", "in", d.runningUUIDs()}})
+ {"uuid", "in", tracked}})
d.checkForUpdates([][]interface{}{
{"locked_by_uuid", "=", d.auth.UUID},
- {"uuid", "not in", d.runningUUIDs()}})
+ {"uuid", "not in", tracked}})
d.checkForUpdates([][]interface{}{
{"state", "=", Queued},
{"priority", ">", "0"},
- {"uuid", "not in", d.runningUUIDs()}})
+ {"uuid", "not in", tracked}})
select {
case <-poll.C:
continue
}
}
-func (d *Dispatcher) runningUUIDs() []string {
+func (d *Dispatcher) trackedUUIDs() []string {
d.mtx.Lock()
defer d.mtx.Unlock()
- if len(d.running) == 0 {
- // API bug: ["uuid", "not in", []] does not match everything
- return []string{"X"}
+ if len(d.trackers) == 0 {
+ // API bug: ["uuid", "not in", []] does not match
+ // every container as it should, but a list holding
+ // one nonexistent UUID behaves correctly:
}
- uuids := make([]string, 0, len(d.running))
- for x := range d.running {
+ uuids := make([]string, 0, len(d.trackers))
+ for x := range d.trackers {
uuids = append(uuids, x)
}
return uuids
// Start a runner in a new goroutine, and send the initial container
// record to its updates channel.
func (d *Dispatcher) start(c arvados.Container) *runTracker {
- updates := make(chan arvados.Container, 1)
- tracker := &runTracker{updates: updates}
+ tracker := &runTracker{updates: make(chan arvados.Container, 1)}
tracker.updates <- c
go func() {
d.RunContainer(d, c, tracker.updates)
d.mtx.Lock()
- delete(d.running, c.UUID)
+ delete(d.trackers, c.UUID)
d.mtx.Unlock()
}()
return tracker
func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
params := arvadosclient.Dict{
"filters": filters,
- "order": []string{"priority desc"},
- "limit": "1000"}
+ "order": []string{"priority desc"}}
var list arvados.ContainerList
- err := d.Arv.List("containers", params, &list)
- if err != nil {
- log.Printf("Error getting list of containers: %q", err)
- return
- }
-
- if list.ItemsAvailable > len(list.Items) {
- // TODO: support paging
- log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
- list.ItemsAvailable,
- len(list.Items))
+ for offset, more := 0, true; more; offset += len(list.Items) {
+ params["offset"] = offset
+ err := d.Arv.List("containers", params, &list)
+ if err != nil {
+ log.Printf("Error getting list of containers: %q", err)
+ return
+ }
+ more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
+ d.checkListForUpdates(list.Items)
}
+}
+func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
d.mtx.Lock()
defer d.mtx.Unlock()
- if d.running == nil {
- d.running = make(map[string]*runTracker)
+ if d.trackers == nil {
+ d.trackers = make(map[string]*runTracker)
}
- for _, c := range list.Items {
- tracker, running := d.running[c.UUID]
+ for _, c := range containers {
+ tracker, alreadyTracking := d.trackers[c.UUID]
if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
- } else if running {
+ } else if alreadyTracking {
switch c.State {
case Queued:
tracker.close()
} else {
switch c.State {
case Queued:
- if err := d.lock(c.UUID); err != nil {
+ if !d.throttle.Check(c.UUID) {
+ break
+ }
+ err := d.lock(c.UUID)
+ if err != nil {
log.Printf("debug: error locking container %s: %s", c.UUID, err)
- } else {
- c.State = Locked
- d.running[c.UUID] = d.start(c)
+ break
}
+ c.State = Locked
+ d.trackers[c.UUID] = d.start(c)
case Locked, Running:
- d.running[c.UUID] = d.start(c)
+ if !d.throttle.Check(c.UUID) {
+ break
+ }
+ d.trackers[c.UUID] = d.start(c)
case Cancelled, Complete:
- tracker.close()
+ // no-op (not tracked, so there is nothing to stop)
}
}
}
type runTracker struct {
closing bool
- updates chan<- arvados.Container
+ updates chan arvados.Container
}
func (tracker *runTracker) close() {