+// Package dispatch is a framework for monitoring the Arvados container queue,
+// locking container records, and running goroutine callbacks that implement
+// execution and monitoring of the containers.
package dispatch
import (
Items []apiClientAuthorization `json:"items"`
}
-// Container data
+// Container represents an Arvados container record.
type Container struct {
UUID string `json:"uuid"`
State string `json:"state"`
// Dispatcher holds the state of the dispatcher
type Dispatcher struct {
- Arv arvadosclient.ArvadosClient
- RunContainer func(*Dispatcher, Container, chan Container)
- PollInterval time.Duration
+ // Arv is the Arvados API client used to talk to the API server.
+ Arv arvadosclient.ArvadosClient
+
+ // When a new queued container appears and is either already owned by
+ // this dispatcher or is successfully locked, the dispatcher will call
+ // go RunContainer(). The RunContainer() goroutine gets a channel over
+ // which it will receive updates to the container state. It should only
+ // assume updates arrive when the container record changes on the API
+ // server; if it needs to monitor the job submission to the underlying
+ // slurm/grid engine/etc queue, it should spin up its own polling
+ // goroutines. When the channel is closed, the container is no longer
+ // being handled by this dispatcher and the goroutine should terminate.
+ // The goroutine must keep draining this channel: updates are sent while
+ // an internal mutex is held, so a stalled reader deadlocks the dispatcher.
+ RunContainer func(*Dispatcher, Container, chan Container)
+
+ // PollInterval is the amount of time to wait between polls for updates.
+ PollInterval time.Duration
+
+ // DoneProcessing signals that the RunDispatcher loop should exit.
DoneProcessing chan struct{}
mineMutex sync.Mutex
mineMap map[string]chan Container
- auth apiClientAuthorization
+ Auth apiClientAuthorization
containers chan Container
}
}
}
-// Check if there is a channel for updates associated with this container. If
-// so send the container record on the channel and return true, if not return
-// false.
-func (dispatcher *Dispatcher) updateMine(c Container) bool {
+// checkMine reports whether there is a channel for updates associated with
+// container c. If update is true, it also sends the container record on that
+// channel; the send happens while mineMutex is held, so it blocks until the
+// receiving goroutine reads it.
+func (dispatcher *Dispatcher) checkMine(c Container, update bool) bool {
dispatcher.mineMutex.Lock()
defer dispatcher.mineMutex.Unlock()
ch, ok := dispatcher.mineMap[c.UUID]
if ok {
- ch <- c
+ if update {
+ ch <- c
+ }
return true
}
return false
err := dispatcher.Arv.List("containers", params, &containers)
if err != nil {
log.Printf("Error getting list of containers: %q", err)
- } else {
- if containers.ItemsAvailable > len(containers.Items) {
- // TODO: support paging
- log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
- containers.ItemsAvailable,
- len(containers.Items))
- }
- for _, container := range containers.Items {
- touched[container.UUID] = true
- dispatcher.containers <- container
- }
+ return
+ }
+
+ if containers.ItemsAvailable > len(containers.Items) {
+ // TODO: support paging
+ log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
+ containers.ItemsAvailable,
+ len(containers.Items))
+ }
+ for _, container := range containers.Items {
+ touched[container.UUID] = true
+ dispatcher.containers <- container
}
}
"order": []string{"priority desc"},
"limit": "1000"}
paramsP := arvadosclient.Dict{
- "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.auth.UUID}},
+ "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
"limit": "1000"}
for {
}
func (dispatcher *Dispatcher) handleUpdate(container Container) {
- if dispatcher.updateMine(container) {
- if container.State == Complete || container.State == Cancelled {
- log.Printf("Container %v now in state %v", container.UUID, container.State)
- dispatcher.notMine(container.UUID)
- }
+ if container.State == Queued && dispatcher.checkMine(container, false) {
+ // If we previously started the job, something failed, and it
+ // was re-queued, this dispatcher might still be monitoring it.
+ // Stop the existing monitor, then try to lock and run it
+ // again.
+ dispatcher.notMine(container.UUID)
+ }
+
+ if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
+ // If container is Complete, Cancelled, or Queued, LockedByUUID
+ // will be nil. If the container was formerly Locked, moved
+ // back to Queued and then locked by another dispatcher,
+ // LockedByUUID will be different. In either case, we want
+ // to stop monitoring it.
+ log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
+ dispatcher.notMine(container.UUID)
+ return
+ }
+
+ if dispatcher.checkMine(container, true) {
+ // Already monitored, sent status update
return
}
- if container.State == Queued {
+ if container.State == Queued && container.Priority > 0 {
// Try to take the lock
if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
return
}
if container.State == Locked || container.State == Running {
+ // Not currently monitored but in Locked or Running state and
+ // owned by this dispatcher, so start monitoring.
go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
}
}
"container": arvadosclient.Dict{"state": newState}},
nil)
if err != nil {
- log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
+ log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
}
return err
}
// RunDispatcher runs the main loop of the dispatcher until receiving a message
// on the dispatcher.DoneProcessing channel. It also installs a signal handler
// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
-//
-// When a new queued container appears and is successfully locked, the
-// dispatcher will call RunContainer() followed by MonitorContainer(). If a
-// container appears that is Locked or Running but not known to the dispatcher,
-// it will only call monitorContainer(). The monitorContainer() callback is
-// passed a channel over which it will receive updates to the container state.
-// The callback is responsible for draining the channel, if it fails to do so
-// it will deadlock the dispatcher.
func (dispatcher *Dispatcher) RunDispatcher() (err error) {
- err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.auth)
+ err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
if err != nil {
log.Printf("Error getting my token UUID: %v", err)
return