14360: Cut excess time.Now() calls.
[arvados.git] / lib / dispatchcloud / worker / pool.go
index cf8fac380c852b945229d877d772d8c8520b7846..74d3d517295691bf9577d4c7fa9ec150489c4b32 100644 (file)
@@ -18,6 +18,11 @@ import (
        "github.com/prometheus/client_golang/prometheus"
 )
 
+const (
+       tagKeyInstanceType = "InstanceType" // instance tag holding the arvados.InstanceType name; written by Create, read back by sync
+       tagKeyHold         = "Hold"         // instance tag that, when non-empty, makes updateWorker turn an unknown initial state into StateHold
+)
+
 // A View shows a worker's current state and recent activity.
 type View struct {
        Instance             string
@@ -45,6 +50,8 @@ type Executor interface {
        //
        // SetTarget must not block on concurrent Execute calls.
        SetTarget(cloud.ExecutorTarget)
+
+       Close()
 }
 
 const (
@@ -84,7 +91,12 @@ func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cl
                timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
        }
        wp.registerMetrics(reg)
-       go wp.run()
+       go func() {
+               wp.setupOnce.Do(wp.setup)
+               go wp.runMetrics()
+               go wp.runProbes()
+               go wp.runSync()
+       }()
        return wp
 }
 
@@ -198,7 +210,7 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       tags := cloud.InstanceTags{"InstanceType": it.Name}
+       tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
        wp.creating[it]++
        go func() {
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
@@ -237,7 +249,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
                }
                return
        }
-       if initialState == StateUnknown && inst.Tags()["hold"] != "" {
+       if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
                initialState = StateHold
        }
        wp.logger.WithFields(logrus.Fields{
@@ -245,15 +257,16 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
                "Instance":     inst,
                "State":        initialState,
        }).Infof("instance appeared in cloud")
+       now := time.Now()
        wp.workers[id] = &worker{
                executor:    wp.newExecutor(inst),
                state:       initialState,
                instance:    inst,
                instType:    it,
-               probed:      time.Now(),
-               busy:        time.Now(),
-               updated:     time.Now(),
-               unallocated: time.Now(),
+               probed:      now,
+               busy:        now,
+               updated:     now,
+               unallocated: now,
                running:     make(map[string]struct{}),
                starting:    make(map[string]struct{}),
                probing:     make(chan struct{}, 1),
@@ -480,6 +493,14 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
        reg.MustRegister(wp.mMemoryInuse)
 }
 
+func (wp *Pool) runMetrics() {
+       ch := wp.Subscribe()
+       defer wp.Unsubscribe(ch) // reached only if the range loop below ends, i.e. after ch is closed
+       for range ch { // one metrics refresh per value received on the subscription channel
+               wp.updateMetrics()
+       }
+}
+
 func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
@@ -503,85 +524,68 @@ func (wp *Pool) updateMetrics() {
        wp.mMemoryInuse.Set(float64(memInuse))
 }
 
-func (wp *Pool) run() {
-       wp.setupOnce.Do(wp.setup)
+func (wp *Pool) runProbes() {
+       maxPPS := wp.maxProbesPerSecond
+       if maxPPS < 1 {
+               maxPPS = defaultMaxProbesPerSecond
+       }
+       limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
+       defer limitticker.Stop()
 
-       go func() {
-               ch := wp.Subscribe()
-               defer wp.Unsubscribe(ch)
-               for range ch {
-                       wp.updateMetrics()
-               }
-       }()
+       probeticker := time.NewTicker(wp.probeInterval)
+       defer probeticker.Stop()
 
-       go func() {
-               maxPPS := wp.maxProbesPerSecond
-               if maxPPS < 1 {
-                       maxPPS = defaultMaxProbesPerSecond
+       workers := []cloud.InstanceID{}
+       for range probeticker.C {
+               workers = workers[:0]
+               wp.mtx.Lock()
+               for id, wkr := range wp.workers {
+                       if wkr.state == StateShutdown || wp.autoShutdown(wkr) {
+                               continue
+                       }
+                       workers = append(workers, id)
                }
-               limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
-               defer limitticker.Stop()
-
-               probeticker := time.NewTicker(wp.probeInterval)
-               defer probeticker.Stop()
+               wp.mtx.Unlock()
 
-               workers := []cloud.InstanceID{}
-               for range probeticker.C {
-                       workers = workers[:0]
+               for _, id := range workers {
                        wp.mtx.Lock()
-                       for id, wkr := range wp.workers {
-                               if wkr.state == StateShutdown || wp.autoShutdown(wkr) {
-                                       continue
-                               }
-                               workers = append(workers, id)
-                       }
+                       wkr, ok := wp.workers[id]
                        wp.mtx.Unlock()
-
-                       for _, id := range workers {
-                               wp.mtx.Lock()
-                               wkr, ok := wp.workers[id]
-                               wp.mtx.Unlock()
-                               if !ok || wkr.state == StateShutdown {
-                                       // Deleted/shutdown while we
-                                       // were probing others
-                                       continue
-                               }
-                               select {
-                               case wkr.probing <- struct{}{}:
-                                       go func() {
-                                               wp.probeAndUpdate(wkr)
-                                               <-wkr.probing
-                                       }()
-                               default:
-                                       wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
-                               }
-                               select {
-                               case <-wp.stop:
-                                       return
-                               case <-limitticker.C:
-                               }
+                       if !ok || wkr.state == StateShutdown {
+                               // Deleted/shutdown while we
+                               // were probing others
+                               continue
+                       }
+                       select {
+                       case wkr.probing <- struct{}{}:
+                               go func() {
+                                       wp.probeAndUpdate(wkr)
+                                       <-wkr.probing
+                               }()
+                       default:
+                               wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
+                       }
+                       select {
+                       case <-wp.stop:
+                               return
+                       case <-limitticker.C:
                        }
                }
-       }()
+       }
+}
 
-       timer := time.NewTimer(time.Nanosecond)
+func (wp *Pool) runSync() {
+       // Sync once immediately (the timer first fires after 1ns), then
+       // again each time syncInterval elapses after a sync completes.
+       timer := time.NewTimer(1)
        for {
-               err := wp.getInstancesAndSync()
-               if err != nil {
-                       wp.logger.WithError(err).Warn("sync failed")
-               }
-
-               // Reset timer to desired interval, and ignore the
-               // tick that might have already arrived.
-               timer.Stop()
-               select {
-               case <-timer.C:
-               default:
-               }
-               timer.Reset(wp.syncInterval)
-
                select {
                case <-timer.C:
+                       err := wp.getInstancesAndSync()
+                       if err != nil {
+                               wp.logger.WithError(err).Warn("sync failed")
+                       }
+                       timer.Reset(wp.syncInterval)
                case <-wp.stop:
                        wp.logger.Debug("worker.Pool stopped")
                        return
@@ -676,7 +680,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
 
        for _, inst := range instances {
-               itTag := inst.Tags()["InstanceType"]
+               itTag := inst.Tags()[tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                        wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
@@ -695,6 +699,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                })
                logger.Info("instance disappeared in cloud")
                delete(wp.workers, id)
+               go wkr.executor.Close()
                go wp.notify()
        }