14360: Just notify once per sync.
author Tom Clegg <tclegg@veritasgenetics.com>
Sat, 27 Oct 2018 05:17:39 +0000 (01:17 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Sat, 27 Oct 2018 05:17:39 +0000 (01:17 -0400)
Sync holds the mutex, so none of the notifications are delivered until
sync is done anyway.

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

lib/dispatchcloud/worker/pool.go

index 74d3d517295691bf9577d4c7fa9ec150489c4b32..a7b5132a5ab6307c46b26699bfd2500667817170 100644 (file)
@@ -213,6 +213,7 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
        tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
        wp.creating[it]++
        go func() {
+               defer wp.notify()
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
@@ -222,7 +223,6 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
                }
                if err != nil {
                        logger.WithError(err).Error("create failed")
-                       go wp.notify()
                        return
                }
                wp.updateWorker(inst, it, StateBooting)
@@ -238,7 +238,9 @@ func (wp *Pool) AtQuota() bool {
 
 // Add or update worker attached to the given instance. Use
 // initialState if a new worker is created. Caller must have lock.
-func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) {
+//
+// Returns true when a new worker is created.
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) bool {
        id := inst.ID()
        if wp.workers[id] != nil {
                wp.workers[id].executor.SetTarget(inst)
@@ -247,7 +249,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
                if initialState == StateBooting && wp.workers[id].state == StateUnknown {
                        wp.workers[id].state = StateBooting
                }
-               return
+               return false
        }
        if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
                initialState = StateHold
@@ -271,7 +273,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
                starting:    make(map[string]struct{}),
                probing:     make(chan struct{}, 1),
        }
-       go wp.notify()
+       return true
 }
 
 // Shutdown shuts down a worker with the given type, or returns false
@@ -678,6 +680,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
+       notify := false
 
        for _, inst := range instances {
                itTag := inst.Tags()[tagKeyInstanceType]
@@ -686,7 +689,9 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                        wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
-               wp.updateWorker(inst, it, StateUnknown)
+               if wp.updateWorker(inst, it, StateUnknown) {
+                       notify = true
+               }
        }
 
        for id, wkr := range wp.workers {
@@ -700,13 +705,17 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                logger.Info("instance disappeared in cloud")
                delete(wp.workers, id)
                go wkr.executor.Close()
-               go wp.notify()
+               notify = true
        }
 
        if !wp.loaded {
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }
+
+       if notify {
+               go wp.notify()
+       }
 }
 
 // should be called in a new goroutine