Merge branch '15370-loopback-dispatchcloud'
[arvados.git] / lib / dispatchcloud / worker / pool.go
index 435b6e43ae4a3e150f680883aa7e124daf6e4230..66e0bfee910a236b46980f2db4b7c30850b3a759 100644 (file)
@@ -5,12 +5,15 @@
 package worker
 
 import (
+       "crypto/hmac"
        "crypto/md5"
        "crypto/rand"
+       "crypto/sha256"
        "errors"
        "fmt"
        "io"
        "io/ioutil"
+       mathrand "math/rand"
        "sort"
        "strings"
        "sync"
@@ -64,15 +67,16 @@ type Executor interface {
 }
 
 const (
-       defaultSyncInterval       = time.Minute
-       defaultProbeInterval      = time.Second * 10
-       defaultMaxProbesPerSecond = 10
-       defaultTimeoutIdle        = time.Minute
-       defaultTimeoutBooting     = time.Minute * 10
-       defaultTimeoutProbe       = time.Minute * 10
-       defaultTimeoutShutdown    = time.Second * 10
-       defaultTimeoutTERM        = time.Minute * 2
-       defaultTimeoutSignal      = time.Second * 5
+       defaultSyncInterval        = time.Minute
+       defaultProbeInterval       = time.Second * 10
+       defaultMaxProbesPerSecond  = 10
+       defaultTimeoutIdle         = time.Minute
+       defaultTimeoutBooting      = time.Minute * 10
+       defaultTimeoutProbe        = time.Minute * 10
+       defaultTimeoutShutdown     = time.Second * 10
+       defaultTimeoutTERM         = time.Minute * 2
+       defaultTimeoutSignal       = time.Second * 5
+       defaultTimeoutStaleRunLock = time.Second * 5
 
        // Time after a quota error to try again anyway, even if no
        // instances have been shutdown.
@@ -85,9 +89,8 @@ const (
 func duration(conf arvados.Duration, def time.Duration) time.Duration {
        if conf > 0 {
                return time.Duration(conf)
-       } else {
-               return def
        }
+       return def
 }
 
 // NewPool creates a Pool of workers backed by instanceSet.
@@ -101,6 +104,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                instanceSetID:                  instanceSetID,
                instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:                    newExecutor,
+               cluster:                        cluster,
                bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
                runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
                imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
@@ -115,8 +119,12 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
                timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
                timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
+               timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
+               systemRootToken:                cluster.SystemRootToken,
                installPublicKey:               installPublicKey,
                tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
+               runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
+               runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
                stop:                           make(chan bool),
        }
        wp.registerMetrics(reg)
@@ -138,6 +146,7 @@ type Pool struct {
        instanceSetID                  cloud.InstanceSetID
        instanceSet                    *throttledInstanceSet
        newExecutor                    func(cloud.Instance) Executor
+       cluster                        *arvados.Cluster
        bootProbeCommand               string
        runnerSource                   string
        imageID                        cloud.ImageID
@@ -152,8 +161,12 @@ type Pool struct {
        timeoutShutdown                time.Duration
        timeoutTERM                    time.Duration
        timeoutSignal                  time.Duration
+       timeoutStaleRunLock            time.Duration
+       systemRootToken                string
        installPublicKey               ssh.PublicKey
        tagKeyPrefix                   string
+       runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
+       runnerArgs                     []string // extra args passed to crunch-run
 
        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
@@ -170,13 +183,18 @@ type Pool struct {
        runnerMD5    [md5.Size]byte
        runnerCmd    string
 
-       mContainersRunning prometheus.Gauge
-       mInstances         *prometheus.GaugeVec
-       mInstancesPrice    *prometheus.GaugeVec
-       mVCPUs             *prometheus.GaugeVec
-       mMemory            *prometheus.GaugeVec
-       mBootOutcomes      *prometheus.CounterVec
-       mDisappearances    *prometheus.CounterVec
+       mContainersRunning        prometheus.Gauge
+       mInstances                *prometheus.GaugeVec
+       mInstancesPrice           *prometheus.GaugeVec
+       mVCPUs                    *prometheus.GaugeVec
+       mMemory                   *prometheus.GaugeVec
+       mBootOutcomes             *prometheus.CounterVec
+       mDisappearances           *prometheus.CounterVec
+       mTimeToSSH                prometheus.Summary
+       mTimeToReadyForContainer  prometheus.Summary
+       mTimeFromShutdownToGone   prometheus.Summary
+       mTimeFromQueueToCrunchRun prometheus.Summary
+       mRunProbeDuration         *prometheus.SummaryVec
 }
 
 type createCall struct {
@@ -323,7 +341,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
                        wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
                        wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
                }
-               initCmd := TagVerifier{nil, secret}.InitCommand()
+               initCmd := TagVerifier{nil, secret, nil}.InitCommand()
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
@@ -367,6 +385,23 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
        return nil
 }
 
+// Successful connection to the SSH daemon, update the mTimeToSSH metric
+func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       wkr := wp.workers[inst.ID()]
+       if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
+               // the node is not in booting state (can happen if a-d-c is restarted) OR
+               // this is not the first SSH connection
+               return
+       }
+
+       wkr.firstSSHConnection = time.Now()
+       if wp.mTimeToSSH != nil {
+               wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
+       }
+}
+
 // Add or update worker attached to the given instance.
 //
 // The second return value is true if a new worker is created.
@@ -377,7 +412,7 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
 // Caller must have lock.
 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
        secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
-       inst = TagVerifier{inst, secret}
+       inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
@@ -626,6 +661,46 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
                wp.mDisappearances.WithLabelValues(v).Add(0)
        }
        reg.MustRegister(wp.mDisappearances)
