20649: Fix panic on race, worker shutdown vs. container startup.
[arvados.git] / lib / dispatchcloud / worker / pool.go
index 6a74280ca452e9b365f6a976f96e04ef03edc7e6..1d600e37020ec0f26108e1997adb79c1f787839c 100644 (file)
@@ -13,6 +13,7 @@ import (
        "fmt"
        "io"
        "io/ioutil"
+       mathrand "math/rand"
        "sort"
        "strings"
        "sync"
@@ -103,12 +104,15 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                instanceSetID:                  instanceSetID,
                instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:                    newExecutor,
+               cluster:                        cluster,
                bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
+               instanceInitCommand:            cloud.InitCommand(cluster.Containers.CloudVMs.InstanceInitCommand),
                runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
                imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
                instanceTypes:                  cluster.InstanceTypes,
                maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
                maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
+               maxInstances:                   cluster.Containers.CloudVMs.MaxInstances,
                probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
                syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
                timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
@@ -121,6 +125,8 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                systemRootToken:                cluster.SystemRootToken,
                installPublicKey:               installPublicKey,
                tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
+               runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
+               runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
                stop:                           make(chan bool),
        }
        wp.registerMetrics(reg)
@@ -142,7 +148,9 @@ type Pool struct {
        instanceSetID                  cloud.InstanceSetID
        instanceSet                    *throttledInstanceSet
        newExecutor                    func(cloud.Instance) Executor
+       cluster                        *arvados.Cluster
        bootProbeCommand               string
+       instanceInitCommand            cloud.InitCommand
        runnerSource                   string
        imageID                        cloud.ImageID
        instanceTypes                  map[string]arvados.InstanceType
@@ -150,6 +158,7 @@ type Pool struct {
        probeInterval                  time.Duration
        maxProbesPerSecond             int
        maxConcurrentInstanceCreateOps int
+       maxInstances                   int
        timeoutIdle                    time.Duration
        timeoutBooting                 time.Duration
        timeoutProbe                   time.Duration
@@ -160,6 +169,8 @@ type Pool struct {
        systemRootToken                string
        installPublicKey               ssh.PublicKey
        tagKeyPrefix                   string
+       runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
+       runnerArgs                     []string // extra args passed to crunch-run
 
        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
@@ -188,6 +199,8 @@ type Pool struct {
        mTimeFromShutdownToGone   prometheus.Summary
        mTimeFromQueueToCrunchRun prometheus.Summary
        mRunProbeDuration         *prometheus.SummaryVec
+       mProbeAgeMax              prometheus.Gauge
+       mProbeAgeMedian           prometheus.Gauge
 }
 
 type createCall struct {
@@ -295,10 +308,10 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 // pool. The worker is added immediately; instance creation runs in
 // the background.
 //
-// Create returns false if a pre-existing error state prevents it from
-// even attempting to create a new instance. Those errors are logged
-// by the Pool, so the caller does not need to log anything in such
-// cases.
+// Create returns false if a pre-existing error or a configuration
+// setting prevents it from even attempting to create a new
+// instance. Those errors are logged by the Pool, so the caller does
+// not need to log anything in such cases.
 func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
@@ -308,7 +321,9 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
        }
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
+       if time.Now().Before(wp.atQuotaUntil) ||
+               wp.instanceSet.throttleCreate.Error() != nil ||
+               (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) {
                return false
        }
        // The maxConcurrentInstanceCreateOps knob throttles the number of node create
@@ -334,7 +349,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
                        wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
                        wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
                }
-               initCmd := TagVerifier{nil, secret, nil}.InitCommand()
+               initCmd := TagVerifier{nil, secret, nil}.InitCommand() + "\n" + wp.instanceInitCommand
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
@@ -354,15 +369,19 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
                }
                wp.updateWorker(inst, it)
        }()
+       if len(wp.creating)+len(wp.workers) == wp.maxInstances {
+               logger.Infof("now at MaxInstances limit of %d instances", wp.maxInstances)
+       }
        return true
 }
 
 // AtQuota returns true if Create is not expected to work at the
