// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"crypto/hmac"
	"crypto/md5"
	"crypto/rand"
	"crypto/sha256"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	mathrand "math/rand"
	"sort"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)
const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
)
// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}
// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	Close()
}
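// For illustration only: a hypothetical caller could run a probe
// command through an Executor roughly like this (the variable names
// below are assumptions, not part of this package's API):
//
//	stdout, stderr, err := executor.Execute(nil, "systemctl is-system-running", nil)
//	if err != nil {
//		logger.WithError(err).Warnf("probe failed, stderr: %q", stderr)
//	}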
const (
	defaultSyncInterval        = time.Minute
	defaultProbeInterval       = time.Second * 10
	defaultMaxProbesPerSecond  = 10
	defaultTimeoutIdle         = time.Minute
	defaultTimeoutBooting      = time.Minute * 10
	defaultTimeoutProbe        = time.Minute * 10
	defaultTimeoutShutdown     = time.Second * 10
	defaultTimeoutTERM         = time.Minute * 2
	defaultTimeoutSignal       = time.Second * 5
	defaultTimeoutStaleRunLock = time.Second * 5

	// Time to wait after a quota error before trying again
	// anyway, even if no instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because rate limiting" messages.
	logRateLimitErrorInterval = time.Second * 10
)
// duration returns conf as a time.Duration if it is set (>0),
// otherwise the given default.
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}
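// For example, an unset (zero) config value falls back to the
// default:
//
//	duration(arvados.Duration(0), defaultSyncInterval) // == time.Minute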
// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger: logger,
		arvClient: arvClient,
		instanceSetID: instanceSetID,
		instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor: newExecutor,
		cluster: cluster,
		bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
		instanceInitCommand: cloud.InitCommand(cluster.Containers.CloudVMs.InstanceInitCommand),
		runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes: cluster.InstanceTypes,
		maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
		maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
		maxInstances: cluster.Containers.CloudVMs.MaxInstances,
		probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		timeoutStaleRunLock: duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
		systemRootToken: cluster.SystemRootToken,
		installPublicKey: installPublicKey,
		tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
		runnerCmdDefault: cluster.Containers.CrunchRunCommand,
		runnerArgs: append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
		stop: make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}
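// A minimal construction sketch (assumes a loaded cluster config;
// "instanceSet" and "newSSHExecutor" are placeholder names, not
// exports of this package):
//
//	pool := NewPool(logger, arvClient, prometheus.NewRegistry(),
//		instanceSetID, instanceSet, newSSHExecutor, pubKey, cluster)
//	defer pool.Stop()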
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger logrus.FieldLogger
	arvClient *arvados.Client
	instanceSetID cloud.InstanceSetID
	instanceSet *throttledInstanceSet
	newExecutor func(cloud.Instance) Executor
	cluster *arvados.Cluster
	bootProbeCommand string
	instanceInitCommand cloud.InitCommand
	runnerSource string
	imageID cloud.ImageID
	instanceTypes map[string]arvados.InstanceType
	syncInterval time.Duration
	probeInterval time.Duration
	maxProbesPerSecond int
	maxConcurrentInstanceCreateOps int
	maxInstances int
	timeoutIdle time.Duration
	timeoutBooting time.Duration
	timeoutProbe time.Duration
	timeoutShutdown time.Duration
	timeoutTERM time.Duration
	timeoutSignal time.Duration
	timeoutStaleRunLock time.Duration
	systemRootToken string
	installPublicKey ssh.PublicKey
	tagKeyPrefix string
	runnerCmdDefault string // crunch-run command to use if not deploying a binary
	runnerArgs []string // extra args passed to crunch-run

	// private state
	subscribers map[<-chan struct{}]chan<- struct{}
	creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers map[cloud.InstanceID]*worker
	loaded bool // loaded list of instances from InstanceSet at least once
	exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr cloud.QuotaError
	stop chan bool
	mtx sync.RWMutex
	setupOnce sync.Once
	runnerData []byte
	runnerMD5 [md5.Size]byte
	runnerCmd string
	mContainersRunning prometheus.Gauge
	mInstances *prometheus.GaugeVec
	mInstancesPrice *prometheus.GaugeVec
	mVCPUs *prometheus.GaugeVec
	mMemory *prometheus.GaugeVec
	mBootOutcomes *prometheus.CounterVec
	mDisappearances *prometheus.CounterVec
	mTimeToSSH prometheus.Summary
	mTimeToReadyForContainer prometheus.Summary
	mTimeFromShutdownToGone prometheus.Summary
	mTimeFromQueueToCrunchRun prometheus.Summary
	mRunProbeDuration *prometheus.SummaryVec
	mProbeAgeMax prometheus.Gauge
	mProbeAgeMedian prometheus.Gauge
}
type createCall struct {
	time time.Time
	instanceType arvados.InstanceType
}
func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
	}
	return nil
}
// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		tryScheduling(wp)
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}
// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}
// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}
// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error or a configuration
// setting prevents it from even attempting to create a new
// instance. Those errors are logged by the Pool, so the caller does
// not need to log anything in such cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) ||
		wp.instanceSet.throttleCreate.Error() != nil ||
		(wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) {
		return false
	}
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType: it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret, nil}.InitCommand() + "\n" + wp.instanceInitCommand
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	if len(wp.creating)+len(wp.workers) == wp.maxInstances {
		logger.Infof("now at MaxInstances limit of %d instances", wp.maxInstances)
	}
	return true
}
// AtQuota returns true if Create is not expected to work at the
// moment (e.g., cloud provider has reported quota errors, or we are
// already at our own configured quota).
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil) || (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
}
// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.setIdleBehavior(idleBehavior)
	return nil
}
// reportSSHConnected records a successful connection to the SSH
// daemon, updating the mTimeToSSH metric.
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[inst.ID()]
	if !ok {
		// race: inst was removed from the pool
		return
	}
	if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
		// the node is not in booting state (can happen if
		// a-d-c is restarted) OR this is not the first SSH
		// connection
		return
	}

	wkr.firstSSHConnection = time.Now()
	if wp.mTimeToSSH != nil {
		wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
	}
}
// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance": inst.ID(),
		"Address": inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State": state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx: &wp.mtx,
		wp: wp,
		logger: logger,
		executor: wp.newExecutor(inst),
		state: state,
		idleBehavior: idleBehavior,
		instance: inst,
		instType: it,
		appeared: now,
		probed: now,
		busy: now,
		updated: now,
		running: make(map[string]*remoteRunner),
		starting: make(map[string]*remoteRunner),
		probing: make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}
// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.reportBootOutcome(BootOutcomeAborted)
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}
// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.waitUntilLoaded()
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}
// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}
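// An illustrative caller pattern showing how the zero/nonzero time
// values are meant to be used ("queueCacheReflects" is a hypothetical
// helper, not part of this package):
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() && queueCacheReflects(uuid) {
//			wp.ForgetContainer(uuid)
//		}
//	}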
// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}
// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Reason": reason,
	})
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
		if rr == nil {
			rr = wkr.starting[uuid]
		}
		if rr != nil {
			rr.Kill(reason)
			return true
		}
	}
	logger.Debug("cannot kill: already disappeared")
	return false
}
// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	}
}
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "containers_running",
		Help: "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mProbeAgeMax = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "probe_age_seconds_max",
		Help: "Maximum number of seconds since an instance's most recent successful probe.",
	})
	reg.MustRegister(wp.mProbeAgeMax)
	wp.mProbeAgeMedian = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "probe_age_seconds_median",
		Help: "Median number of seconds since an instance's most recent successful probe.",
	})
	reg.MustRegister(wp.mProbeAgeMedian)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "instances_total",
		Help: "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "instances_price",
		Help: "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "vcpus_total",
		Help: "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "memory_bytes_total",
		Help: "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "boot_outcomes",
		Help: "Boot outcomes by type.",
	}, []string{"outcome"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	}
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "instances_disappeared",
		Help: "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	}
	reg.MustRegister(wp.mDisappearances)
	wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "instances_time_to_ssh_seconds",
		Help: "Number of seconds between instance creation and the first successful SSH connection.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToSSH)
	wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "instances_time_to_ready_for_container_seconds",
		Help: "Number of seconds between the first successful SSH connection and ready to run a container.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToReadyForContainer)
	wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "instances_time_from_shutdown_request_to_disappearance_seconds",
		Help: "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromShutdownToGone)
	wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "containers_time_from_queue_to_crunch_run_seconds",
		Help: "Number of seconds between the queuing of a container and the start of crunch-run.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
	wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "instances_run_probe_duration_seconds",
		Help: "Number of seconds per runProbe call.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	}, []string{"outcome"})
	reg.MustRegister(wp.mRunProbeDuration)
}
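// All of the above are exported under the arvados_dispatchcloud_*
// prefix. An illustrative Prometheus query (not part of this file)
// charting idle capacity per instance type:
//
//	sum by (instance_type) (arvados_dispatchcloud_instances_total{category="idle"})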
func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}
func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	type entKey struct {
		cat string
		instType string
	}
	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var running int64
	now := time.Now()
	var probed []time.Time
	for _, wkr := range wp.workers {
		var cat string
		switch {
		case len(wkr.running)+len(wkr.starting) > 0:
			cat = "inuse"
		case wkr.idleBehavior == IdleBehaviorHold:
			cat = "hold"
		case wkr.state == StateBooting:
			cat = "booting"
		case wkr.state == StateUnknown:
			cat = "unknown"
		default:
			cat = "idle"
		}
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
		probed = append(probed, wkr.probed)
	}
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for non-existing category/nodetype combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
			}
		}
	}
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	}
	wp.mContainersRunning.Set(float64(running))

	if len(probed) == 0 {
		wp.mProbeAgeMax.Set(0)
		wp.mProbeAgeMedian.Set(0)
	} else {
		sort.Slice(probed, func(i, j int) bool { return probed[i].Before(probed[j]) })
		wp.mProbeAgeMax.Set(now.Sub(probed[0]).Seconds())
		wp.mProbeAgeMedian.Set(now.Sub(probed[len(probed)/2]).Seconds())
	}
}
func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		// Add some jitter. Without this, if probeInterval is
		// a multiple of syncInterval and sync is
		// instantaneous (as with the loopback driver), the
		// first few probes race with sync operations and
		// don't update the workers.
		time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))

		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing
				// others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}
func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}
// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}
// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance: w.instance.ID(),
			Address: w.instance.Address(),
			Price: w.instType.Price,
			ArvadosInstanceType: w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID: w.lastUUID,
			LastBusy: w.busy,
			WorkerState: w.state.String(),
			IdleBehavior: w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}
// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return nil
}
func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}
// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
		return nil
	} else if wp.runnerSource == "" {
		wp.runnerCmd = wp.runnerCmdDefault
		wp.runnerData = []byte{}
		return nil
	}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
	if err != nil {
		logger.WithError(err).Error("failed to load runner program")
		return err
	}
	wp.runnerData = buf
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
	return nil
}
func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}
func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}
// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance": wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		// wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
		if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
			wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
		}
		delete(wp.workers, id)
		go wkr.Close()
		notify = true
	}

	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}
func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for !wp.loaded {
		wp.mtx.RUnlock()
		<-ch
		wp.mtx.RLock()
	}
}
func (wp *Pool) gatewayAuthSecret(uuid string) string {
	h := hmac.New(sha256.New, []byte(wp.systemRootToken))
	fmt.Fprint(h, uuid)
	return fmt.Sprintf("%x", h.Sum(nil))
}
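// The secret is the hex-encoded HMAC-SHA256 of the container UUID,
// keyed by the cluster's SystemRootToken, so it is always 64 hex
// digits. For example (illustrative UUID only):
//
//	wp.gatewayAuthSecret("zzzzz-dz642-0123456789abcde") // "9f8e..." (64 hex digits)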
// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}
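// A usage sketch mirroring the Create() call above:
//
//	secret := randomHex(instanceSecretLength) // instanceSecretLength hex digits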