14360: Move single-worker Pool methods to worker type.
[arvados.git] / lib / dispatchcloud / worker / pool.go
index 45e9da37adc7b544b8eb4e3e001f4554bbd3e3a9..be66895a98e8abb927943a198a72140b6e9c5a91 100644 (file)
@@ -5,7 +5,6 @@
 package worker
 
 import (
-       "bytes"
        "io"
        "sort"
        "strings"
@@ -137,22 +136,6 @@ type Pool struct {
        mMemoryInuse       prometheus.Gauge
 }
 
-type worker struct {
-       state    State
-       instance cloud.Instance
-       executor Executor
-       instType arvados.InstanceType
-       vcpus    int64
-       memory   int64
-       probed   time.Time
-       updated  time.Time
-       busy     time.Time
-       lastUUID string
-       running  map[string]struct{} // remember to update state idle<->running when this changes
-       starting map[string]struct{} // remember to update state idle<->running when this changes
-       probing  chan struct{}
-}
-
 // Subscribe returns a channel that becomes ready whenever a worker's
 // state changes.
 //
@@ -264,13 +247,16 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
        if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
                initialState = StateHold
        }
-       wp.logger.WithFields(logrus.Fields{
+       logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
                "Instance":     inst,
-               "State":        initialState,
-       }).Infof("instance appeared in cloud")
+       })
+       logger.WithField("State", initialState).Infof("instance appeared in cloud")
        now := time.Now()
        wp.workers[id] = &worker{
+               mtx:      &wp.mtx,
+               wp:       wp,
+               logger:   logger,
                executor: wp.newExecutor(inst),
                state:    initialState,
                instance: inst,
@@ -285,6 +271,11 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
        return true
 }
 
+// notifyExited records t in wp.exited under uuid. Caller must have lock.
+func (wp *Pool) notifyExited(uuid string, t time.Time) {
+       wp.exited[uuid] = t
+}
+
 // Shutdown shuts down a worker with the given type, or returns false
 // if all workers with the given type are busy.
 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
@@ -298,9 +289,8 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
                // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
                        if wkr.state == tryState && wkr.instType == it {
-                               logger = logger.WithField("Instance", wkr.instance)
-                               logger.Info("shutting down")
-                               wp.shutdown(wkr, logger)
+                               logger.WithField("Instance", wkr.instance).Info("shutting down")
+                               wkr.shutdown()
                                return true
                        }
                }
@@ -308,23 +298,6 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
        return false
 }
 
