package worker
import (
+ "crypto/hmac"
"crypto/md5"
"crypto/rand"
+ "crypto/sha256"
"errors"
"fmt"
"io"
}
const (
- defaultSyncInterval = time.Minute
- defaultProbeInterval = time.Second * 10
- defaultMaxProbesPerSecond = 10
- defaultTimeoutIdle = time.Minute
- defaultTimeoutBooting = time.Minute * 10
- defaultTimeoutProbe = time.Minute * 10
- defaultTimeoutShutdown = time.Second * 10
- defaultTimeoutTERM = time.Minute * 2
- defaultTimeoutSignal = time.Second * 5
+ defaultSyncInterval = time.Minute
+ defaultProbeInterval = time.Second * 10
+ defaultMaxProbesPerSecond = 10
+ defaultTimeoutIdle = time.Minute
+ defaultTimeoutBooting = time.Minute * 10
+ defaultTimeoutProbe = time.Minute * 10
+ defaultTimeoutShutdown = time.Second * 10
+ defaultTimeoutTERM = time.Minute * 2
+ defaultTimeoutSignal = time.Second * 5
+ defaultTimeoutStaleRunLock = time.Second * 5
// Time after a quota error to try again anyway, even if no
// instances have been shutdown.
func duration(conf arvados.Duration, def time.Duration) time.Duration {
if conf > 0 {
return time.Duration(conf)
- } else {
- return def
}
+ return def
}
// NewPool creates a Pool of workers backed by instanceSet.
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
wp := &Pool{
- logger: logger,
- arvClient: arvClient,
- instanceSetID: instanceSetID,
- instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
- newExecutor: newExecutor,
- bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
- runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
- imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
- instanceTypes: cluster.InstanceTypes,
- maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
- probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
- syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
- timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
- timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
- timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
- timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
- timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
- timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
- installPublicKey: installPublicKey,
- tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
- stop: make(chan bool),
+ logger: logger,
+ arvClient: arvClient,
+ instanceSetID: instanceSetID,
+ instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
+ newExecutor: newExecutor,
+ bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
+ runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
+ imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
+ instanceTypes: cluster.InstanceTypes,
+ maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
+ maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
+ probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
+ syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
+ timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
+ timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
+ timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+ timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+ timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
+ timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
+ timeoutStaleRunLock: duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
+ systemRootToken: cluster.SystemRootToken,
+ installPublicKey: installPublicKey,
+ tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
+ stop: make(chan bool),
}
wp.registerMetrics(reg)
go func() {
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
// configuration
- logger logrus.FieldLogger
- arvClient *arvados.Client
- instanceSetID cloud.InstanceSetID
- instanceSet *throttledInstanceSet
- newExecutor func(cloud.Instance) Executor
- bootProbeCommand string
- runnerSource string
- imageID cloud.ImageID
- instanceTypes map[string]arvados.InstanceType
- syncInterval time.Duration
- probeInterval time.Duration
- maxProbesPerSecond int
- timeoutIdle time.Duration
- timeoutBooting time.Duration
- timeoutProbe time.Duration
- timeoutShutdown time.Duration
- timeoutTERM time.Duration
- timeoutSignal time.Duration
- installPublicKey ssh.PublicKey
- tagKeyPrefix string
+ logger logrus.FieldLogger
+ arvClient *arvados.Client
+ instanceSetID cloud.InstanceSetID
+ instanceSet *throttledInstanceSet
+ newExecutor func(cloud.Instance) Executor
+ bootProbeCommand string
+ runnerSource string
+ imageID cloud.ImageID
+ instanceTypes map[string]arvados.InstanceType
+ syncInterval time.Duration
+ probeInterval time.Duration
+ maxProbesPerSecond int
+ maxConcurrentInstanceCreateOps int
+ timeoutIdle time.Duration
+ timeoutBooting time.Duration
+ timeoutProbe time.Duration
+ timeoutShutdown time.Duration
+ timeoutTERM time.Duration
+ timeoutSignal time.Duration
+ timeoutStaleRunLock time.Duration
+ systemRootToken string
+ installPublicKey ssh.PublicKey
+ tagKeyPrefix string
// private state
subscribers map[<-chan struct{}]chan<- struct{}
runnerMD5 [md5.Size]byte
runnerCmd string
- throttleCreate throttle
- throttleInstances throttle
-
- mContainersRunning prometheus.Gauge
- mInstances *prometheus.GaugeVec
- mInstancesPrice *prometheus.GaugeVec
- mVCPUs *prometheus.GaugeVec
- mMemory *prometheus.GaugeVec
- mBootOutcomes *prometheus.CounterVec
- mDisappearances *prometheus.CounterVec
+ mContainersRunning prometheus.Gauge
+ mInstances *prometheus.GaugeVec
+ mInstancesPrice *prometheus.GaugeVec
+ mVCPUs *prometheus.GaugeVec
+ mMemory *prometheus.GaugeVec
+ mBootOutcomes *prometheus.CounterVec
+ mDisappearances *prometheus.CounterVec
+ mTimeToSSH prometheus.Summary
+ mTimeToReadyForContainer prometheus.Summary
+ mTimeFromShutdownToGone prometheus.Summary
+ mTimeFromQueueToCrunchRun prometheus.Summary
+ mRunProbeDuration *prometheus.SummaryVec
}
type createCall struct {
}
wp.mtx.Lock()
defer wp.mtx.Unlock()
- if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
+ if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
+ return false
+ }
+ // The maxConcurrentInstanceCreateOps knob throttles the number of node create
+ // requests in flight. It was added to work around a limitation in Azure's
+ // managed disks, which support no more than 20 concurrent node creation
+ // requests from a single disk image (cf.
+ // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
+ // The code assumes that node creation, from Azure's perspective, means the
+ // period until the instance appears in the "get all instances" list.
+ if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
+ logger.Info("reached MaxConcurrentInstanceCreateOps")
+ wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
return false
}
now := time.Now()
wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
}
- initCmd := TagVerifier{nil, secret}.InitCommand()
+ initCmd := TagVerifier{nil, secret, nil}.InitCommand()
inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
wp.mtx.Lock()
defer wp.mtx.Unlock()
return nil
}
+// Successful connection to the SSH daemon, update the mTimeToSSH metric
+func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ wkr := wp.workers[inst.ID()]
+ if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
+ // the node is not in booting state (can happen if a-d-c is restarted) OR
+ // this is not the first SSH connection
+ return
+ }
+
+ wkr.firstSSHConnection = time.Now()
+ if wp.mTimeToSSH != nil {
+ wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
+ }
+}
+
// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
- inst = TagVerifier{inst, secret}
+ inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
id := inst.ID()
if wkr := wp.workers[id]; wkr != nil {
wkr.executor.SetTarget(inst)
wp.mDisappearances.WithLabelValues(v).Add(0)
}
reg.MustRegister(wp.mDisappearances)
+ wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_time_to_ssh_seconds",
+ Help: "Number of seconds between instance creation and the first successful SSH connection.",
+ Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+ })
+ reg.MustRegister(wp.mTimeToSSH)
+ wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_time_to_ready_for_container_seconds",
+ Help: "Number of seconds between the first successful SSH connection and ready to run a container.",
+ Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+ })
+ reg.MustRegister(wp.mTimeToReadyForContainer)
+ wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_time_from_shutdown_request_to_disappearance_seconds",
+ Help: "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
+ Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+ })
+ reg.MustRegister(wp.mTimeFromShutdownToGone)
+ wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "containers_time_from_queue_to_crunch_run_seconds",
+ Help: "Number of seconds between the queuing of a container and the start of crunch-run.",
+ Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+ })
+ reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
+ wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_run_probe_duration_seconds",
+ Help: "Number of seconds per runProbe call.",
+ Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+ }, []string{"outcome"})
+ reg.MustRegister(wp.mRunProbeDuration)
}
func (wp *Pool) runMetrics() {
if wp.mDisappearances != nil {
wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
}
+ // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
+ if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
+ wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
+ }
delete(wp.workers, id)
go wkr.Close()
notify = true
}
}
+func (wp *Pool) gatewayAuthSecret(uuid string) string {
+ h := hmac.New(sha256.New, []byte(wp.systemRootToken))
+ fmt.Fprint(h, uuid)
+ return fmt.Sprintf("%x", h.Sum(nil))
+}
+
// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {