import (
"fmt"
- "time"
"git.curoverse.com/arvados.git/lib/dispatchcloud/container"
"git.curoverse.com/arvados.git/sdk/go/arvados"
// cancelled.
func (sch *Scheduler) sync() {
running := sch.pool.Running()
- cancel := func(ent container.QueueEnt, reason string) {
- uuid := ent.Container.UUID
- logger := sch.logger.WithField("ContainerUUID", uuid)
- logger.Infof("cancelling container because %s", reason)
- err := sch.queue.Cancel(uuid)
- if err != nil {
- logger.WithError(err).Print("error cancelling container")
- }
- }
- kill := func(ent container.QueueEnt, reason string) {
- uuid := ent.Container.UUID
- logger := sch.logger.WithField("ContainerUUID", uuid)
- logger.Debugf("killing crunch-run process because %s", reason)
- sch.pool.KillContainer(uuid)
- }
qEntries, qUpdated := sch.queue.Entries()
for uuid, ent := range qEntries {
exited, running := running[uuid]
switch ent.Container.State {
case arvados.ContainerStateRunning:
if !running {
- go cancel(ent, "not running on any worker")
+ go sch.cancel(uuid, "not running on any worker")
} else if !exited.IsZero() && qUpdated.After(exited) {
- go cancel(ent, "state=\"Running\" after crunch-run exited")
+ go sch.cancel(uuid, "state=Running after crunch-run exited")
} else if ent.Container.Priority == 0 {
- go kill(ent, fmt.Sprintf("priority=%d", ent.Container.Priority))
+ go sch.kill(uuid, "priority=0")
}
case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
if running {
// of kill() will be to make the
// worker available for the next
// container.
- go kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+ go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
} else {
sch.logger.WithFields(logrus.Fields{
"ContainerUUID": uuid,
// a network outage and is still
// preparing to run a container that
// has already been unlocked/requeued.
- go kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+ go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
}
case arvados.ContainerStateLocked:
if running && !exited.IsZero() && qUpdated.After(exited) {
- logger := sch.logger.WithFields(logrus.Fields{
- "ContainerUUID": uuid,
- "Exited": time.Since(exited).Seconds(),
- })
- logger.Infof("requeueing container because state=%q after crunch-run exited", ent.Container.State)
- err := sch.queue.Unlock(uuid)
- if err != nil {
- logger.WithError(err).Info("error requeueing container")
- }
+ go sch.requeue(ent, "crunch-run exited")
+ } else if running && exited.IsZero() && ent.Container.Priority == 0 {
+ go sch.kill(uuid, "priority=0")
+ } else if !running && ent.Container.Priority == 0 {
+ go sch.requeue(ent, "priority=0")
}
default:
- sch.logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State)
+ sch.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "State": ent.Container.State,
+ }).Error("BUG: unexpected state")
}
}
+ for uuid := range running {
+ if _, known := qEntries[uuid]; !known {
+ go sch.kill(uuid, "not in queue")
+ }
+ }
+}
+
+// cancel asks the container queue to cancel the container with the
+// given UUID, logging reason first. It returns without doing anything
+// if sch.uuidLock(uuid, "cancel") reports false (presumably because
+// another operation on this UUID is already in progress — confirm in
+// uuidLock).
+func (sch *Scheduler) cancel(uuid string, reason string) {
+	if !sch.uuidLock(uuid, "cancel") {
+		return
+	}
+	defer sch.uuidUnlock(uuid)
+	logger := sch.logger.WithField("ContainerUUID", uuid)
+	logger.Infof("cancelling container because %s", reason)
+	if err := sch.queue.Cancel(uuid); err != nil {
+		// Report at Error level (logrus Print is Info level), matching
+		// how requeue reports a failed queue update.
+		logger.WithError(err).Error("error cancelling container")
+	}
+}
+
+// kill asks the worker pool to kill the crunch-run process for the
+// container with the given UUID (passing reason through to the pool),
+// then tells the pool to forget that container.
+//
+// NOTE(review): unlike cancel and requeue, kill does not take the
+// per-UUID lock via uuidLock; presumably the pool handles concurrent
+// calls itself — confirm against the pool implementation.
+func (sch *Scheduler) kill(uuid string, reason string) {
+ sch.pool.KillContainer(uuid, reason)
+ sch.pool.ForgetContainer(uuid)
+}
+
+// requeue calls sch.queue.Unlock for ent's container, logging reason
+// together with the container's current state and priority. It returns
+// without doing anything if sch.uuidLock reports false (presumably
+// because another operation on this UUID is already in progress —
+// confirm in uuidLock).
+func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
+	uuid := ent.Container.UUID
+	// Label the per-UUID lock with this operation's own name rather
+	// than "cancel", so anything that reports the lock holder names
+	// the right operation.
+	if !sch.uuidLock(uuid, "requeue") {
+		return
+	}
+	defer sch.uuidUnlock(uuid)
+	logger := sch.logger.WithFields(logrus.Fields{
+		"ContainerUUID": uuid,
+		"State":         ent.Container.State,
+		"Priority":      ent.Container.Priority,
+	})
+	logger.Infof("requeueing locked container because %s", reason)
+	if err := sch.queue.Unlock(uuid); err != nil {
+		logger.WithError(err).Error("error requeueing container")
+	}
}