package scheduler
import (
	"fmt"
	"time"

	"git.curoverse.com/arvados.git/lib/dispatchcloud/container"
	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"github.com/sirupsen/logrus"
)
-// Sync resolves discrepancies between the queue and the pool:
+// sync resolves discrepancies between the queue and the pool:
//
// Lingering crunch-run processes for finalized and unlocked/requeued
// containers are killed.
//
// Running containers whose crunch-run processes have exited are
// cancelled.
-//
-// Sync must not be called concurrently with other calls to Map or
-// Sync using the same queue or pool.
-func Sync(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) {
- running := pool.Running()
+func (sch *Scheduler) sync() {
+ running := sch.pool.Running()
cancel := func(ent container.QueueEnt, reason string) {
uuid := ent.Container.UUID
- logger := logger.WithField("ContainerUUID", uuid)
+ logger := sch.logger.WithField("ContainerUUID", uuid)
logger.Infof("cancelling container because %s", reason)
- err := queue.Cancel(uuid)
+ err := sch.queue.Cancel(uuid)
if err != nil {
logger.WithError(err).Print("error cancelling container")
}
}
- kill := func(ent container.QueueEnt) {
+ kill := func(ent container.QueueEnt, reason string) {
uuid := ent.Container.UUID
- logger := logger.WithField("ContainerUUID", uuid)
- logger.Debugf("killing crunch-run process because state=%q", ent.Container.State)
- pool.KillContainer(uuid)
+ logger := sch.logger.WithField("ContainerUUID", uuid)
+ logger.Debugf("killing crunch-run process because %s", reason)
+ sch.pool.KillContainer(uuid)
}
- qEntries, qUpdated := queue.Entries()
+ qEntries, qUpdated := sch.queue.Entries()
for uuid, ent := range qEntries {
exited, running := running[uuid]
switch ent.Container.State {
case arvados.ContainerStateRunning:
if !running {
- cancel(ent, "not running on any worker")
+ go cancel(ent, "not running on any worker")
} else if !exited.IsZero() && qUpdated.After(exited) {
- cancel(ent, "state=\"Running\" after crunch-run exited")
+ go cancel(ent, "state=\"Running\" after crunch-run exited")
+ } else if ent.Container.Priority == 0 {
+ go kill(ent, fmt.Sprintf("priority=%d", ent.Container.Priority))
}
case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
if running {
- kill(ent)
+ // Kill crunch-run in case it's stuck;
+ // nothing it does now will matter
+ // anyway. If crunch-run has already
+ // exited and we just haven't found
+ // out about it yet, the only effect
+ // of kill() will be to make the
+ // worker available for the next
+ // container.
+ go kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
} else {
- logger.WithFields(logrus.Fields{
+ sch.logger.WithFields(logrus.Fields{
"ContainerUUID": uuid,
"State": ent.Container.State,
}).Info("container finished")
- queue.Forget(uuid)
+ sch.queue.Forget(uuid)
}
case arvados.ContainerStateQueued:
if running {
- kill(ent)
+ // Can happen if a worker returns from
+ // a network outage and is still
+ // preparing to run a container that
+ // has already been unlocked/requeued.
+ go kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
}
case arvados.ContainerStateLocked:
if running && !exited.IsZero() && qUpdated.After(exited) {
- logger = logger.WithFields(logrus.Fields{
+ logger := sch.logger.WithFields(logrus.Fields{
"ContainerUUID": uuid,
"Exited": time.Since(exited).Seconds(),
})
logger.Infof("requeueing container because state=%q after crunch-run exited", ent.Container.State)
- err := queue.Unlock(uuid)
+ err := sch.queue.Unlock(uuid)
if err != nil {
logger.WithError(err).Info("error requeueing container")
}
}
default:
- logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State)
+ sch.logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State)
}
}
}