From: Tom Clegg Date: Sat, 27 Oct 2018 05:17:39 +0000 (-0400) Subject: 14360: Just notify once per sync. X-Git-Tag: 1.4.0~180^2~55 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/438ce6f86261376880b186775d7ba5fbade560a6 14360: Just notify once per sync. Sync holds the mutex so none of the notifications are delivered until sync is done anyway. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index 74d3d51729..a7b5132a5a 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -213,6 +213,7 @@ func (wp *Pool) Create(it arvados.InstanceType) error { tags := cloud.InstanceTags{tagKeyInstanceType: it.Name} wp.creating[it]++ go func() { + defer wp.notify() inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil) wp.mtx.Lock() defer wp.mtx.Unlock() @@ -222,7 +223,6 @@ func (wp *Pool) Create(it arvados.InstanceType) error { } if err != nil { logger.WithError(err).Error("create failed") - go wp.notify() return } wp.updateWorker(inst, it, StateBooting) @@ -238,7 +238,9 @@ func (wp *Pool) AtQuota() bool { // Add or update worker attached to the given instance. Use // initialState if a new worker is created. Caller must have lock. -func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) { +// +// Returns true when a new worker is created. +func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) bool { id := inst.ID() if wp.workers[id] != nil { wp.workers[id].executor.SetTarget(inst) @@ -247,7 +249,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi if initialState == StateBooting && wp.workers[id].state == StateUnknown { wp.workers[id].state = StateBooting } - return + return false } if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" { initialState = StateHold @@ -271,7 +273,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi starting: make(map[string]struct{}), probing: make(chan struct{}, 1), } - go wp.notify() + return true } // Shutdown shuts down a worker with the given type, or returns false @@ -678,6 +680,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { wp.mtx.Lock() defer wp.mtx.Unlock() wp.logger.WithField("Instances", len(instances)).Debug("sync instances") + notify := false for _, inst := range instances { itTag := inst.Tags()[tagKeyInstanceType] @@ -686,7 +689,9 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag) continue } - wp.updateWorker(inst, it, StateUnknown) + if wp.updateWorker(inst, it, StateUnknown) { + notify = true + } } for id, wkr := range wp.workers { @@ -700,13 +705,17 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { logger.Info("instance disappeared in cloud") delete(wp.workers, id) go wkr.executor.Close() - go wp.notify() + notify = true } if !wp.loaded { wp.loaded = true wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list") } + + if notify { + go wp.notify() + } } // should be called in a new goroutine