X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b16f84a8c5562cd5c3939e8c445dc86df2be2d4b..514fb685c9d835441e0911d9b9499952b6787095:/lib/dispatchcloud/scheduler/sync.go diff --git a/lib/dispatchcloud/scheduler/sync.go b/lib/dispatchcloud/scheduler/sync.go index 23fc621dea..205ee50187 100644 --- a/lib/dispatchcloud/scheduler/sync.go +++ b/lib/dispatchcloud/scheduler/sync.go @@ -30,11 +30,11 @@ func (sch *Scheduler) sync() { switch ent.Container.State { case arvados.ContainerStateRunning: if !running { - go sch.cancel(ent, "not running on any worker") + go sch.cancel(uuid, "not running on any worker") } else if !exited.IsZero() && qUpdated.After(exited) { - go sch.cancel(ent, "state=\"Running\" after crunch-run exited") + go sch.cancel(uuid, "state=Running after crunch-run exited") } else if ent.Container.Priority == 0 { - go sch.kill(ent, "priority=0") + go sch.kill(uuid, "priority=0") } case arvados.ContainerStateComplete, arvados.ContainerStateCancelled: if running { @@ -46,7 +46,7 @@ func (sch *Scheduler) sync() { // of kill() will be to make the // worker available for the next // container. - go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State)) + go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State)) } else { sch.logger.WithFields(logrus.Fields{ "ContainerUUID": uuid, @@ -60,13 +60,20 @@ func (sch *Scheduler) sync() { // a network outage and is still // preparing to run a container that // has already been unlocked/requeued. - go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State)) + go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State)) + } else if ent.Container.Priority == 0 { + sch.logger.WithFields(logrus.Fields{ + "ContainerUUID": uuid, + "State": ent.Container.State, + "Priority": ent.Container.Priority, + }).Info("container on hold") + sch.queue.Forget(uuid) } case arvados.ContainerStateLocked: if running && !exited.IsZero() && qUpdated.After(exited) { go sch.requeue(ent, "crunch-run exited") } else if running && exited.IsZero() && ent.Container.Priority == 0 { - go sch.kill(ent, "priority=0") + go sch.kill(uuid, "priority=0") } else if !running && ent.Container.Priority == 0 { go sch.requeue(ent, "priority=0") } @@ -77,10 +84,14 @@ func (sch *Scheduler) sync() { }).Error("BUG: unexpected state") } } + for uuid := range running { + if _, known := qEntries[uuid]; !known { + go sch.kill(uuid, "not in queue") + } + } } -func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) { - uuid := ent.Container.UUID +func (sch *Scheduler) cancel(uuid string, reason string) { if !sch.uuidLock(uuid, "cancel") { return } @@ -93,11 +104,9 @@ func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) { } } -func (sch *Scheduler) kill(ent container.QueueEnt, reason string) { - uuid := ent.Container.UUID - logger := sch.logger.WithField("ContainerUUID", uuid) - logger.Debugf("killing crunch-run process because %s", reason) - sch.pool.KillContainer(uuid) +func (sch *Scheduler) kill(uuid string, reason string) { + sch.pool.KillContainer(uuid, reason) + sch.pool.ForgetContainer(uuid) } func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {