X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d4da0c6adf235aa103337430fd73cf3bfba64fd9..2a748e79c3a72454d70e40f39fcad9dabf4943cc:/lib/dispatchcloud/worker/pool.go diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index fe1c6ecc03..e90935e2aa 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -140,7 +140,7 @@ type Pool struct { // private state subscribers map[<-chan struct{}]chan<- struct{} - creating map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls + creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret) workers map[cloud.InstanceID]*worker loaded bool // loaded list of instances from InstanceSet at least once exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called @@ -160,6 +160,11 @@ type Pool struct { mMemory *prometheus.GaugeVec } +type createCall struct { + time time.Time + instanceType arvados.InstanceType +} + // Subscribe returns a buffered channel that becomes ready after any // change to the pool's state that could have scheduling implications: // a worker's state changes, a new worker appears, the cloud @@ -205,8 +210,13 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int { defer wp.mtx.RUnlock() unalloc := map[arvados.InstanceType]int{} creating := map[arvados.InstanceType]int{} - for it, times := range wp.creating { - creating[it] = len(times) + oldestCreate := map[arvados.InstanceType]time.Time{} + for _, cc := range wp.creating { + it := cc.instanceType + creating[it]++ + if t, ok := oldestCreate[it]; !ok || t.After(cc.time) { + oldestCreate[it] = cc.time + } } for _, wkr := range wp.workers { // Skip workers that are not expected to become @@ -221,7 +231,7 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int { } it := wkr.instType unalloc[it]++ - if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) { + if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) { // If up to N new workers appear in // Instances() while we are waiting for N // Create() calls to complete, we assume we're @@ -260,10 +270,10 @@ func (wp *Pool) Create(it arvados.InstanceType) bool { return false } now := time.Now() - wp.creating[it] = append(wp.creating[it], now) + secret := randomHex(instanceSecretLength) + wp.creating[secret] = createCall{time: now, instanceType: it} go func() { defer wp.notify() - secret := randomHex(instanceSecretLength) tags := cloud.InstanceTags{ tagKeyInstanceType: it.Name, tagKeyIdleBehavior: string(IdleBehaviorRun), @@ -273,14 +283,10 @@ func (wp *Pool) Create(it arvados.InstanceType) bool { inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey) wp.mtx.Lock() defer wp.mtx.Unlock() - // Remove our timestamp marker from wp.creating - for i, t := range wp.creating[it] { - if t == now { - copy(wp.creating[it][i:], wp.creating[it][i+1:]) - wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1] - break - } - } + // delete() is deferred so the updateWorker() call + // below knows to use StateBooting when adding a new + // worker. + defer delete(wp.creating, secret) if err != nil { if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() { wp.atQuotaErr = err @@ -291,7 +297,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool { wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify) return } - wp.updateWorker(inst, it, StateBooting) + wp.updateWorker(inst, it) }() return true } @@ -319,26 +325,30 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) return nil } -// Add or update worker attached to the given instance. Use -// initialState if a new worker is created. +// Add or update worker attached to the given instance. // // The second return value is true if a new worker is created. // +// A newly added instance has state=StateBooting if its tags match an +// entry in wp.creating, otherwise StateUnknown. +// // Caller must have lock. -func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) { +func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) { inst = tagVerifier{inst} id := inst.ID() if wkr := wp.workers[id]; wkr != nil { wkr.executor.SetTarget(inst) wkr.instance = inst wkr.updated = time.Now() - if initialState == StateBooting && wkr.state == StateUnknown { - wkr.state = StateBooting - } wkr.saveTags() return wkr, false } + state := StateUnknown + if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok { + state = StateBooting + } + // If an instance has a valid IdleBehavior tag when it first // appears, initialize the new worker accordingly (this is how // we restore IdleBehavior that was set by a prior dispatch @@ -356,7 +366,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi "Address": inst.Address(), }) logger.WithFields(logrus.Fields{ - "State": initialState, + "State": state, "IdleBehavior": idleBehavior, }).Infof("instance appeared in cloud") now := time.Now() @@ -365,7 +375,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi wp: wp, logger: logger, executor: wp.newExecutor(inst), - state: initialState, + state: state, idleBehavior: idleBehavior, instance: inst, instType: it, @@ -409,8 +419,12 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool { } // CountWorkers returns the current number of workers in each state. +// +// CountWorkers blocks, if necessary, until the initial instance list +// has been loaded from the cloud provider. func (wp *Pool) CountWorkers() map[State]int { wp.setupOnce.Do(wp.setup) + wp.waitUntilLoaded() wp.mtx.Lock() defer wp.mtx.Unlock() r := map[State]int{} @@ -703,7 +717,7 @@ func (wp *Pool) Instances() []InstanceView { } func (wp *Pool) setup() { - wp.creating = map[arvados.InstanceType][]time.Time{} + wp.creating = map[string]createCall{} wp.exited = map[string]time.Time{} wp.workers = map[cloud.InstanceID]*worker{} wp.subscribers = map[<-chan struct{}]chan<- struct{}{} @@ -753,7 +767,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag) continue } - if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew { + if wkr, isNew := wp.updateWorker(inst, it); isNew { notify = true } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown { wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying") @@ -776,6 +790,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { } if !wp.loaded { + notify = true wp.loaded = true wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list") } @@ -785,6 +800,17 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { } } +func (wp *Pool) waitUntilLoaded() { + ch := wp.Subscribe() + wp.mtx.RLock() + defer wp.mtx.RUnlock() + for !wp.loaded { + wp.mtx.RUnlock() + <-ch + wp.mtx.RLock() + } +} + // Return a random string of n hexadecimal digits (n*4 random bits). n // must be even. func randomHex(n int) string {