if container.State == Queued && container.Priority > 0 {
// Try to take the lock
- if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
+ if err := dispatcher.Lock(container.UUID); err != nil {
return
}
container.State = Locked
// UpdateState makes an API call to change the state of a container.
func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
if newState == Locked {
- err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
- if err != nil {
- log.Printf("Error locking container %s: %q", uuid, err)
- }
- return err
+ return dispatcher.Lock(uuid)
} else if newState == Queued {
- err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
- if err != nil {
- log.Printf("Error unlocking container %s: %q", uuid, err)
- }
- return err
+ return dispatcher.Unlock(uuid)
}
// All other states
return err
}
+// Lock makes the lock API call which updates the state of a container to Locked.
+func (dispatcher *Dispatcher) Lock(uuid string) error {
+ err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
+ if err != nil {
+ log.Printf("Error locking container %s: %q", uuid, err)
+ }
+ return err
+}
+
+// Unlock makes the unlock API call which updates the state of a container to Queued.
+func (dispatcher *Dispatcher) Unlock(uuid string) error {
+ err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
+ if err != nil {
+ log.Printf("Error unlocking container %s: %q", uuid, err)
+ }
+ return err
+}
+
// RunDispatcher runs the main loop of the dispatcher until receiving a message
// on the dispatcher.DoneProcessing channel. It also installs a signal handler
// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
// OK, no cleanup needed
return
}
- err := dispatcher.UpdateState(container.UUID, dispatch.Queued)
+ err := dispatcher.Unlock(container.UUID)
if err != nil {
log.Printf("Error unlocking container %s: %v", container.UUID, err)
}
log.Printf("Error submitting container %s to slurm: %v",
container.UUID, err)
// maybe sbatch is broken, put it back to queued
- dispatcher.UpdateState(container.UUID, dispatch.Queued)
+ dispatcher.Unlock(container.UUID)
}
submitted = true
} else {
var st arvados.ContainerState
switch con.State {
case dispatch.Locked:
- st = dispatch.Queued
+ log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
+ container.UUID, con.State, dispatch.Queued)
+ dispatcher.Unlock(container.UUID)
case dispatch.Running:
st = dispatch.Cancelled
+ log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
+ container.UUID, con.State, st)
+ dispatcher.UpdateState(container.UUID, st)
default:
// Container state is Queued, Complete or Cancelled so stop monitoring it.
return
}
-
- log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
- container.UUID, con.State, st)
- dispatcher.UpdateState(container.UUID, st)
}
}
}