14360: Split up long run func.
author Tom Clegg <tclegg@veritasgenetics.com>
Sat, 27 Oct 2018 04:30:16 +0000 (00:30 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Sat, 27 Oct 2018 04:30:16 +0000 (00:30 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

lib/dispatchcloud/worker/pool.go

index f2a53acbaa746b5e365d681e98a99ac2456ac764..fc87ea42187c8268f72d311c82955d077a3504df 100644 (file)
@@ -91,7 +91,12 @@ func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cl
                timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
        }
        wp.registerMetrics(reg)
-       go wp.run()
+       go func() {
+               wp.setupOnce.Do(wp.setup)
+               go wp.runMetrics()
+               go wp.runProbes()
+               go wp.runSync()
+       }()
        return wp
 }
 
@@ -487,6 +492,14 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
        reg.MustRegister(wp.mMemoryInuse)
 }
 
+func (wp *Pool) runMetrics() {
+       ch := wp.Subscribe()
+       defer wp.Unsubscribe(ch)
+       for range ch {
+               wp.updateMetrics()
+       }
+}
+
 func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
@@ -510,67 +523,57 @@ func (wp *Pool) updateMetrics() {
        wp.mMemoryInuse.Set(float64(memInuse))
 }
 
-func (wp *Pool) run() {
-       wp.setupOnce.Do(wp.setup)
+func (wp *Pool) runProbes() {
+       maxPPS := wp.maxProbesPerSecond
+       if maxPPS < 1 {
+               maxPPS = defaultMaxProbesPerSecond
+       }
+       limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
+       defer limitticker.Stop()
 
-       go func() {
-               ch := wp.Subscribe()
-               defer wp.Unsubscribe(ch)
-               for range ch {
-                       wp.updateMetrics()
-               }
-       }()
+       probeticker := time.NewTicker(wp.probeInterval)
+       defer probeticker.Stop()
 
-       go func() {
-               maxPPS := wp.maxProbesPerSecond
-               if maxPPS < 1 {
-                       maxPPS = defaultMaxProbesPerSecond
+       workers := []cloud.InstanceID{}
+       for range probeticker.C {
+               workers = workers[:0]
+               wp.mtx.Lock()
+               for id, wkr := range wp.workers {
+                       if wkr.state == StateShutdown || wp.autoShutdown(wkr) {
+                               continue
+                       }
+                       workers = append(workers, id)
                }
-               limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
-               defer limitticker.Stop()
-
-               probeticker := time.NewTicker(wp.probeInterval)
-               defer probeticker.Stop()
+               wp.mtx.Unlock()
 
-               workers := []cloud.InstanceID{}
-               for range probeticker.C {
-                       workers = workers[:0]
+               for _, id := range workers {
                        wp.mtx.Lock()
-                       for id, wkr := range wp.workers {
-                               if wkr.state == StateShutdown || wp.autoShutdown(wkr) {
-                                       continue
-                               }
-                               workers = append(workers, id)
-                       }
+                       wkr, ok := wp.workers[id]
                        wp.mtx.Unlock()
-
-                       for _, id := range workers {
-                               wp.mtx.Lock()
-                               wkr, ok := wp.workers[id]
-                               wp.mtx.Unlock()
-                               if !ok || wkr.state == StateShutdown {
-                                       // Deleted/shutdown while we
-                                       // were probing others
-                                       continue
-                               }
-                               select {
-                               case wkr.probing <- struct{}{}:
-                                       go func() {
-                                               wp.probeAndUpdate(wkr)
-                                               <-wkr.probing
-                                       }()
-                               default:
-                                       wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
-                               }
-                               select {
-                               case <-wp.stop:
-                                       return
-                               case <-limitticker.C:
-                               }
+                       if !ok || wkr.state == StateShutdown {
+                               // Deleted/shutdown while we
+                               // were probing others
+                               continue
+                       }
+                       select {
+                       case wkr.probing <- struct{}{}:
+                               go func() {
+                                       wp.probeAndUpdate(wkr)
+                                       <-wkr.probing
+                               }()
+                       default:
+                               wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
+                       }
+                       select {
+                       case <-wp.stop:
+                               return
+                       case <-limitticker.C:
                        }
                }
-       }()
+       }
+}
 
+func (wp *Pool) runSync() {
        // sync once immediately, then wait syncInterval, sync again,
        // etc.
        timer := time.NewTimer(1)