X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ca2d946973b6ae25dd594ddecec54e02b83bc44e..41365ac598721e31fc88c462934e0a06cafe2aae:/lib/dispatchcloud/worker/pool.go

diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 14f6a3efce..81a658535e 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -68,6 +68,8 @@ const (
 	defaultTimeoutBooting  = time.Minute * 10
 	defaultTimeoutProbe    = time.Minute * 10
 	defaultTimeoutShutdown = time.Second * 10
+	defaultTimeoutTERM     = time.Minute * 2
+	defaultTimeoutSignal   = time.Second * 5
 
 	// Time after a quota error to try again anyway, even if no
 	// instances have been shutdown.
@@ -105,6 +107,8 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
 		timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
 		timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
 		timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+		timeoutTERM:        duration(cluster.Dispatch.TimeoutTERM, defaultTimeoutTERM),
+		timeoutSignal:      duration(cluster.Dispatch.TimeoutSignal, defaultTimeoutSignal),
 		installPublicKey:   installPublicKey,
 		stop:               make(chan bool),
 	}
@@ -136,11 +140,13 @@ type Pool struct {
 	timeoutBooting     time.Duration
 	timeoutProbe       time.Duration
 	timeoutShutdown    time.Duration
+	timeoutTERM        time.Duration
+	timeoutSignal      time.Duration
 	installPublicKey   ssh.PublicKey
 
 	// private state
 	subscribers  map[<-chan struct{}]chan<- struct{}
-	creating     map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
+	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
 	workers      map[cloud.InstanceID]*worker
 	loaded       bool                 // loaded list of instances from InstanceSet at least once
 	exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
@@ -160,6 +166,11 @@ type Pool struct {
 	mMemory            *prometheus.GaugeVec
 }
 
+type createCall struct {
+	time         time.Time
+	instanceType arvados.InstanceType
+}
+
 // Subscribe returns a buffered channel that becomes ready after any
 // change to the pool's state that could have scheduling implications:
 // a worker's state changes, a new worker appears, the cloud
@@ -205,8 +216,13 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 	defer wp.mtx.RUnlock()
 	unalloc := map[arvados.InstanceType]int{}
 	creating := map[arvados.InstanceType]int{}
-	for it, times := range wp.creating {
-		creating[it] = len(times)
+	oldestCreate := map[arvados.InstanceType]time.Time{}
+	for _, cc := range wp.creating {
+		it := cc.instanceType
+		creating[it]++
+		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
+			oldestCreate[it] = cc.time
+		}
 	}
 	for _, wkr := range wp.workers {
 		// Skip workers that are not expected to become
@@ -221,7 +237,7 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 		}
 		it := wkr.instType
 		unalloc[it]++
-		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
+		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
 			// If up to N new workers appear in
 			// Instances() while we are waiting for N
 			// Create() calls to complete, we assume we're
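Note on the hunks above: wp.creating is now keyed by each pending instance's secret instead of by instance type, so Unallocated() has to derive the per-type count and the oldest pending Create() time itself. A self-contained sketch of that aggregation, with simplified stand-in types (illustration only, not part of the patch):

package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for the real types; illustration only.
type instanceType string

type createCall struct {
	time         time.Time
	instanceType instanceType
}

func main() {
	now := time.Now()
	creating := map[string]createCall{
		"secret-a": {time: now.Add(-3 * time.Minute), instanceType: "small"},
		"secret-b": {time: now.Add(-1 * time.Minute), instanceType: "small"},
		"secret-c": {time: now.Add(-2 * time.Minute), instanceType: "large"},
	}

	// Same shape as the loop added to Unallocated(): count pending
	// Create() calls per type and remember the oldest start time.
	counts := map[instanceType]int{}
	oldest := map[instanceType]time.Time{}
	for _, cc := range creating {
		counts[cc.instanceType]++
		if t, ok := oldest[cc.instanceType]; !ok || t.After(cc.time) {
			oldest[cc.instanceType] = cc.time
		}
	}
	fmt.Println(counts["small"], oldest["small"].Before(oldest["large"])) // 2 true
}

Keying the map by secret is what later lets updateWorker() match a newly appeared instance, via its tagKeyInstanceSecret tag, to one of these pending Create() calls.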
@@ -260,10 +276,10 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
 		return false
 	}
 	now := time.Now()
-	wp.creating[it] = append(wp.creating[it], now)
+	secret := randomHex(instanceSecretLength)
+	wp.creating[secret] = createCall{time: now, instanceType: it}
 	go func() {
 		defer wp.notify()
-		secret := randomHex(instanceSecretLength)
 		tags := cloud.InstanceTags{
 			tagKeyInstanceType: it.Name,
 			tagKeyIdleBehavior: string(IdleBehaviorRun),
@@ -273,14 +289,10 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
 		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
 		wp.mtx.Lock()
 		defer wp.mtx.Unlock()
-		// Remove our timestamp marker from wp.creating
-		for i, t := range wp.creating[it] {
-			if t == now {
-				copy(wp.creating[it][i:], wp.creating[it][i+1:])
-				wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
-				break
-			}
-		}
+		// delete() is deferred so the updateWorker() call
+		// below knows to use StateBooting when adding a new
+		// worker.
+		defer delete(wp.creating, secret)
 		if err != nil {
 			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
 				wp.atQuotaErr = err
@@ -291,7 +303,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
 			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
 			return
 		}
-		wp.updateWorker(inst, it, StateBooting)
+		wp.updateWorker(inst, it)
 	}()
 	return true
 }
@@ -313,32 +325,34 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
 	if !ok {
 		return errors.New("requested instance does not exist")
 	}
-	wkr.idleBehavior = idleBehavior
-	wkr.saveTags()
-	wkr.shutdownIfIdle()
+	wkr.setIdleBehavior(idleBehavior)
 	return nil
 }
 
-// Add or update worker attached to the given instance. Use
-// initialState if a new worker is created.
+// Add or update worker attached to the given instance.
 //
 // The second return value is true if a new worker is created.
 //
+// A newly added instance has state=StateBooting if its tags match an
+// entry in wp.creating, otherwise StateUnknown.
+//
 // Caller must have lock.
-func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
 	inst = tagVerifier{inst}
 	id := inst.ID()
 	if wkr := wp.workers[id]; wkr != nil {
 		wkr.executor.SetTarget(inst)
 		wkr.instance = inst
 		wkr.updated = time.Now()
-		if initialState == StateBooting && wkr.state == StateUnknown {
-			wkr.state = StateBooting
-		}
 		wkr.saveTags()
 		return wkr, false
 	}
 
+	state := StateUnknown
+	if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok {
+		state = StateBooting
+	}
+
 	// If an instance has a valid IdleBehavior tag when it first
 	// appears, initialize the new worker accordingly (this is how
 	// we restore IdleBehavior that was set by a prior dispatch
@@ -356,7 +370,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
 		"Address": inst.Address(),
 	})
 	logger.WithFields(logrus.Fields{
-		"State":        initialState,
+		"State":        state,
 		"IdleBehavior": idleBehavior,
 	}).Infof("instance appeared in cloud")
 	now := time.Now()
@@ -365,7 +379,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
 		wp:           wp,
 		logger:       logger,
 		executor:     wp.newExecutor(inst),
-		state:        initialState,
+		state:        state,
 		idleBehavior: idleBehavior,
 		instance:     inst,
 		instType:     it,
@@ -373,19 +387,14 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
 		probed:       now,
 		busy:         now,
 		updated:      now,
-		running:      make(map[string]struct{}),
-		starting:     make(map[string]struct{}),
+		running:      make(map[string]*remoteRunner),
+		starting:     make(map[string]*remoteRunner),
 		probing:      make(chan struct{}, 1),
 	}
 	wp.workers[id] = wkr
 	return wkr, true
 }
 
-// caller must have lock.
-func (wp *Pool) notifyExited(uuid string, t time.Time) {
-	wp.exited[uuid] = t
-}
-
 // Shutdown shuts down a worker with the given type, or returns false
 // if all workers with the given type are busy.
 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
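The deferred delete in Create() above is deliberate: defer delete(wp.creating, secret) is registered before the error checks and the updateWorker() call, so the entry is still present while updateWorker() runs and is only removed when the goroutine returns. A minimal standalone demonstration of that defer ordering (hypothetical names, not Arvados code):

package main

import "fmt"

// lookup stands in for updateWorker(): it checks the map while the
// deferred delete is still pending.
func lookup(creating map[string]bool, secret string) {
	fmt.Println("entry still present:", creating[secret])
}

func create(creating map[string]bool, secret string) {
	creating[secret] = true
	// Deferred calls run when create() returns, i.e. after lookup().
	defer delete(creating, secret)
	lookup(creating, secret) // prints "entry still present: true"
}

func main() {
	creating := map[string]bool{}
	create(creating, "0123abcd")
	fmt.Println("entry after return:", creating["0123abcd"]) // false
}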
@@ -409,8 +418,12 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
 }
 
 // CountWorkers returns the current number of workers in each state.
+//
+// CountWorkers blocks, if necessary, until the initial instance list
+// has been loaded from the cloud provider.
 func (wp *Pool) CountWorkers() map[State]int {
 	wp.setupOnce.Do(wp.setup)
+	wp.waitUntilLoaded()
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
 	r := map[State]int{}
@@ -472,53 +485,29 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 //
 // KillContainer returns immediately; the act of killing the container
 // takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string) {
+func (wp *Pool) KillContainer(uuid string, reason string) {
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
+	logger := wp.logger.WithFields(logrus.Fields{
+		"ContainerUUID": uuid,
+		"Reason":        reason,
+	})
 	if _, ok := wp.exited[uuid]; ok {
-		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+		logger.Debug("clearing placeholder for exited crunch-run process")
 		delete(wp.exited, uuid)
 		return
 	}
 	for _, wkr := range wp.workers {
-		if _, ok := wkr.running[uuid]; ok {
-			go wp.kill(wkr, uuid)
-			return
+		rr := wkr.running[uuid]
+		if rr == nil {
+			rr = wkr.starting[uuid]
 		}
-	}
-	wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
-}
-
-func (wp *Pool) kill(wkr *worker, uuid string) {
-	logger := wp.logger.WithFields(logrus.Fields{
-		"ContainerUUID": uuid,
-		"Instance":      wkr.instance.ID(),
-	})
-	logger.Debug("killing process")
-	cmd := "crunch-run --kill 15 " + uuid
-	if u := wkr.instance.RemoteUser(); u != "root" {
-		cmd = "sudo " + cmd
-	}
-	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
-	if err != nil {
-		logger.WithFields(logrus.Fields{
-			"stderr": string(stderr),
-			"stdout": string(stdout),
-			"error":  err,
-		}).Warn("kill failed")
-		return
-	}
-	logger.Debug("killing process succeeded")
-	wp.mtx.Lock()
-	defer wp.mtx.Unlock()
-	if _, ok := wkr.running[uuid]; ok {
-		delete(wkr.running, uuid)
-		if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
-			wkr.state = StateIdle
+		if rr != nil {
+			rr.Kill(reason)
+			return
 		}
-		wkr.updated = time.Now()
-		go wp.notify()
 	}
+	logger.Debug("cannot kill: already disappeared")
 }
 
 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
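KillContainer() now hands the work to remoteRunner.Kill(reason); the remoteRunner type is defined elsewhere in the worker package and is not part of this diff. Purely as a rough sketch, based on the removed wp.kill() code above and the new timeoutTERM/timeoutSignal settings rather than on the real remoteRunner implementation (all names below are hypothetical):

package sketch

import (
	"errors"
	"io"
	"time"
)

// executor is a simplified stand-in for the pool's SSH executor.
type executor interface {
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
}

// sketchKill (hypothetical) asks crunch-run to TERM the container,
// retrying every timeoutSignal until timeoutTERM elapses.
func sketchKill(exr executor, remoteUser, uuid string, timeoutTERM, timeoutSignal time.Duration) error {
	cmd := "crunch-run --kill 15 " + uuid
	if remoteUser != "root" {
		cmd = "sudo " + cmd
	}
	deadline := time.Now().Add(timeoutTERM)
	for time.Now().Before(deadline) {
		if _, _, err := exr.Execute(nil, cmd, nil); err == nil {
			return nil
		}
		time.Sleep(timeoutSignal)
	}
	return errors.New("timed out waiting for crunch-run to accept TERM signal")
}

The real runner presumably does more (per-container state tracking, escalation, worker cleanup); this sketch only shows the signal-and-retry shape the two new timeouts suggest.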
@@ -565,6 +554,7 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry)
 func (wp *Pool) runMetrics() {
 	ch := wp.Subscribe()
 	defer wp.Unsubscribe(ch)
+	wp.updateMetrics()
 	for range ch {
 		wp.updateMetrics()
 	}
@@ -702,7 +692,7 @@ func (wp *Pool) Instances() []InstanceView {
 }
 
 func (wp *Pool) setup() {
-	wp.creating = map[arvados.InstanceType][]time.Time{}
+	wp.creating = map[string]createCall{}
 	wp.exited = map[string]time.Time{}
 	wp.workers = map[cloud.InstanceID]*worker{}
 	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
@@ -752,7 +742,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
 			wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
 			continue
 		}
-		if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
+		if wkr, isNew := wp.updateWorker(inst, it); isNew {
 			notify = true
 		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
 			wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
@@ -770,11 +760,12 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
 		})
 		logger.Info("instance disappeared in cloud")
 		delete(wp.workers, id)
-		go wkr.executor.Close()
+		go wkr.Close()
 		notify = true
 	}
 
 	if !wp.loaded {
+		notify = true
 		wp.loaded = true
 		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
 	}
@@ -784,6 +775,17 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
 	}
 }
 
+func (wp *Pool) waitUntilLoaded() {
+	ch := wp.Subscribe()
+	wp.mtx.RLock()
+	defer wp.mtx.RUnlock()
+	for !wp.loaded {
+		wp.mtx.RUnlock()
+		<-ch
+		wp.mtx.RLock()
+	}
+}
+
 // Return a random string of n hexadecimal digits (n*4 random bits). n
 // must be even.
 func randomHex(n int) string {
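CountWorkers() now blocks in waitUntilLoaded() until sync() has loaded the initial instance list: it waits on the Subscribe() channel and re-checks wp.loaded after each notification, dropping the read lock while blocked. The same wait pattern, reduced to a self-contained example with hypothetical types (not Arvados code):

package main

import (
	"fmt"
	"sync"
	"time"
)

// waiter mimics the Pool's waitUntilLoaded pattern: a condition flag
// guarded by an RWMutex, plus a buffered notification channel that is
// poked whenever the state changes (the role of Pool.Subscribe).
type waiter struct {
	mtx    sync.RWMutex
	loaded bool
	notify chan struct{}
}

func (w *waiter) waitUntilLoaded() {
	w.mtx.RLock()
	defer w.mtx.RUnlock()
	for !w.loaded {
		// Drop the lock while blocked so the writer can make progress.
		w.mtx.RUnlock()
		<-w.notify
		w.mtx.RLock()
	}
}

func main() {
	w := &waiter{notify: make(chan struct{}, 1)}
	go func() {
		time.Sleep(10 * time.Millisecond)
		w.mtx.Lock()
		w.loaded = true
		w.mtx.Unlock()
		w.notify <- struct{}{}
	}()
	w.waitUntilLoaded()
	fmt.Println("initial instance list loaded")
}

Releasing the read lock before blocking on the channel is what lets sync(), which needs the write lock to set loaded, make progress; holding it would deadlock.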