// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0
	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
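	// Note (added for illustration): each key is stored on the
	// instance with the cluster's configured TagKeyPrefix prepended,
	// as in wp.tagKeyPrefix + tagKeyInstanceType below. With a
	// hypothetical TagKeyPrefix of "Arvados", the instance would
	// carry a tag named "ArvadosInstanceType".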
// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}
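// Illustrative only (hypothetical values): via the JSON tags above, an
// InstanceView marshals as
//
//	{"instance":"i-0abcdef0","address":"10.0.0.5","price":0.12,
//	 "arvados_instance_type":"Standard2","provider_instance_type":"m4.large",
//	 "last_container_uuid":"zzzzz-dz642-012345678901234",
//	 "last_busy":"2024-01-01T00:00:00Z","worker_state":"idle",
//	 "idle_behavior":"run"}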
// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)
}
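// An illustrative sketch (not part of the original source) of a type
// satisfying Executor; the names here are hypothetical:
//
//	type sshExecutor struct {
//		mtx    sync.Mutex
//		target cloud.ExecutorTarget
//	}
//
//	// SetTarget swaps the target cheaply and never blocks on
//	// in-flight Execute calls.
//	func (e *sshExecutor) SetTarget(t cloud.ExecutorTarget) {
//		e.mtx.Lock()
//		defer e.mtx.Unlock()
//		e.target = t
//	}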
	defaultSyncInterval        = time.Minute
	defaultProbeInterval       = time.Second * 10
	defaultMaxProbesPerSecond  = 10
	defaultTimeoutIdle         = time.Minute
	defaultTimeoutBooting      = time.Minute * 10
	defaultTimeoutProbe        = time.Minute * 10
	defaultTimeoutShutdown     = time.Second * 10
	defaultTimeoutTERM         = time.Minute * 2
	defaultTimeoutSignal       = time.Second * 5
	defaultTimeoutStaleRunLock = time.Second * 5

	// Time after a quota error to try again anyway, even if no
	// instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because of rate limiting" messages.
	logRateLimitErrorInterval = time.Second * 10
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf != 0 {
		return time.Duration(conf)
	}
	return def
}
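// For example (illustrative): duration(0, time.Minute) returns the
// default time.Minute, while duration(arvados.Duration(5*time.Second),
// time.Minute) returns 5 * time.Second.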
// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:                         logger,
		arvClient:                      arvClient,
		instanceSetID:                  instanceSetID,
		instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:                    newExecutor,
		bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
		instanceInitCommand:            cloud.InitCommand(cluster.Containers.CloudVMs.InstanceInitCommand),
		runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes:                  cluster.InstanceTypes,
		maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
		maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
		maxInstances:                   cluster.Containers.CloudVMs.MaxInstances,
		probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
		systemRootToken:                cluster.SystemRootToken,
		installPublicKey:               installPublicKey,
		tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
		runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
		runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
		stop:                           make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	logger              logrus.FieldLogger
	arvClient           *arvados.Client
	instanceSetID       cloud.InstanceSetID
	instanceSet         *throttledInstanceSet
	newExecutor         func(cloud.Instance) Executor
	cluster             *arvados.Cluster
	bootProbeCommand    string
	instanceInitCommand cloud.InitCommand
	imageID             cloud.ImageID
	instanceTypes       map[string]arvados.InstanceType
	syncInterval        time.Duration
	probeInterval       time.Duration
	maxProbesPerSecond  int
	maxConcurrentInstanceCreateOps int
	timeoutIdle         time.Duration
	timeoutBooting      time.Duration
	timeoutProbe        time.Duration
	timeoutShutdown     time.Duration
	timeoutTERM         time.Duration
	timeoutSignal       time.Duration
	timeoutStaleRunLock time.Duration
	systemRootToken     string
	installPublicKey    ssh.PublicKey
	runnerCmdDefault    string   // crunch-run command to use if not deploying a binary
	runnerArgs          []string // extra args passed to crunch-run

	subscribers map[<-chan struct{}]chan<- struct{}
	creating    map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers     map[cloud.InstanceID]*worker
	loaded      bool                 // loaded list of instances from InstanceSet at least once
	exited      map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntilFewerInstances int
	atQuotaUntil               time.Time
	atQuotaErr                 cloud.QuotaError
	runnerMD5                  [md5.Size]byte

	mContainersRunning        prometheus.Gauge
	mInstances                *prometheus.GaugeVec
	mInstancesPrice           *prometheus.GaugeVec
	mVCPUs                    *prometheus.GaugeVec
	mMemory                   *prometheus.GaugeVec
	mBootOutcomes             *prometheus.CounterVec
	mDisappearances           *prometheus.CounterVec
	mTimeToSSH                prometheus.Summary
	mTimeToReadyForContainer  prometheus.Summary
	mTimeFromShutdownToGone   prometheus.Summary
	mTimeFromQueueToCrunchRun prometheus.Summary
	mRunProbeDuration         *prometheus.SummaryVec
	mProbeAgeMax              prometheus.Gauge
	mProbeAgeMedian           prometheus.Gauge
type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}
// CheckHealth reports an error if the pool cannot load the runner
// binary that it deploys to new instances.
func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
	}
	return nil
}
// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
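//
// A fuller, illustrative continuation (not from the original source),
// servicing the channel in a loop:
//
//	for range ch {
//		tryScheduling(wp) // tryScheduling is a hypothetical caller-side helper
//	}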
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}
// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}
// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error or a configuration
// setting prevents it from even attempting to create a new
// instance. Those errors are logged by the Pool, so the caller does
// not need to log anything in such cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) ||
		wp.atQuotaUntilFewerInstances > 0 ||
		wp.instanceSet.throttleCreate.Error() != nil ||
		(wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) {
		return false
	}
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret, nil}.InitCommand() + "\n" + wp.instanceInitCommand
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				n := len(wp.workers) + len(wp.creating) - 1
				if n < 1 {
					// Quota error with no instances
					// running -- nothing to do but wait.
					wp.atQuotaUntilFewerInstances = 0
					wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
					time.AfterFunc(quotaErrorTTL, wp.notify)
					logger.WithField("atQuotaUntil", wp.atQuotaUntil).Info("quota error with 0 running -- waiting for quotaErrorTTL")
				} else if n < wp.atQuotaUntilFewerInstances || wp.atQuotaUntilFewerInstances == 0 {
					// Quota error with N instances
					// running -- report AtQuota until
					// some instances shut down.
					wp.atQuotaUntilFewerInstances = n
					wp.atQuotaUntil = time.Time{}
					logger.WithField("atQuotaUntilFewerInstances", n).Info("quota error -- waiting for next instance shutdown")
				}
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	if len(wp.creating)+len(wp.workers) == wp.maxInstances {
		logger.Infof("now at MaxInstances limit of %d instances", wp.maxInstances)
	}
	return true
}
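// Illustrative caller-side sketch (not part of the original source):
// a dispatcher might pair AtQuota with Create when growing the pool:
//
//	if !wp.AtQuota() && !wp.Create(it) {
//		// Create declined (throttle or config limit); wait for
//		// the next Subscribe notification and try again.
//	}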
// AtQuota returns true if Create is not expected to work at the
// moment (e.g., cloud provider has reported quota errors, or we are
// already at our own configured quota).
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return wp.atQuotaUntilFewerInstances > 0 ||
		time.Now().Before(wp.atQuotaUntil) ||
		(wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.setIdleBehavior(idleBehavior)
	return nil
}
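// Illustrative (hypothetical) usage: drain a node ahead of maintenance
// so it shuts down once its containers finish. IdleBehaviorDrain is
// assumed here based on the hold/drain modes mentioned above.
//
//	if err := wp.SetIdleBehavior(id, IdleBehaviorDrain); err != nil {
//		logger.WithError(err).Warn("cannot drain instance")
//	}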
// reportSSHConnected updates the mTimeToSSH metric upon the first
// successful SSH connection to an instance.
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[inst.ID()]
	if !ok {
		// race: inst was removed from the pool
		return
	}
	if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
		// the node is not in booting state (can happen if
		// a-d-c is restarted) OR this is not the first SSH
		// connection
		return
	}

	wkr.firstSSHConnection = time.Now()
	if wp.mTimeToSSH != nil {
		wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
	}
}
// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.updated = time.Now()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	wkr := &worker{
		executor:     wp.newExecutor(inst),
		idleBehavior: idleBehavior,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}
// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shut down the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.reportBootOutcome(BootOutcomeAborted)
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}
// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.waitUntilLoaded()
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}
// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}
// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}
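// Illustrative (hypothetical) caller-side pattern: fall back to Create
// when no idle worker of the right type is available.
//
//	if !wp.StartContainer(it, ctr) {
//		wp.Create(it) // retry StartContainer after the next notification
//	}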
// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Reason":        reason,
	})
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
		if rr == nil {
			rr = wkr.starting[uuid]
		}
		if rr != nil {
			go rr.Kill(reason)
			return true
		}
	}
	logger.Debug("cannot kill: already disappeared")
	return false
}
// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	}
}
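// Illustrative (hypothetical) cleanup loop a scheduler might run once
// it has recorded a container's final state:
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() && queue.Finalized(uuid) { // queue is hypothetical
//			wp.ForgetContainer(uuid)
//		}
//	}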
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mProbeAgeMax = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "probe_age_seconds_max",
		Help:      "Maximum number of seconds since an instance's most recent successful probe.",
	})
	reg.MustRegister(wp.mProbeAgeMax)
	wp.mProbeAgeMedian = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "probe_age_seconds_median",
		Help:      "Median number of seconds since an instance's most recent successful probe.",
	})
	reg.MustRegister(wp.mProbeAgeMedian)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price",
		Help:      "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "boot_outcomes",
		Help:      "Boot outcomes by type.",
	}, []string{"outcome"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	}
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_disappeared",
		Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	}
	reg.MustRegister(wp.mDisappearances)
	wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ssh_seconds",
		Help:       "Number of seconds between instance creation and the first successful SSH connection.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToSSH)
	wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ready_for_container_seconds",
		Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToReadyForContainer)
	wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
		Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromShutdownToGone)
	wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "containers_time_from_queue_to_crunch_run_seconds",
		Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
	wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_run_probe_duration_seconds",
		Help:       "Number of seconds per runProbe call.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	}, []string{"outcome"})
	reg.MustRegister(wp.mRunProbeDuration)
}
func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}
func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	type entKey struct {
		cat      string
		instType string
	}
	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var running int64
	now := time.Now()
	var probed []time.Time
	for _, wkr := range wp.workers {
		var cat string
		switch {
		case len(wkr.running)+len(wkr.starting) > 0:
			cat = "inuse"
		case wkr.idleBehavior == IdleBehaviorHold:
			cat = "hold"
		case wkr.state == StateBooting:
			cat = "booting"
		case wkr.state == StateUnknown:
			cat = "unknown"
		default:
			cat = "idle"
		}
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
		probed = append(probed, wkr.probed)
	}
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for nonexistent category/node type combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
			}
		}
	}
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	}
	wp.mContainersRunning.Set(float64(running))

	if len(probed) == 0 {
		wp.mProbeAgeMax.Set(0)
		wp.mProbeAgeMedian.Set(0)
	} else {
		sort.Slice(probed, func(i, j int) bool { return probed[i].Before(probed[j]) })
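		// probed is now sorted oldest first, so probed[0] is the
		// stalest probe time and probed[len(probed)/2] the median.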
		wp.mProbeAgeMax.Set(now.Sub(probed[0]).Seconds())
		wp.mProbeAgeMedian.Set(now.Sub(probed[len(probed)/2]).Seconds())
	}
}
func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		// Add some jitter. Without this, if probeInterval is
		// a multiple of syncInterval and sync is
		// instantaneous (as with the loopback driver), the
		// first few probes race with sync operations and
		// don't update the workers.
		time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))

		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}
func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}
// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}
// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Address:              w.instance.Address(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return string(r[i].Instance) < string(r[j].Instance)
	})
	return r
}
// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return nil
}
func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}
// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
		return nil
	} else if wp.runnerSource == "" {
		wp.runnerCmd = wp.runnerCmdDefault
		wp.runnerData = []byte{}
		return nil
	}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
	if err != nil {
		logger.WithError(err).Error("failed to load runner program")
		return err
	}
	wp.runnerData = buf
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
	return nil
}
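// Illustrative only: with runner bytes whose MD5 digest is
// d41d8cd98f00b204e9800998ecf8427e, wp.runnerCmd would be
// "/tmp/arvados-crunch-run/crunch-run~d41d8cd98f00b204e9800998ecf8427e",
// so each deployed binary gets a content-addressed path.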
func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}
func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}
// Add/remove/update workers based on the given instances list, as
// obtained from the instanceSet. However, don't clobber any other
// updates that already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		// wkr.destroyed.IsZero() is true if the instance disappeared
		// without us ever requesting shutdown; skip the metric then.
		if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
			wp.mTimeFromShutdownToGone.Observe(time.Since(wkr.destroyed).Seconds())
		}
		delete(wp.workers, id)
		notify = true
	}

	if wp.atQuotaUntilFewerInstances > len(wp.workers)+len(wp.creating) {
		// After syncing, there are fewer instances (including
		// pending creates) than there were last time we saw a
		// quota error. This might mean it's now possible to
		// create new instances. Reset our "at quota" state.
		wp.atQuotaUntilFewerInstances = 0
	}

	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}
func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for !wp.loaded {
		wp.mtx.RUnlock()
		<-ch
		wp.mtx.RLock()
	}
}
func (wp *Pool) gatewayAuthSecret(uuid string) string {
	h := hmac.New(sha256.New, []byte(wp.systemRootToken))
	fmt.Fprint(h, uuid)
	return fmt.Sprintf("%x", h.Sum(nil))
}
// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}
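// Illustrative only: randomHex(8) yields a string like "9f3c01ab";
// Create uses randomHex(instanceSecretLength) to mint the per-instance
// secret stored in the InstanceSecret tag.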