X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cc952178056bf6d29471f6986306fb673dcf394a..51c1daf863f3e1920f758f73b4e5d70ff2c706d6:/lib/dispatchcloud/worker/pool.go diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index 37add6d3d6..ff5f762c1d 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -90,6 +90,7 @@ func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cl timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting), timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe), timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown), + stop: make(chan bool), } wp.registerMetrics(reg) go func() { @@ -121,7 +122,7 @@ type Pool struct { // private state subscribers map[<-chan struct{}]chan<- struct{} - creating map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return + creating map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls workers map[cloud.InstanceID]*worker loaded bool // loaded list of instances from InstanceSet at least once exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called @@ -171,25 +172,41 @@ func (wp *Pool) Unsubscribe(ch <-chan struct{}) { // Unallocated returns the number of unallocated (creating + booting + // idle + unknown) workers for each instance type. -// -// The returned counts should be interpreted as upper bounds, rather -// than exact counts: they are sometimes artificially high when a -// newly created instance appears in the driver's Instances() list -// before the Create() call returns. func (wp *Pool) Unallocated() map[arvados.InstanceType]int { wp.setupOnce.Do(wp.setup) wp.mtx.RLock() defer wp.mtx.RUnlock() - u := map[arvados.InstanceType]int{} - for it, c := range wp.creating { - u[it] = c + unalloc := map[arvados.InstanceType]int{} + creating := map[arvados.InstanceType]int{} + for it, times := range wp.creating { + creating[it] = len(times) } for _, wkr := range wp.workers { - if wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown { - u[wkr.instType]++ + if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) { + continue } + it := wkr.instType + unalloc[it]++ + if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) { + // If up to N new workers appear in + // Instances() while we are waiting for N + // Create() calls to complete, we assume we're + // just seeing a race between Instances() and + // Create() responses. + // + // The other common reason why nodes have + // state==Unknown is that they appeared at + // startup, before any Create calls. They + // don't match the above timing condition, so + // we never mistakenly attribute them to + // pending Create calls. + creating[it]-- + } + } + for it, c := range creating { + unalloc[it] += c } - return u + return unalloc } // Create a new instance with the given type, and add it to the worker @@ -204,13 +221,21 @@ func (wp *Pool) Create(it arvados.InstanceType) error { return wp.atQuotaErr } tags := cloud.InstanceTags{tagKeyInstanceType: it.Name} - wp.creating[it]++ + now := time.Now() + wp.creating[it] = append(wp.creating[it], now) go func() { defer wp.notify() inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil) wp.mtx.Lock() defer wp.mtx.Unlock() - wp.creating[it]-- + // Remove our timestamp marker from wp.creating + for i, t := range wp.creating[it] { + if t == now { + copy(wp.creating[it][i:], wp.creating[it][i+1:]) + wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1] + break + } + } if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() { wp.atQuotaErr = err wp.atQuotaUntil = time.Now().Add(time.Minute) @@ -266,6 +291,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi state: initialState, instance: inst, instType: it, + appeared: now, probed: now, busy: now, updated: now, @@ -304,8 +330,8 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool { return false } -// Workers returns the current number of workers in each state. -func (wp *Pool) Workers() map[State]int { +// CountWorkers returns the current number of workers in each state. +func (wp *Pool) CountWorkers() map[State]int { wp.setupOnce.Do(wp.setup) wp.mtx.Lock() defer wp.mtx.Unlock() @@ -385,7 +411,7 @@ func (wp *Pool) kill(wkr *worker, uuid string) { "Instance": wkr.instance, }) logger.Debug("killing process") - stdout, stderr, err := wkr.executor.Execute("crunch-run --kill "+uuid, nil) + stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil) if err != nil { logger.WithFields(logrus.Fields{ "stderr": string(stderr), @@ -579,7 +605,7 @@ func (wp *Pool) Instances() []InstanceView { } func (wp *Pool) setup() { - wp.creating = map[arvados.InstanceType]int{} + wp.creating = map[arvados.InstanceType][]time.Time{} wp.exited = map[string]time.Time{} wp.workers = map[cloud.InstanceID]*worker{} wp.subscribers = map[<-chan struct{}]chan<- struct{}{}