X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2666c3b726d190511f072db9e6606a1a99936968..ca2d946973b6ae25dd594ddecec54e02b83bc44e:/lib/dispatchcloud/scheduler/sync.go diff --git a/lib/dispatchcloud/scheduler/sync.go b/lib/dispatchcloud/scheduler/sync.go index 4c55b3c0d8..23fc621dea 100644 --- a/lib/dispatchcloud/scheduler/sync.go +++ b/lib/dispatchcloud/scheduler/sync.go @@ -6,11 +6,10 @@ package scheduler import ( "fmt" - "time" "git.curoverse.com/arvados.git/lib/dispatchcloud/container" "git.curoverse.com/arvados.git/sdk/go/arvados" - "github.com/Sirupsen/logrus" + "github.com/sirupsen/logrus" ) // sync resolves discrepancies between the queue and the pool: @@ -25,32 +24,17 @@ import ( // cancelled. func (sch *Scheduler) sync() { running := sch.pool.Running() - cancel := func(ent container.QueueEnt, reason string) { - uuid := ent.Container.UUID - logger := sch.logger.WithField("ContainerUUID", uuid) - logger.Infof("cancelling container because %s", reason) - err := sch.queue.Cancel(uuid) - if err != nil { - logger.WithError(err).Print("error cancelling container") - } - } - kill := func(ent container.QueueEnt, reason string) { - uuid := ent.Container.UUID - logger := sch.logger.WithField("ContainerUUID", uuid) - logger.Debugf("killing crunch-run process because %s", reason) - sch.pool.KillContainer(uuid) - } qEntries, qUpdated := sch.queue.Entries() for uuid, ent := range qEntries { exited, running := running[uuid] switch ent.Container.State { case arvados.ContainerStateRunning: if !running { - go cancel(ent, "not running on any worker") + go sch.cancel(ent, "not running on any worker") } else if !exited.IsZero() && qUpdated.After(exited) { - go cancel(ent, "state=\"Running\" after crunch-run exited") + go sch.cancel(ent, "state=\"Running\" after crunch-run exited") } else if ent.Container.Priority == 0 { - go kill(ent, fmt.Sprintf("priority=%d", ent.Container.Priority)) + go sch.kill(ent, "priority=0") } case arvados.ContainerStateComplete, 
arvados.ContainerStateCancelled: if running { @@ -62,7 +46,7 @@ func (sch *Scheduler) sync() { // of kill() will be to make the // worker available for the next // container. - go kill(ent, fmt.Sprintf("state=%q", ent.Container.State)) + go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State)) } else { sch.logger.WithFields(logrus.Fields{ "ContainerUUID": uuid, @@ -76,22 +60,63 @@ func (sch *Scheduler) sync() { // a network outage and is still // preparing to run a container that // has already been unlocked/requeued. - go kill(ent, fmt.Sprintf("state=%q", ent.Container.State)) + go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State)) } case arvados.ContainerStateLocked: if running && !exited.IsZero() && qUpdated.After(exited) { - logger := sch.logger.WithFields(logrus.Fields{ - "ContainerUUID": uuid, - "Exited": time.Since(exited).Seconds(), - }) - logger.Infof("requeueing container because state=%q after crunch-run exited", ent.Container.State) - err := sch.queue.Unlock(uuid) - if err != nil { - logger.WithError(err).Info("error requeueing container") - } + go sch.requeue(ent, "crunch-run exited") + } else if running && exited.IsZero() && ent.Container.Priority == 0 { + go sch.kill(ent, "priority=0") + } else if !running && ent.Container.Priority == 0 { + go sch.requeue(ent, "priority=0") } default: - sch.logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State) + sch.logger.WithFields(logrus.Fields{ + "ContainerUUID": uuid, + "State": ent.Container.State, + }).Error("BUG: unexpected state") } } } + +// cancel asks the queue to cancel ent's container, logging reason; it returns without acting if sch.uuidLock(uuid, "cancel") returns false. +func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) { + uuid := ent.Container.UUID + if !sch.uuidLock(uuid, "cancel") { + return + } + defer sch.uuidUnlock(uuid) + logger := sch.logger.WithField("ContainerUUID", uuid) + logger.Infof("cancelling container because %s", reason) + err := sch.queue.Cancel(uuid) + if err != nil { + logger.WithError(err).Print("error cancelling container") + } +} + +// kill asks the worker pool to kill the crunch-run process for ent's container, logging reason at debug level. +func 
(sch *Scheduler) kill(ent container.QueueEnt, reason string) { + uuid := ent.Container.UUID + logger := sch.logger.WithField("ContainerUUID", uuid) + logger.Debugf("killing crunch-run process because %s", reason) + sch.pool.KillContainer(uuid) +} + +// requeue unlocks ent's locked container via the queue, logging reason; it returns without acting if sch.uuidLock(uuid, "requeue") returns false. +func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) { + uuid := ent.Container.UUID + if !sch.uuidLock(uuid, "requeue") { + return + } + defer sch.uuidUnlock(uuid) + logger := sch.logger.WithFields(logrus.Fields{ + "ContainerUUID": uuid, + "State": ent.Container.State, + "Priority": ent.Container.Priority, + }) + logger.Infof("requeueing locked container because %s", reason) + err := sch.queue.Unlock(uuid) + if err != nil { + logger.WithError(err).Error("error requeueing container") + } +}