X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/bcee68ee657af1591d1ae0624e2d12029b0b92d5..40e378c88c2ae8e90b0785f3983ca320827e4cdf:/lib/dispatchcloud/worker/pool.go

diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 435b6e43ae..37e3fa9882 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -5,8 +5,10 @@
 package worker
 
 import (
+	"crypto/hmac"
 	"crypto/md5"
 	"crypto/rand"
+	"crypto/sha256"
 	"errors"
 	"fmt"
 	"io"
@@ -64,15 +66,16 @@ type Executor interface {
 }
 
 const (
-	defaultSyncInterval       = time.Minute
-	defaultProbeInterval      = time.Second * 10
-	defaultMaxProbesPerSecond = 10
-	defaultTimeoutIdle        = time.Minute
-	defaultTimeoutBooting     = time.Minute * 10
-	defaultTimeoutProbe       = time.Minute * 10
-	defaultTimeoutShutdown    = time.Second * 10
-	defaultTimeoutTERM        = time.Minute * 2
-	defaultTimeoutSignal      = time.Second * 5
+	defaultSyncInterval        = time.Minute
+	defaultProbeInterval       = time.Second * 10
+	defaultMaxProbesPerSecond  = 10
+	defaultTimeoutIdle         = time.Minute
+	defaultTimeoutBooting      = time.Minute * 10
+	defaultTimeoutProbe        = time.Minute * 10
+	defaultTimeoutShutdown     = time.Second * 10
+	defaultTimeoutTERM         = time.Minute * 2
+	defaultTimeoutSignal       = time.Second * 5
+	defaultTimeoutStaleRunLock = time.Second * 5
 
 	// Time after a quota error to try again anyway, even if no
 	// instances have been shutdown.
@@ -85,9 +88,8 @@ const (
 func duration(conf arvados.Duration, def time.Duration) time.Duration {
 	if conf > 0 {
 		return time.Duration(conf)
-	} else {
-		return def
 	}
+	return def
 }
 
 // NewPool creates a Pool of workers backed by instanceSet.
@@ -101,6 +103,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
 		instanceSetID:      instanceSetID,
 		instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
 		newExecutor:        newExecutor,
+		cluster:            cluster,
 		bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
 		runnerSource:       cluster.Containers.CloudVMs.DeployRunnerBinary,
 		imageID:            cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
@@ -115,8 +118,12 @@
 		timeoutShutdown:    duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
 		timeoutTERM:        duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
 		timeoutSignal:      duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
+		timeoutStaleRunLock: duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
+		systemRootToken:    cluster.SystemRootToken,
 		installPublicKey:   installPublicKey,
 		tagKeyPrefix:       cluster.Containers.CloudVMs.TagKeyPrefix,
+		runnerCmdDefault:   cluster.Containers.CrunchRunCommand,
+		runnerArgs:         append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
 		stop:               make(chan bool),
 	}
 	wp.registerMetrics(reg)
@@ -138,6 +145,7 @@ type Pool struct {
 	instanceSetID      cloud.InstanceSetID
 	instanceSet        *throttledInstanceSet
 	newExecutor        func(cloud.Instance) Executor
+	cluster            *arvados.Cluster
 	bootProbeCommand   string
 	runnerSource       string
 	imageID            cloud.ImageID
@@ -152,8 +160,12 @@ type Pool struct {
 	timeoutShutdown    time.Duration
 	timeoutTERM        time.Duration
 	timeoutSignal      time.Duration
+	timeoutStaleRunLock time.Duration
+	systemRootToken    string
 	installPublicKey   ssh.PublicKey
 	tagKeyPrefix       string
+	runnerCmdDefault   string   // crunch-run command to use if not deploying a binary
+	runnerArgs         []string // extra args passed to crunch-run
 
 	// private state
 	subscribers  map[<-chan struct{}]chan<- struct{}
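Review note: the duration() cleanup above drops the redundant else branch without changing behavior. A minimal standalone sketch of the same fallback pattern; arvadosDuration is a stand-in for arvados.Duration (a time.Duration-compatible config type where zero means "not set"):

package main

import (
	"fmt"
	"time"
)

// arvadosDuration stands in for arvados.Duration from the SDK; zero
// means the operator did not set a value in the cluster config.
type arvadosDuration time.Duration

// duration mirrors the helper in the diff: prefer the configured value,
// otherwise fall back to the compiled-in default.
func duration(conf arvadosDuration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}

func main() {
	fmt.Println(duration(0, time.Minute))                              // 1m0s (falls back to default)
	fmt.Println(duration(arvadosDuration(5*time.Second), time.Minute)) // 5s (configured value wins)
}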
@@ -170,13 +182,18 @@ type Pool struct {
 	runnerMD5    [md5.Size]byte
 	runnerCmd    string
 
-	mContainersRunning prometheus.Gauge
-	mInstances         *prometheus.GaugeVec
-	mInstancesPrice    *prometheus.GaugeVec
-	mVCPUs             *prometheus.GaugeVec
-	mMemory            *prometheus.GaugeVec
-	mBootOutcomes      *prometheus.CounterVec
-	mDisappearances    *prometheus.CounterVec
+	mContainersRunning        prometheus.Gauge
+	mInstances                *prometheus.GaugeVec
+	mInstancesPrice           *prometheus.GaugeVec
+	mVCPUs                    *prometheus.GaugeVec
+	mMemory                   *prometheus.GaugeVec
+	mBootOutcomes             *prometheus.CounterVec
+	mDisappearances           *prometheus.CounterVec
+	mTimeToSSH                prometheus.Summary
+	mTimeToReadyForContainer  prometheus.Summary
+	mTimeFromShutdownToGone   prometheus.Summary
+	mTimeFromQueueToCrunchRun prometheus.Summary
+	mRunProbeDuration         *prometheus.SummaryVec
 }
 
 type createCall struct {
@@ -323,7 +340,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
 			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
 			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
 		}
-		initCmd := TagVerifier{nil, secret}.InitCommand()
+		initCmd := TagVerifier{nil, secret, nil}.InitCommand()
 		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
 		wp.mtx.Lock()
 		defer wp.mtx.Unlock()
@@ -367,6 +384,23 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
 	return nil
 }
 
+// Successful connection to the SSH daemon, update the mTimeToSSH metric
+func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	wkr := wp.workers[inst.ID()]
+	if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
+		// the node is not in booting state (can happen if a-d-c is restarted) OR
+		// this is not the first SSH connection
+		return
+	}
+
+	wkr.firstSSHConnection = time.Now()
+	if wp.mTimeToSSH != nil {
+		wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
+	}
+}
+
 // Add or update worker attached to the given instance.
 //
 // The second return value is true if a new worker is created.
@@ -377,7 +411,7 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
 // Caller must have lock.
 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
 	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
-	inst = TagVerifier{inst, secret}
+	inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
 	id := inst.ID()
 	if wkr := wp.workers[id]; wkr != nil {
 		wkr.executor.SetTarget(inst)
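Review note: reportSSHConnected is wired into TagVerifier as the ReportVerified callback, so the pool learns when an instance's SSH daemon first answers, and the IsZero() guard ensures the summary sees at most one observation per instance. A self-contained sketch of that record-once bookkeeping, with simplified stand-in types (worker, appeared, observe are not the real pool types):

package main

import (
	"fmt"
	"time"
)

// worker is a stripped-down stand-in for the pool's worker type; only
// the two timestamps used by reportSSHConnected are kept.
type worker struct {
	appeared           time.Time // when the instance first appeared
	firstSSHConnection time.Time // zero until the first successful SSH connection
}

// observe is a stand-in for wp.mTimeToSSH.Observe.
func observe(seconds float64) {
	fmt.Printf("instances_time_to_ssh_seconds: %.1f\n", seconds)
}

// reportSSHConnected records only the first successful connection: the
// IsZero() guard makes every later call a no-op.
func (wkr *worker) reportSSHConnected() {
	if !wkr.firstSSHConnection.IsZero() {
		return // not the first connection; keep the original timestamp
	}
	wkr.firstSSHConnection = time.Now()
	observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
}

func main() {
	wkr := &worker{appeared: time.Now().Add(-30 * time.Second)}
	wkr.reportSSHConnected() // observes ~30s
	wkr.reportSSHConnected() // second call is ignored
}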
@@ -626,6 +660,46 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
 		wp.mDisappearances.WithLabelValues(v).Add(0)
 	}
 	reg.MustRegister(wp.mDisappearances)
+	wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
+		Namespace:  "arvados",
+		Subsystem:  "dispatchcloud",
+		Name:       "instances_time_to_ssh_seconds",
+		Help:       "Number of seconds between instance creation and the first successful SSH connection.",
+		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+	})
+	reg.MustRegister(wp.mTimeToSSH)
+	wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
+		Namespace:  "arvados",
+		Subsystem:  "dispatchcloud",
+		Name:       "instances_time_to_ready_for_container_seconds",
+		Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
+		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+	})
+	reg.MustRegister(wp.mTimeToReadyForContainer)
+	wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
+		Namespace:  "arvados",
+		Subsystem:  "dispatchcloud",
+		Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
+		Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
+		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+	})
+	reg.MustRegister(wp.mTimeFromShutdownToGone)
+	wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
+		Namespace:  "arvados",
+		Subsystem:  "dispatchcloud",
+		Name:       "containers_time_from_queue_to_crunch_run_seconds",
+		Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
+		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+	})
+	reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
+	wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
+		Namespace:  "arvados",
+		Subsystem:  "dispatchcloud",
+		Name:       "instances_run_probe_duration_seconds",
+		Help:       "Number of seconds per runProbe call.",
+		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+	}, []string{"outcome"})
+	reg.MustRegister(wp.mRunProbeDuration)
 }
 
 func (wp *Pool) runMetrics() {
@@ -813,7 +887,7 @@ func (wp *Pool) loadRunnerData() error {
 	if wp.runnerData != nil {
 		return nil
 	} else if wp.runnerSource == "" {
-		wp.runnerCmd = "crunch-run"
+		wp.runnerCmd = wp.runnerCmdDefault
 		wp.runnerData = []byte{}
 		return nil
 	}
@@ -895,6 +969,10 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
 		if wp.mDisappearances != nil {
 			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
 		}
+		// wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
+		if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
+			wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
+		}
 		delete(wp.workers, id)
 		go wkr.Close()
 		notify = true
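Review note: the new metrics are all Prometheus summaries sharing the same quantile objectives; mRunProbeDuration additionally partitions observations by an "outcome" label. A runnable sketch of that SummaryVec pattern; runProbe and the "success"/"fail" label values here are illustrative stand-ins, not the dispatcher's real probe code:

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// runProbe is a hypothetical stand-in for the pool's per-worker probe.
func runProbe() error { return nil }

func main() {
	reg := prometheus.NewRegistry()
	// Same options as mRunProbeDuration in the diff: one summary stream
	// per value of the "outcome" label.
	runProbeDuration := prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_run_probe_duration_seconds",
		Help:       "Number of seconds per runProbe call.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	}, []string{"outcome"})
	reg.MustRegister(runProbeDuration)

	t0 := time.Now()
	outcome := "success"
	if err := runProbe(); err != nil {
		outcome = "fail"
	}
	runProbeDuration.WithLabelValues(outcome).Observe(time.Since(t0).Seconds())
	fmt.Println("observed one", outcome, "probe")
}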
fmt.Sprintf("%x", h.Sum(nil)) +} + // Return a random string of n hexadecimal digits (n*4 random bits). n // must be even. func randomHex(n int) string {