-// Framework for monitoring the Arvados container Queue, Locks container
-// records, and runs goroutine callbacks which implement execution and
-// monitoring of the containers.
+// Package dispatch is a helper library for building Arvados container
+// dispatchers.
package dispatch
import (
)
+// Dispatcher watches the Arvados container queue and calls
+// RunContainer for containers that are ready to run or already
+// locked by this dispatcher's token.
type Dispatcher struct {
- Arv *arvadosclient.ArvadosClient
- PollPeriod time.Duration
+ // Arvados API client. Must be set before calling Run(), which
+ // uses it immediately to look up the current token.
+ Arv *arvadosclient.ArvadosClient
+
+ // Interval between successive polls of the container queue
+ PollPeriod time.Duration
+
+ // Time to wait between successive attempts to run the same container
MinRetryPeriod time.Duration
- RunContainer Runner
+
+ // Func that implements the container lifecycle. Must be set
+ // to a non-nil DispatchFunc before calling Run().
+ RunContainer DispatchFunc
auth arvados.APIClientAuthorization
mtx sync.Mutex
throttle throttle
}
-// A Runner executes a container. If it starts any goroutines, it must
-// not return until it can guarantee that none of those goroutines
-// will do anything with this container.
-type Runner func(*Dispatcher, arvados.Container, <-chan arvados.Container)
-
+// A DispatchFunc executes a container (if the container record is
+// Locked) or resumes monitoring an already-running container, and
+// waits until that container exits.
+//
+// While the container runs, the DispatchFunc should listen for
+// updated container records on the provided channel. When the channel
+// closes, the DispatchFunc should stop the container if it's still
+// running, and return.
+//
+// The DispatchFunc should not return until the container is finished.
+// If it starts any goroutines, it must not return until it can
+// guarantee that none of those goroutines will do anything with this
+// container.
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
+
+// Run watches the API server's queue for containers that are either
+// ready to run and available to lock, or are already locked by this
+// dispatcher's token. When a new one appears, Run calls RunContainer
+// in a new goroutine.
func (d *Dispatcher) Run(ctx context.Context) error {
err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
if err != nil {
return fmt.Errorf("error getting my token UUID: %v", err)
}
+ d.throttle.hold = d.MinRetryPeriod
+
poll := time.NewTicker(d.PollPeriod)
defer poll.Stop()
} else {
switch c.State {
case Queued:
- if err := d.lock(c.UUID); err != nil {
+ if !d.throttle.Check(c.UUID) {
+ break
+ }
+ err := d.lock(c.UUID)
+ if err != nil {
log.Printf("debug: error locking container %s: %s", c.UUID, err)
- } else {
- c.State = Locked
- d.running[c.UUID] = d.start(c)
+ break
}
+ c.State = Locked
+ d.running[c.UUID] = d.start(c)
case Locked, Running:
+ if !d.throttle.Check(c.UUID) {
+ break
+ }
d.running[c.UUID] = d.start(c)
case Cancelled, Complete:
tracker.close()