-// caller must have lock
-func (wp *Pool) shutdown(wkr *worker, logger logrus.FieldLogger) {
-       wkr.updated = time.Now()
-       wkr.state = StateShutdown
-       go func() {
-               err := wkr.instance.Destroy()
-               if err != nil {
-                       logger.WithError(err).WithField("Instance", wkr.instance).Warn("shutdown failed")
-                       return
-               }
-               wp.mtx.Lock()
-               wp.atQuotaUntil = time.Now()
-               wp.mtx.Unlock()
-               wp.notify()
-       }()
-}
-
 // Workers returns the current number of workers in each state.
 func (wp *Pool) Workers() map[State]int {
        wp.setupOnce.Do(wp.setup)
@@ -360,11 +333,6 @@ func (wp *Pool) Running() map[string]time.Time {
 // StartContainer starts a container on an idle worker immediately if
 // possible, otherwise returns false.
 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
-       logger := wp.logger.WithFields(logrus.Fields{
-               "InstanceType":  it.Name,
-               "ContainerUUID": ctr.UUID,
-               "Priority":      ctr.Priority,
-       })
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
@@ -379,34 +347,7 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
        if wkr == nil {
                return false
        }
-       logger = logger.WithField("Instance", wkr.instance)
-       logger.Debug("starting container")
-       wkr.starting[ctr.UUID] = struct{}{}
-       wkr.state = StateRunning
-       go func() {
-               stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
-               wp.mtx.Lock()
-               defer wp.mtx.Unlock()
-               now := time.Now()
-               wkr.updated = now
-               wkr.busy = now
-               delete(wkr.starting, ctr.UUID)
-               wkr.running[ctr.UUID] = struct{}{}
-               wkr.lastUUID = ctr.UUID
-               if err != nil {
-                       logger.WithField("stdout", string(stdout)).
-                               WithField("stderr", string(stderr)).
-                               WithError(err).
-                               Error("error starting crunch-run process")
-                       // Leave uuid in wkr.running, though: it's
-                       // possible the error was just a communication
-                       // failure and the process was in fact
-                       // started.  Wait for next probe to find out.
-                       return
-               }
-               logger.Info("crunch-run process started")
-               wkr.lastUUID = ctr.UUID
-       }()
+       wkr.startContainer(ctr)
        return true
 }
 
@@ -556,7 +497,7 @@ func (wp *Pool) runProbes() {
                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
-                       if wkr.state == StateShutdown || wp.shutdownIfIdle(wkr) {
+                       if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
                                continue
                        }
                        workers = append(workers, id)
@@ -567,20 +508,12 @@ func (wp *Pool) runProbes() {
                        wp.mtx.Lock()
                        wkr, ok := wp.workers[id]
                        wp.mtx.Unlock()
-                       if !ok || wkr.state == StateShutdown {
-                               // Deleted/shutdown while we
-                               // were probing others
+                       if !ok {
+                               // Deleted while we were probing
+                               // others
                                continue
                        }
-                       select {
-                       case wkr.probing <- struct{}{}:
-                               go func() {
-                                       wp.probeAndUpdate(wkr)
-                                       <-wkr.probing
-                               }()
-                       default:
-                               wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
-                       }
+                       go wkr.ProbeAndUpdate()
                        select {
                        case <-wp.stop:
                                return
@@ -609,45 +542,6 @@ func (wp *Pool) runSync() {
        }
 }
 
-// caller must have lock.
-func (wp *Pool) shutdownIfBroken(wkr *worker, dur time.Duration) {
-       if wkr.state == StateHold {
-               return
-       }
-       label, threshold := "", wp.timeoutProbe
-       if wkr.state == StateBooting {
-               label, threshold = "new ", wp.timeoutBooting
-       }
-       if dur < threshold {
-               return
-       }
-       wp.logger.WithFields(logrus.Fields{
-               "Instance": wkr.instance,
-               "Duration": dur,
-               "Since":    wkr.probed,
-               "State":    wkr.state,
-       }).Warnf("%sinstance unresponsive, shutting down", label)
-       wp.shutdown(wkr, wp.logger)
-}
-
-// caller must have lock.
-func (wp *Pool) shutdownIfIdle(wkr *worker) bool {
-       if wkr.state != StateIdle {
-               return false
-       }
-       age := time.Since(wkr.busy)
-       if age < wp.timeoutIdle {
-               return false
-       }
-       logger := wp.logger.WithFields(logrus.Fields{
-               "Age":      age,
-               "Instance": wkr.instance,
-       })
-       logger.Info("shutdown idle worker")
-       wp.shutdown(wkr, logger)
-       return true
-}
-
 // Stop synchronizing with the InstanceSet.
 func (wp *Pool) Stop() {
        wp.setupOnce.Do(wp.setup)
@@ -753,146 +647,3 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                go wp.notify()
        }
 }
-
-// should be called in a new goroutine
-func (wp *Pool) probeAndUpdate(wkr *worker) {
-       logger := wp.logger.WithField("Instance", wkr.instance)
-       wp.mtx.Lock()
-       updated := wkr.updated
-       needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
-       needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
-       wp.mtx.Unlock()
-       if !needProbeBooted && !needProbeRunning {
-               return
-       }
-
-       var (
-               ctrUUIDs []string
-               ok       bool
-               stderr   []byte
-       )
-       if needProbeBooted {
-               ok, stderr = wp.probeBooted(wkr)
-               wp.mtx.Lock()
-               if ok || wkr.state == StateRunning || wkr.state == StateIdle {
-                       logger.Info("instance booted; will try probeRunning")
-                       needProbeRunning = true
-               }
-               wp.mtx.Unlock()
-       }
-       if needProbeRunning {
-               ctrUUIDs, ok, stderr = wp.probeRunning(wkr)
-       }
-       logger = logger.WithField("stderr", string(stderr))
-       wp.mtx.Lock()
-       defer wp.mtx.Unlock()
-       if !ok {
-               if wkr.state == StateShutdown && wkr.updated.After(updated) {
-                       // Skip the logging noise if shutdown was
-                       // initiated during probe.
-                       return
-               }
-               dur := time.Since(wkr.probed)
-               logger := logger.WithFields(logrus.Fields{
-                       "Duration": dur,
-                       "State":    wkr.state,
-               })
-               if wkr.state == StateBooting {
-                       logger.Debug("new instance not responding")
-               } else {
-                       logger.Info("instance not responding")
-               }
-               wp.shutdownIfBroken(wkr, dur)
-               return
-       }
-
-       updateTime := time.Now()
-       wkr.probed = updateTime
-
-       if updated != wkr.updated {
-               // Worker was updated after the probe began, so
-               // wkr.running might have a container UUID that was
-               // not yet running when ctrUUIDs was generated. Leave
-               // wkr.running alone and wait for the next probe to
-               // catch up on any changes.
-               return
-       }
-
-       if len(ctrUUIDs) > 0 {
-               wkr.busy = updateTime
-               wkr.lastUUID = ctrUUIDs[0]
-       } else if len(wkr.running) > 0 {
-               // Actual last-busy time was sometime between wkr.busy
-               // and now. Now is the earliest opportunity to take
-               // advantage of the non-busy state, though.
-               wkr.busy = updateTime
-       }
-       running := map[string]struct{}{}
-       changed := false
-       for _, uuid := range ctrUUIDs {
-               running[uuid] = struct{}{}
-               if _, ok := wkr.running[uuid]; !ok {
-                       changed = true
-               }
-       }
-       for uuid := range wkr.running {
-               if _, ok := running[uuid]; !ok {
-                       logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
-                       wp.exited[uuid] = updateTime
-                       changed = true
-               }
-       }
-       if wkr.state == StateUnknown || wkr.state == StateBooting {
-               wkr.state = StateIdle
-               changed = true
-       }
-       if changed {
-               wkr.running = running
-               if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
-                       wkr.state = StateRunning
-               } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
-                       wkr.state = StateIdle
-               }
-               wkr.updated = updateTime
-               go wp.notify()
-       }
-}
-
-func (wp *Pool) probeRunning(wkr *worker) (running []string, ok bool, stderr []byte) {
-       cmd := "crunch-run --list"
-       stdout, stderr, err := wkr.executor.Execute(cmd, nil)
-       if err != nil {
-               wp.logger.WithFields(logrus.Fields{
-                       "Instance": wkr.instance,
-                       "Command":  cmd,
-                       "stdout":   string(stdout),
-                       "stderr":   string(stderr),
-               }).WithError(err).Warn("probe failed")
-               return nil, false, stderr
-       }
-       stdout = bytes.TrimRight(stdout, "\n")
-       if len(stdout) == 0 {
-               return nil, true, stderr
-       }
-       return strings.Split(string(stdout), "\n"), true, stderr
-}
-
-func (wp *Pool) probeBooted(wkr *worker) (ok bool, stderr []byte) {
-       cmd := wp.bootProbeCommand
-       if cmd == "" {
-               cmd = "true"
-       }
-       stdout, stderr, err := wkr.executor.Execute(cmd, nil)
-       logger := wp.logger.WithFields(logrus.Fields{
-               "Instance": wkr.instance,
-               "Command":  cmd,
-               "stdout":   string(stdout),
-               "stderr":   string(stderr),
-       })
-       if err != nil {
-               logger.WithError(err).Debug("boot probe failed")
-               return false, stderr
-       }
-       logger.Info("boot probe succeeded")
-       return true, stderr
-}