import (
"fmt"
- "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/lib/dispatchcloud/container"
+ "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/sirupsen/logrus"
)
// reportedUnexpectedState records whether sync has already logged its
// "BUG: unexpected state" error. sync checks it before logging and
// sets it to true afterwards, so the error is emitted at most once
// instead of on every pass. Nothing visible here resets it.
+var reportedUnexpectedState = false
+
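// sync resolves discrepancies between the queue and the pool:
//
// Lingering crunch-run processes for finalized and unlocked/requeued
// containers are killed.
//
// Locked containers whose crunch-run processes have exited are
// requeued.
//
// Running containers whose crunch-run processes have exited are
// cancelled.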
func (sch *Scheduler) sync() {
+ anyUnknownWorkers := sch.pool.CountWorkers()[worker.StateUnknown] > 0
running := sch.pool.Running()
qEntries, qUpdated := sch.queue.Entries()
for uuid, ent := range qEntries {
switch ent.Container.State {
case arvados.ContainerStateRunning:
if !running {
- go sch.cancel(uuid, "not running on any worker")
+ if !anyUnknownWorkers {
+ go sch.cancel(uuid, "not running on any worker")
+ }
} else if !exited.IsZero() && qUpdated.After(exited) {
go sch.cancel(uuid, "state=Running after crunch-run exited")
} else if ent.Container.Priority == 0 {
sch.logger.WithFields(logrus.Fields{
"ContainerUUID": uuid,
"State": ent.Container.State,
- }).Info("container finished")
+ }).Info("container finished -- dropping from queue")
sch.queue.Forget(uuid)
}
case arvados.ContainerStateQueued:
// a network outage and is still
// preparing to run a container that
// has already been unlocked/requeued.
- go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
+ go sch.kill(uuid, fmt.Sprintf("pool says running, but queue says state=%s", ent.Container.State))
+ } else if ent.Container.Priority == 0 {
+ sch.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "State": ent.Container.State,
+ "Priority": ent.Container.Priority,
+ }).Info("container on hold -- dropping from queue")
+ sch.queue.Forget(uuid)
}
case arvados.ContainerStateLocked:
if running && !exited.IsZero() && qUpdated.After(exited) {
go sch.requeue(ent, "priority=0")
}
default:
- sch.logger.WithFields(logrus.Fields{
- "ContainerUUID": uuid,
- "State": ent.Container.State,
- }).Error("BUG: unexpected state")
+ if !reportedUnexpectedState {
+ sch.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "State": ent.Container.State,
+ }).Error("BUG: unexpected state")
+ reportedUnexpectedState = true
+ }
}
}
for uuid := range running {
}
// kill asks the worker pool to kill the container identified by uuid,
// passing reason along, and then asks the pool to forget that
// container.
//
// It first calls sch.uuidLock(uuid, "kill") and returns without doing
// anything if that returns false; otherwise sch.uuidUnlock(uuid) is
// deferred until kill returns. NOTE(review): uuidLock is defined
// elsewhere -- presumably it refuses when another operation on the
// same UUID is already in progress; confirm.
//
// A debug-level log entry with the UUID and reason is written before
// the pool is called.
func (sch *Scheduler) kill(uuid string, reason string) {
+ if !sch.uuidLock(uuid, "kill") {
+ return
+ }
+ defer sch.uuidUnlock(uuid)
+ sch.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "reason": reason,
+ }).Debug("kill")
sch.pool.KillContainer(uuid, reason)
+ sch.pool.ForgetContainer(uuid)
}
func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
uuid := ent.Container.UUID
- if !sch.uuidLock(uuid, "cancel") {
+ if !sch.uuidLock(uuid, "requeue") {
return
}
defer sch.uuidUnlock(uuid)