+       wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
+               Namespace:  "arvados",
+               Subsystem:  "dispatchcloud",
+               Name:       "instances_time_to_ssh_seconds",
+               Help:       "Number of seconds between instance creation and the first successful SSH connection.",
+               Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+       })
+       reg.MustRegister(wp.mTimeToSSH)
+       wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
+               Namespace:  "arvados",
+               Subsystem:  "dispatchcloud",
+               Name:       "instances_time_to_ready_for_container_seconds",
+               Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
+               Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+       })
+       reg.MustRegister(wp.mTimeToReadyForContainer)
+       wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
+               Namespace:  "arvados",
+               Subsystem:  "dispatchcloud",
+               Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
+               Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
+               Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+       })
+       reg.MustRegister(wp.mTimeFromShutdownToGone)
+       wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
+               Namespace:  "arvados",
+               Subsystem:  "dispatchcloud",
+               Name:       "containers_time_from_queue_to_crunch_run_seconds",
+               Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
+               Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+       })
+       reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
+       wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
+               Namespace:  "arvados",
+               Subsystem:  "dispatchcloud",
+               Name:       "instances_run_probe_duration_seconds",
+               Help:       "Number of seconds per runProbe call.",
+               Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+       }, []string{"outcome"})
+       reg.MustRegister(wp.mRunProbeDuration)
 }
 
 func (wp *Pool) runMetrics() {
@@ -700,6 +775,13 @@ func (wp *Pool) runProbes() {
 
        workers := []cloud.InstanceID{}
        for range probeticker.C {
+               // Add some jitter. Without this, if probeInterval is
+               // a multiple of syncInterval and sync is
+               // instantaneous (as with the loopback driver), the
+               // first few probes race with sync operations and
+               // don't update the workers.
+               time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))
+
                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
@@ -813,7 +895,7 @@ func (wp *Pool) loadRunnerData() error {
        if wp.runnerData != nil {
                return nil
        } else if wp.runnerSource == "" {
-               wp.runnerCmd = "crunch-run"
+               wp.runnerCmd = wp.runnerCmdDefault
                wp.runnerData = []byte{}
                return nil
        }
@@ -826,7 +908,7 @@ func (wp *Pool) loadRunnerData() error {
        }
        wp.runnerData = buf
        wp.runnerMD5 = md5.Sum(buf)
-       wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
+       wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
        return nil
 }
 
@@ -895,6 +977,10 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                if wp.mDisappearances != nil {
                        wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
                }
+               // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
+               if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
+                       wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
+               }
                delete(wp.workers, id)
                go wkr.Close()
                notify = true
@@ -922,6 +1008,12 @@ func (wp *Pool) waitUntilLoaded() {
        }
 }
 
+func (wp *Pool) gatewayAuthSecret(uuid string) string {
+       h := hmac.New(sha256.New, []byte(wp.systemRootToken))
+       fmt.Fprint(h, uuid)
+       return fmt.Sprintf("%x", h.Sum(nil))
+}
+
 // Return a random string of n hexadecimal digits (n*4 random bits). n
 // must be even.
 func randomHex(n int) string {