-// moment.
+// moment (e.g., cloud provider has reported quota errors, or we are
+// already at our own configured quota).
 func (wp *Pool) AtQuota() bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       return time.Now().Before(wp.atQuotaUntil)
+       return time.Now().Before(wp.atQuotaUntil) || (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
 }
 
 // SetIdleBehavior determines how the indicated instance will behave
@@ -382,10 +401,15 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       wkr := wp.workers[inst.ID()]
+       wkr, ok := wp.workers[inst.ID()]
+       if !ok {
+               // race: inst was removed from the pool
+               return
+       }
        if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
-               // the node is not in booting state (can happen if a-d-c is restarted) OR
-               // this is not the first SSH connection
+               // the node is not in booting state (can happen if
+               // a-d-c is restarted) OR this is not the first SSH
+               // connection
                return
        }
 
@@ -606,6 +630,20 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)
+       wp.mProbeAgeMax = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "probe_age_seconds_max",
+               Help:      "Maximum number of seconds since an instance's most recent successful probe.",
+       })
+       reg.MustRegister(wp.mProbeAgeMax)
+       wp.mProbeAgeMedian = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "probe_age_seconds_median",
+               Help:      "Median number of seconds since an instance's most recent successful probe.",
+       })
+       reg.MustRegister(wp.mProbeAgeMedian)
        wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
@@ -718,6 +756,8 @@ func (wp *Pool) updateMetrics() {
        cpu := map[string]int64{}
        mem := map[string]int64{}
        var running int64
+       now := time.Now()
+       var probed []time.Time
        for _, wkr := range wp.workers {
                var cat string
                switch {
@@ -737,6 +777,7 @@ func (wp *Pool) updateMetrics() {
                cpu[cat] += int64(wkr.instType.VCPUs)
                mem[cat] += int64(wkr.instType.RAM)
                running += int64(len(wkr.running) + len(wkr.starting))
+               probed = append(probed, wkr.probed)
        }
        for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
                wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
@@ -753,6 +794,15 @@ func (wp *Pool) updateMetrics() {
                wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
        }
        wp.mContainersRunning.Set(float64(running))
+
+       if len(probed) == 0 {
+               wp.mProbeAgeMax.Set(0)
+               wp.mProbeAgeMedian.Set(0)
+       } else {
+               sort.Slice(probed, func(i, j int) bool { return probed[i].Before(probed[j]) })
+               wp.mProbeAgeMax.Set(now.Sub(probed[0]).Seconds())
+               wp.mProbeAgeMedian.Set(now.Sub(probed[len(probed)/2]).Seconds())
+       }
 }
 
 func (wp *Pool) runProbes() {
@@ -768,6 +818,13 @@ func (wp *Pool) runProbes() {
 
        workers := []cloud.InstanceID{}
        for range probeticker.C {
+               // Add some jitter. Without this, if probeInterval is
+               // a multiple of syncInterval and sync is
+               // instantaneous (as with the loopback driver), the
+               // first few probes race with sync operations and
+               // don't update the workers.
+               time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))
+
                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
@@ -851,6 +908,9 @@ func (wp *Pool) Instances() []InstanceView {
 // KillInstance destroys a cloud VM instance. It returns an error if
 // the given instance does not exist.
 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
        wkr, ok := wp.workers[id]
        if !ok {
                return errors.New("instance not found")
@@ -881,7 +941,7 @@ func (wp *Pool) loadRunnerData() error {
        if wp.runnerData != nil {
                return nil
        } else if wp.runnerSource == "" {
-               wp.runnerCmd = "crunch-run"
+               wp.runnerCmd = wp.runnerCmdDefault
                wp.runnerData = []byte{}
                return nil
        }
@@ -894,7 +954,7 @@ func (wp *Pool) loadRunnerData() error {
        }
        wp.runnerData = buf
        wp.runnerMD5 = md5.Sum(buf)
-       wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
+       wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
        return nil
 }