package worker
import (
+ "crypto/hmac"
"crypto/md5"
"crypto/rand"
+ "crypto/sha256"
"errors"
"fmt"
"io"
"io/ioutil"
+ mathrand "math/rand"
"sort"
"strings"
"sync"
// duration converts a configured arvados.Duration to a time.Duration.
// It returns conf when conf is greater than zero; otherwise (zero or
// negative) it returns the supplied default def.
//
// NOTE(review): the lines prefixed with "-" and "+" below appear to be
// change markers showing the old else-branch form and the new
// early-return form; both forms return def when conf is not positive.
func duration(conf arvados.Duration, def time.Duration) time.Duration {
if conf > 0 {
return time.Duration(conf)
- } else {
- return def
}
+ return def
}
// NewPool creates a Pool of workers backed by instanceSet.
instanceSetID: instanceSetID,
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
newExecutor: newExecutor,
+ cluster: cluster,
bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
+ instanceInitCommand: cloud.InitCommand(cluster.Containers.CloudVMs.InstanceInitCommand),
runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
instanceTypes: cluster.InstanceTypes,
maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
+ maxInstances: cluster.Containers.CloudVMs.MaxInstances,
probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
timeoutStaleRunLock: duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
+ systemRootToken: cluster.SystemRootToken,
installPublicKey: installPublicKey,
tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
+ runnerCmdDefault: cluster.Containers.CrunchRunCommand,
+ runnerArgs: append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
stop: make(chan bool),
}
wp.registerMetrics(reg)
instanceSetID cloud.InstanceSetID
instanceSet *throttledInstanceSet
newExecutor func(cloud.Instance) Executor
+ cluster *arvados.Cluster
bootProbeCommand string
+ instanceInitCommand cloud.InitCommand
runnerSource string
imageID cloud.ImageID
instanceTypes map[string]arvados.InstanceType
probeInterval time.Duration
maxProbesPerSecond int
maxConcurrentInstanceCreateOps int
+ maxInstances int
timeoutIdle time.Duration
timeoutBooting time.Duration
timeoutProbe time.Duration
timeoutTERM time.Duration
timeoutSignal time.Duration
timeoutStaleRunLock time.Duration
+ systemRootToken string
installPublicKey ssh.PublicKey
tagKeyPrefix string
+ runnerCmdDefault string // crunch-run command to use if not deploying a binary
+ runnerArgs []string // extra args passed to crunch-run
// private state
subscribers map[<-chan struct{}]chan<- struct{}
runnerMD5 [md5.Size]byte
runnerCmd string
- mContainersRunning prometheus.Gauge
- mInstances *prometheus.GaugeVec
- mInstancesPrice *prometheus.GaugeVec
- mVCPUs *prometheus.GaugeVec
- mMemory *prometheus.GaugeVec
- mBootOutcomes *prometheus.CounterVec
- mDisappearances *prometheus.CounterVec
- mTimeToSSH prometheus.Summary
- mTimeToReadyForContainer prometheus.Summary
+ mContainersRunning prometheus.Gauge
+ mInstances *prometheus.GaugeVec
+ mInstancesPrice *prometheus.GaugeVec
+ mVCPUs *prometheus.GaugeVec
+ mMemory *prometheus.GaugeVec
+ mBootOutcomes *prometheus.CounterVec
+ mDisappearances *prometheus.CounterVec
+ mTimeToSSH prometheus.Summary
+ mTimeToReadyForContainer prometheus.Summary
+ mTimeFromShutdownToGone prometheus.Summary
+ mTimeFromQueueToCrunchRun prometheus.Summary
+ mRunProbeDuration *prometheus.SummaryVec
+ mProbeAgeMax prometheus.Gauge
+ mProbeAgeMedian prometheus.Gauge
}
type createCall struct {
// pool. The worker is added immediately; instance creation runs in
// the background.
//
-// Create returns false if a pre-existing error state prevents it from
-// even attempting to create a new instance. Those errors are logged
-// by the Pool, so the caller does not need to log anything in such
-// cases.
+// Create returns false if a pre-existing error or a configuration
+// setting prevents it from even attempting to create a new
+// instance. Those errors are logged by the Pool, so the caller does
+// not need to log anything in such cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
logger := wp.logger.WithField("InstanceType", it.Name)
wp.setupOnce.Do(wp.setup)
}
wp.mtx.Lock()
defer wp.mtx.Unlock()
- if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
+ if time.Now().Before(wp.atQuotaUntil) ||
+ wp.instanceSet.throttleCreate.Error() != nil ||
+ (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) {
return false
}
// The maxConcurrentInstanceCreateOps knob throttles the number of node create
wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
}
- initCmd := TagVerifier{nil, secret, nil}.InitCommand()
+ initCmd := TagVerifier{nil, secret, nil}.InitCommand() + "\n" + wp.instanceInitCommand
inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
wp.mtx.Lock()
defer wp.mtx.Unlock()
}
wp.updateWorker(inst, it)
}()
+ if len(wp.creating)+len(wp.workers) == wp.maxInstances {
+ logger.Infof("now at MaxInstances limit of %d instances", wp.maxInstances)
+ }
return true
}
// AtQuota returns true if Create is not expected to work at the
-// moment.
+// moment (e.g., cloud provider has reported quota errors, or we are
+// already at our own configured quota).
func (wp *Pool) AtQuota() bool {
// wp.mtx is held for the whole call because the return expression
// reads wp.atQuotaUntil, wp.workers and wp.creating.
wp.mtx.Lock()
defer wp.mtx.Unlock()
// NOTE(review): the "-" and "+" lines below appear to be the old and
// new return statements. The new form additionally reports true when
// maxInstances is positive and len(wp.workers)+len(wp.creating) has
// reached it (wp.creating presumably tracks create calls still in
// progress -- confirm). With maxInstances <= 0 only the atQuotaUntil
// check applies.
- return time.Now().Before(wp.atQuotaUntil)
+ return time.Now().Before(wp.atQuotaUntil) || (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
}
// SetIdleBehavior determines how the indicated instance will behave
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
wp.mtx.Lock()
defer wp.mtx.Unlock()
- wkr := wp.workers[inst.ID()]
+ wkr, ok := wp.workers[inst.ID()]
+ if !ok {
+ // race: inst was removed from the pool
+ return
+ }
if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
- // the node is not in booting state (can happen if a-d-c is restarted) OR
- // this is not the first SSH connection
+ // the node is not in booting state (can happen if
+ // a-d-c is restarted) OR this is not the first SSH
+ // connection
return
}
Help: "Number of containers reported running by cloud VMs.",
})
reg.MustRegister(wp.mContainersRunning)
+ wp.mProbeAgeMax = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "probe_age_seconds_max",
+ Help: "Maximum number of seconds since an instance's most recent successful probe.",
+ })
+ reg.MustRegister(wp.mProbeAgeMax)
+ wp.mProbeAgeMedian = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "probe_age_seconds_median",
+ Help: "Median number of seconds since an instance's most recent successful probe.",
+ })
+ reg.MustRegister(wp.mProbeAgeMedian)
wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
})
reg.MustRegister(wp.mTimeToReadyForContainer)
+ wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_time_from_shutdown_request_to_disappearance_seconds",
+ Help: "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
+ Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+ })
+ reg.MustRegister(wp.mTimeFromShutdownToGone)
+ wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "containers_time_from_queue_to_crunch_run_seconds",
+ Help: "Number of seconds between the queuing of a container and the start of crunch-run.",
+ Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+ })
+ reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
+ wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_run_probe_duration_seconds",
+ Help: "Number of seconds per runProbe call.",
+ Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+ }, []string{"outcome"})
+ reg.MustRegister(wp.mRunProbeDuration)
}
func (wp *Pool) runMetrics() {
cpu := map[string]int64{}
mem := map[string]int64{}
var running int64
+ now := time.Now()
+ var probed []time.Time
for _, wkr := range wp.workers {
var cat string
switch {
cpu[cat] += int64(wkr.instType.VCPUs)
mem[cat] += int64(wkr.instType.RAM)
running += int64(len(wkr.running) + len(wkr.starting))
+ probed = append(probed, wkr.probed)
}
for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
}
wp.mContainersRunning.Set(float64(running))
+
+ if len(probed) == 0 {
+ wp.mProbeAgeMax.Set(0)
+ wp.mProbeAgeMedian.Set(0)
+ } else {
+ sort.Slice(probed, func(i, j int) bool { return probed[i].Before(probed[j]) })
+ wp.mProbeAgeMax.Set(now.Sub(probed[0]).Seconds())
+ wp.mProbeAgeMedian.Set(now.Sub(probed[len(probed)/2]).Seconds())
+ }
}
func (wp *Pool) runProbes() {
workers := []cloud.InstanceID{}
for range probeticker.C {
+ // Add some jitter. Without this, if probeInterval is
+ // a multiple of syncInterval and sync is
+ // instantaneous (as with the loopback driver), the
+ // first few probes race with sync operations and
+ // don't update the workers.
+ time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))
+
workers = workers[:0]
wp.mtx.Lock()
for id, wkr := range wp.workers {
// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+ wp.setupOnce.Do(wp.setup)
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
wkr, ok := wp.workers[id]
if !ok {
return errors.New("instance not found")
if wp.runnerData != nil {
return nil
} else if wp.runnerSource == "" {
- wp.runnerCmd = "crunch-run"
+ wp.runnerCmd = wp.runnerCmdDefault
wp.runnerData = []byte{}
return nil
}
}
wp.runnerData = buf
wp.runnerMD5 = md5.Sum(buf)
- wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
+ wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
return nil
}
if wp.mDisappearances != nil {
wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
}
+ // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
+ if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
+ wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
+ }
delete(wp.workers, id)
go wkr.Close()
notify = true
}
}
// gatewayAuthSecret returns the HMAC-SHA256 of uuid, keyed with the
// pool's systemRootToken, formatted as lowercase hexadecimal (64
// characters). The same uuid and token always yield the same string.
//
// NOTE(review): uuid is presumably a container UUID -- confirm against
// the callers; this function treats it as an opaque string.
+func (wp *Pool) gatewayAuthSecret(uuid string) string {
+ h := hmac.New(sha256.New, []byte(wp.systemRootToken))
// Fprint's return values are ignored: per the hash.Hash contract,
// Write never returns an error.
+ fmt.Fprint(h, uuid)
+ return fmt.Sprintf("%x", h.Sum(nil))
+}
+
// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {