14360: Improve comments.
[arvados.git] / lib / dispatchcloud / worker / pool.go
index 364670544b125fc8dff44e686542935dd2c68554..1e759e38f3a31992a9fd628248c7d5c3b196b885 100644 (file)
@@ -18,8 +18,13 @@ import (
        "github.com/prometheus/client_golang/prometheus"
 )
 
-// A View shows a worker's current state and recent activity.
-type View struct {
+const (
+       tagKeyInstanceType = "InstanceType"
+       tagKeyHold         = "Hold"
+)
+
+// An InstanceView shows a worker's current state and recent activity.
+type InstanceView struct {
        Instance             string
        Price                float64
        ArvadosInstanceType  string
@@ -86,7 +91,12 @@ func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cl
                timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
        }
        wp.registerMetrics(reg)
-       go wp.run()
+       go func() {
+               wp.setupOnce.Do(wp.setup)
+               go wp.runMetrics()
+               go wp.runProbes()
+               go wp.runSync()
+       }()
        return wp
 }
 
@@ -200,9 +210,10 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       tags := cloud.InstanceTags{"InstanceType": it.Name}
+       tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
        wp.creating[it]++
        go func() {
+               defer wp.notify()
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
@@ -212,7 +223,6 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
                }
                if err != nil {
                        logger.WithError(err).Error("create failed")
-                       go wp.notify()
                        return
                }
                wp.updateWorker(inst, it, StateBooting)
@@ -223,12 +233,16 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
 // AtQuota returns true if Create is not expected to work at the
 // moment.
 func (wp *Pool) AtQuota() bool {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
        return time.Now().Before(wp.atQuotaUntil)
 }
 
 // Add or update worker attached to the given instance. Use
 // initialState if a new worker is created. Caller must hold wp.mtx.
-func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) {
+//
+// Returns true when a new worker is created.
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) bool {
        id := inst.ID()
        if wp.workers[id] != nil {
                wp.workers[id].executor.SetTarget(inst)
@@ -237,9 +251,9 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
                if initialState == StateBooting && wp.workers[id].state == StateUnknown {
                        wp.workers[id].state = StateBooting
                }
-               return
+               return false
        }
-       if initialState == StateUnknown && inst.Tags()["hold"] != "" {
+       if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
                initialState = StateHold
        }
        wp.logger.WithFields(logrus.Fields{
@@ -247,20 +261,21 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
                "Instance":     inst,
                "State":        initialState,
        }).Infof("instance appeared in cloud")
+       now := time.Now()
        wp.workers[id] = &worker{
                executor:    wp.newExecutor(inst),
                state:       initialState,
                instance:    inst,
                instType:    it,
-               probed:      time.Now(),
-               busy:        time.Now(),
-               updated:     time.Now(),
-               unallocated: time.Now(),
+               probed:      now,
+               busy:        now,
+               updated:     now,
+               unallocated: now,
                running:     make(map[string]struct{}),
                starting:    make(map[string]struct{}),
                probing:     make(chan struct{}, 1),
        }
-       go wp.notify()
+       return true
 }
 
 // Shutdown shuts down a worker with the given type, or returns false
@@ -298,7 +313,7 @@ func (wp *Pool) shutdown(wkr *worker, logger logrus.FieldLogger) {
        go func() {
                err := wkr.instance.Destroy()
                if err != nil {
-                       logger.WithError(err).Warn("shutdown failed")
+                       logger.WithError(err).WithField("Instance", wkr.instance).Warn("shutdown failed")
                        return
                }
                wp.mtx.Lock()
@@ -369,9 +384,12 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
                stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
-               wkr.updated = time.Now()
+               now := time.Now()
+               wkr.updated = now
+               wkr.busy = now
                delete(wkr.starting, ctr.UUID)
                wkr.running[ctr.UUID] = struct{}{}
+               wkr.lastUUID = ctr.UUID
                if err != nil {
                        logger.WithField("stdout", string(stdout)).
                                WithField("stderr", string(stderr)).
@@ -391,6 +409,9 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 
 // KillContainer kills the crunch-run process for the given container
 // UUID, if it's running on any worker.
+//
+// KillContainer returns immediately; the act of killing the container
+// takes some time, and runs in the background.
 func (wp *Pool) KillContainer(uuid string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
@@ -482,6 +503,14 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
        reg.MustRegister(wp.mMemoryInuse)
 }
 
+func (wp *Pool) runMetrics() {
+       ch := wp.Subscribe()
+       defer wp.Unsubscribe(ch)
+       for range ch {
+               wp.updateMetrics()
+       }
+}
+
 func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
@@ -505,67 +534,57 @@ func (wp *Pool) updateMetrics() {
        wp.mMemoryInuse.Set(float64(memInuse))
 }
 
-func (wp *Pool) run() {
-       wp.setupOnce.Do(wp.setup)
+func (wp *Pool) runProbes() {
+       maxPPS := wp.maxProbesPerSecond
+       if maxPPS < 1 {
+               maxPPS = defaultMaxProbesPerSecond
+       }
+       limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
+       defer limitticker.Stop()
 
-       go func() {
-               ch := wp.Subscribe()
-               defer wp.Unsubscribe(ch)
-               for range ch {
-                       wp.updateMetrics()
-               }
-       }()
+       probeticker := time.NewTicker(wp.probeInterval)
+       defer probeticker.Stop()
 
-       go func() {
-               maxPPS := wp.maxProbesPerSecond
-               if maxPPS < 1 {
-                       maxPPS = defaultMaxProbesPerSecond
+       workers := []cloud.InstanceID{}
+       for range probeticker.C {
+               workers = workers[:0]
+               wp.mtx.Lock()
+               for id, wkr := range wp.workers {
+                       if wkr.state == StateShutdown || wp.shutdownIfIdle(wkr) {
+                               continue
+                       }
+                       workers = append(workers, id)
                }
-               limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
-               defer limitticker.Stop()
-
-               probeticker := time.NewTicker(wp.probeInterval)
-               defer probeticker.Stop()
+               wp.mtx.Unlock()
 
-               workers := []cloud.InstanceID{}
-               for range probeticker.C {
-                       workers = workers[:0]
+               for _, id := range workers {
                        wp.mtx.Lock()
-                       for id, wkr := range wp.workers {
-                               if wkr.state == StateShutdown || wp.autoShutdown(wkr) {
-                                       continue
-                               }
-                               workers = append(workers, id)
-                       }
+                       wkr, ok := wp.workers[id]
                        wp.mtx.Unlock()
-
-                       for _, id := range workers {
-                               wp.mtx.Lock()
-                               wkr, ok := wp.workers[id]
-                               wp.mtx.Unlock()
-                               if !ok || wkr.state == StateShutdown {
-                                       // Deleted/shutdown while we
-                                       // were probing others
-                                       continue
-                               }
-                               select {
-                               case wkr.probing <- struct{}{}:
-                                       go func() {
-                                               wp.probeAndUpdate(wkr)
-                                               <-wkr.probing
-                                       }()
-                               default:
-                                       wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
-                               }
-                               select {
-                               case <-wp.stop:
-                                       return
-                               case <-limitticker.C:
-                               }
+                       if !ok || wkr.state == StateShutdown {
+                               // Deleted/shutdown while we
+                               // were probing others
+                               continue
+                       }
+                       select {
+                       case wkr.probing <- struct{}{}:
+                               go func() {
+                                       wp.probeAndUpdate(wkr)
+                                       <-wkr.probing
+                               }()
+                       default:
+                               wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
+                       }
+                       select {
+                       case <-wp.stop:
+                               return
+                       case <-limitticker.C:
                        }
                }
-       }()
+       }
+}
 
+func (wp *Pool) runSync() {
        // sync once immediately, then wait syncInterval, sync again,
        // etc.
        timer := time.NewTimer(1)
@@ -585,7 +604,28 @@ func (wp *Pool) run() {
 }
 
 // Caller must hold wp.mtx.
-func (wp *Pool) autoShutdown(wkr *worker) bool {
+func (wp *Pool) shutdownIfBroken(wkr *worker, dur time.Duration) {
+       if wkr.state == StateHold {
+               return
+       }
+       label, threshold := "", wp.timeoutProbe
+       if wkr.state == StateBooting {
+               label, threshold = "new ", wp.timeoutBooting
+       }
+       if dur < threshold {
+               return
+       }
+       wp.logger.WithFields(logrus.Fields{
+               "Instance": wkr.instance,
+               "Duration": dur,
+               "Since":    wkr.probed,
+               "State":    wkr.state,
+       }).Warnf("%sinstance unresponsive, shutting down", label)
+       wp.shutdown(wkr, wp.logger)
+}
+
+// Caller must hold wp.mtx.
+func (wp *Pool) shutdownIfIdle(wkr *worker) bool {
        if len(wkr.running)+len(wkr.starting) > 0 || wkr.state != StateRunning {
                return false
        }
@@ -608,13 +648,14 @@ func (wp *Pool) Stop() {
        close(wp.stop)
 }
 
-// View reports status information for every worker in the pool.
-func (wp *Pool) View() []View {
-       var r []View
+// Instances returns an InstanceView for each worker in the pool,
+// summarizing its current state and recent activity.
+func (wp *Pool) Instances() []InstanceView {
+       var r []InstanceView
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        for _, w := range wp.workers {
-               r = append(r, View{
+               r = append(r, InstanceView{
                        Instance:             w.instance.String(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
@@ -669,15 +710,18 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
+       notify := false
 
        for _, inst := range instances {
-               itTag := inst.Tags()["InstanceType"]
+               itTag := inst.Tags()[tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                        wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
-               wp.updateWorker(inst, it, StateUnknown)
+               if wp.updateWorker(inst, it, StateUnknown) {
+                       notify = true
+               }
        }
 
        for id, wkr := range wp.workers {
@@ -691,13 +735,17 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                logger.Info("instance disappeared in cloud")
                delete(wp.workers, id)
                go wkr.executor.Close()
-               go wp.notify()
+               notify = true
        }
 
        if !wp.loaded {
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }
+
+       if notify {
+               go wp.notify()
+       }
 }
 
 // should be called in a new goroutine
@@ -744,28 +792,12 @@ func (wp *Pool) probeAndUpdate(wkr *worker) {
                } else {
                        logger.Info("instance not responding")
                }
-
-               if wkr.state == StateHold {
-                       return
-               }
-
-               label, threshold := "", wp.timeoutProbe
-               if wkr.state == StateBooting {
-                       label, threshold = "new ", wp.timeoutBooting
-               }
-               if dur > threshold {
-                       logger.WithField("Since", wkr.probed).Warnf("%sinstance unresponsive, shutting down", label)
-                       wp.shutdown(wkr, logger)
-               }
+               wp.shutdownIfBroken(wkr, dur)
                return
        }
 
        updateTime := time.Now()
        wkr.probed = updateTime
-       if len(ctrUUIDs) > 0 {
-               wkr.busy = updateTime
-               wkr.lastUUID = ctrUUIDs[0]
-       }
        if wkr.state == StateShutdown || wkr.state == StateHold {
        } else if booted {
                if wkr.state != StateRunning {
@@ -777,13 +809,18 @@ func (wp *Pool) probeAndUpdate(wkr *worker) {
        }
 
        if updated != wkr.updated {
-               // Worker was updated (e.g., by starting a new
-               // container) after the probe began. Avoid clobbering
-               // those changes with the probe results.
+               // Worker was updated after the probe began, so
+               // wkr.running might have a container UUID that was
+               // not yet running when ctrUUIDs was generated. Leave
+               // wkr.running alone and wait for the next probe to
+               // catch up on any changes.
                return
        }
 
-       if len(ctrUUIDs) == 0 && len(wkr.running) > 0 {
+       if len(ctrUUIDs) > 0 {
+               wkr.busy = updateTime
+               wkr.lastUUID = ctrUUIDs[0]
+       } else if len(wkr.running) > 0 {
                wkr.unallocated = updateTime
        }
        running := map[string]struct{}{}