newExecutor: newExecutor,
cluster: cluster,
bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
+ instanceInitCommand: cloud.InitCommand(cluster.Containers.CloudVMs.InstanceInitCommand),
runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
instanceTypes: cluster.InstanceTypes,
newExecutor func(cloud.Instance) Executor
cluster *arvados.Cluster
bootProbeCommand string
+ instanceInitCommand cloud.InitCommand
runnerSource string
imageID cloud.ImageID
instanceTypes map[string]arvados.InstanceType
runnerArgs []string // extra args passed to crunch-run
// private state
- subscribers map[<-chan struct{}]chan<- struct{}
- creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
- workers map[cloud.InstanceID]*worker
- loaded bool // loaded list of instances from InstanceSet at least once
- exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
- atQuotaUntil time.Time
- atQuotaErr cloud.QuotaError
- stop chan bool
- mtx sync.RWMutex
- setupOnce sync.Once
- runnerData []byte
- runnerMD5 [md5.Size]byte
- runnerCmd string
+ subscribers map[<-chan struct{}]chan<- struct{}
+ creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
+ workers map[cloud.InstanceID]*worker
+ loaded bool // loaded list of instances from InstanceSet at least once
+ exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
+ atQuotaUntilFewerInstances int
+ atQuotaUntil time.Time
+ atQuotaErr cloud.QuotaError
+ stop chan bool
+ mtx sync.RWMutex
+ setupOnce sync.Once
+ runnerData []byte
+ runnerMD5 [md5.Size]byte
+ runnerCmd string
mContainersRunning prometheus.Gauge
mInstances *prometheus.GaugeVec
mTimeFromShutdownToGone prometheus.Summary
mTimeFromQueueToCrunchRun prometheus.Summary
mRunProbeDuration *prometheus.SummaryVec
+ mProbeAgeMax prometheus.Gauge
+ mProbeAgeMedian prometheus.Gauge
}
type createCall struct {
wp.mtx.Lock()
defer wp.mtx.Unlock()
if time.Now().Before(wp.atQuotaUntil) ||
+ wp.atQuotaUntilFewerInstances > 0 ||
wp.instanceSet.throttleCreate.Error() != nil ||
(wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) {
return false
wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
}
- initCmd := TagVerifier{nil, secret, nil}.InitCommand()
+ initCmd := TagVerifier{nil, secret, nil}.InitCommand() + "\n" + wp.instanceInitCommand
inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
wp.mtx.Lock()
defer wp.mtx.Unlock()
if err != nil {
if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
wp.atQuotaErr = err
- wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
- time.AfterFunc(quotaErrorTTL, wp.notify)
+ n := len(wp.workers) + len(wp.creating) - 1
+ if n < 1 {
+ // Quota error with no
+ // instances running --
+ // nothing to do but wait
+ wp.atQuotaUntilFewerInstances = 0
+ wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+ time.AfterFunc(quotaErrorTTL, wp.notify)
+ logger.WithField("atQuotaUntil", wp.atQuotaUntil).Info("quota error with 0 running -- waiting for quotaErrorTTL")
+ } else if n < wp.atQuotaUntilFewerInstances || wp.atQuotaUntilFewerInstances == 0 {
+ // Quota error with N
+ // instances running -- report
+ // AtQuota until some
+ // instances shut down
+ wp.atQuotaUntilFewerInstances = n
+ wp.atQuotaUntil = time.Time{}
+ logger.WithField("atQuotaUntilFewerInstances", n).Info("quota error -- waiting for next instance shutdown")
+ }
}
logger.WithError(err).Error("create failed")
wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
func (wp *Pool) AtQuota() bool {
wp.mtx.Lock()
defer wp.mtx.Unlock()
- return time.Now().Before(wp.atQuotaUntil) || (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
+ return wp.atQuotaUntilFewerInstances > 0 ||
+ time.Now().Before(wp.atQuotaUntil) ||
+ (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
}
// SetIdleBehavior determines how the indicated instance will behave
Help: "Number of containers reported running by cloud VMs.",
})
reg.MustRegister(wp.mContainersRunning)
+ wp.mProbeAgeMax = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "probe_age_seconds_max",
+ Help: "Maximum number of seconds since an instance's most recent successful probe.",
+ })
+ reg.MustRegister(wp.mProbeAgeMax)
+ wp.mProbeAgeMedian = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "probe_age_seconds_median",
+ Help: "Median number of seconds since an instance's most recent successful probe.",
+ })
+ reg.MustRegister(wp.mProbeAgeMedian)
wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
cpu := map[string]int64{}
mem := map[string]int64{}
var running int64
+ now := time.Now()
+ var probed []time.Time
for _, wkr := range wp.workers {
var cat string
switch {
cpu[cat] += int64(wkr.instType.VCPUs)
mem[cat] += int64(wkr.instType.RAM)
running += int64(len(wkr.running) + len(wkr.starting))
+ probed = append(probed, wkr.probed)
}
for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
}
wp.mContainersRunning.Set(float64(running))
+
+ if len(probed) == 0 {
+ wp.mProbeAgeMax.Set(0)
+ wp.mProbeAgeMedian.Set(0)
+ } else {
+ sort.Slice(probed, func(i, j int) bool { return probed[i].Before(probed[j]) })
+ wp.mProbeAgeMax.Set(now.Sub(probed[0]).Seconds())
+ wp.mProbeAgeMedian.Set(now.Sub(probed[len(probed)/2]).Seconds())
+ }
}
func (wp *Pool) runProbes() {
// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+ wp.setupOnce.Do(wp.setup)
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
wkr, ok := wp.workers[id]
if !ok {
return errors.New("instance not found")
notify = true
}
+ if wp.atQuotaUntilFewerInstances > len(wp.workers)+len(wp.creating) {
+ // After syncing, there are fewer instances (including
+ // pending creates) than there were last time we saw a
+ // quota error. This might mean it's now possible to
+ // create new instances. Reset our "at quota" state.
+ wp.atQuotaUntilFewerInstances = 0
+ }
+
if !wp.loaded {
notify = true
wp.loaded = true