X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/fb9f29b33a380616211a56973045dac1254977ee..41365ac598721e31fc88c462934e0a06cafe2aae:/lib/dispatchcloud/worker/pool.go diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index e81c2c091f..81a658535e 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -68,6 +68,8 @@ const ( defaultTimeoutBooting = time.Minute * 10 defaultTimeoutProbe = time.Minute * 10 defaultTimeoutShutdown = time.Second * 10 + defaultTimeoutTERM = time.Minute * 2 + defaultTimeoutSignal = time.Second * 5 // Time after a quota error to try again anyway, even if no // instances have been shutdown. @@ -105,6 +107,8 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting), timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe), timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown), + timeoutTERM: duration(cluster.Dispatch.TimeoutTERM, defaultTimeoutTERM), + timeoutSignal: duration(cluster.Dispatch.TimeoutSignal, defaultTimeoutSignal), installPublicKey: installPublicKey, stop: make(chan bool), } @@ -136,6 +140,8 @@ type Pool struct { timeoutBooting time.Duration timeoutProbe time.Duration timeoutShutdown time.Duration + timeoutTERM time.Duration + timeoutSignal time.Duration installPublicKey ssh.PublicKey // private state @@ -319,9 +325,7 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) if !ok { return errors.New("requested instance does not exist") } - wkr.idleBehavior = idleBehavior - wkr.saveTags() - wkr.shutdownIfIdle() + wkr.setIdleBehavior(idleBehavior) return nil } @@ -383,19 +387,14 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*wor probed: now, busy: now, updated: now, - running: make(map[string]struct{}), - starting: make(map[string]struct{}), + running: make(map[string]*remoteRunner), + starting: make(map[string]*remoteRunner), probing: make(chan struct{}, 1), } wp.workers[id] = wkr return wkr, true } -// caller must have lock. -func (wp *Pool) notifyExited(uuid string, t time.Time) { - wp.exited[uuid] = t -} - // Shutdown shuts down a worker with the given type, or returns false // if all workers with the given type are busy. func (wp *Pool) Shutdown(it arvados.InstanceType) bool { @@ -419,8 +418,12 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool { } // CountWorkers returns the current number of workers in each state. +// +// CountWorkers blocks, if necessary, until the initial instance list +// has been loaded from the cloud provider. func (wp *Pool) CountWorkers() map[State]int { wp.setupOnce.Do(wp.setup) + wp.waitUntilLoaded() wp.mtx.Lock() defer wp.mtx.Unlock() r := map[State]int{} @@ -482,53 +485,29 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b // // KillContainer returns immediately; the act of killing the container // takes some time, and runs in the background. -func (wp *Pool) KillContainer(uuid string) { +func (wp *Pool) KillContainer(uuid string, reason string) { wp.mtx.Lock() defer wp.mtx.Unlock() + logger := wp.logger.WithFields(logrus.Fields{ + "ContainerUUID": uuid, + "Reason": reason, + }) if _, ok := wp.exited[uuid]; ok { - wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process") + logger.Debug("clearing placeholder for exited crunch-run process") delete(wp.exited, uuid) return } for _, wkr := range wp.workers { - if _, ok := wkr.running[uuid]; ok { - go wp.kill(wkr, uuid) - return + rr := wkr.running[uuid] + if rr == nil { + rr = wkr.starting[uuid] } - } - wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared") -} - -func (wp *Pool) kill(wkr *worker, uuid string) { - logger := wp.logger.WithFields(logrus.Fields{ - "ContainerUUID": uuid, - "Instance": wkr.instance.ID(), - }) - logger.Debug("killing process") - cmd := "crunch-run --kill 15 " + uuid - if u := wkr.instance.RemoteUser(); u != "root" { - cmd = "sudo " + cmd - } - stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil) - if err != nil { - logger.WithFields(logrus.Fields{ - "stderr": string(stderr), - "stdout": string(stdout), - "error": err, - }).Warn("kill failed") - return - } - logger.Debug("killing process succeeded") - wp.mtx.Lock() - defer wp.mtx.Unlock() - if _, ok := wkr.running[uuid]; ok { - delete(wkr.running, uuid) - if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 { - wkr.state = StateIdle + if rr != nil { + rr.Kill(reason) + return } - wkr.updated = time.Now() - go wp.notify() } + logger.Debug("cannot kill: already disappeared") } func (wp *Pool) registerMetrics(reg *prometheus.Registry) { @@ -781,11 +760,12 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { }) logger.Info("instance disappeared in cloud") delete(wp.workers, id) - go wkr.executor.Close() + go wkr.Close() notify = true } if !wp.loaded { + notify = true wp.loaded = true wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list") } @@ -795,6 +775,17 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { } } +func (wp *Pool) waitUntilLoaded() { + ch := wp.Subscribe() + wp.mtx.RLock() + defer wp.mtx.RUnlock() + for !wp.loaded { + wp.mtx.RUnlock() + <-ch + wp.mtx.RLock() + } +} + // Return a random string of n hexadecimal digits (n*4 random bits). n // must be even. func randomHex(n int) string {