// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0
21 "git.arvados.org/arvados.git/lib/cloud"
22 "git.arvados.org/arvados.git/sdk/go/arvados"
23 "github.com/prometheus/client_golang/prometheus"
24 "github.com/sirupsen/logrus"
25 "golang.org/x/crypto/ssh"
const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
)
// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}
// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	Close()
}
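
// A hedged usage sketch (illustrative only; newSSHExecutor and the
// instance variables are placeholders, not part of this package):
//
//	var exec Executor = newSSHExecutor(inst)
//	defer exec.Close()
//	stdout, stderr, err := exec.Execute(nil, "crunch-run --list", nil)
//	exec.SetTarget(updatedInst) // e.g. after the instance's address changes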
const (
	defaultSyncInterval        = time.Minute
	defaultProbeInterval       = time.Second * 10
	defaultMaxProbesPerSecond  = 10
	defaultTimeoutIdle         = time.Minute
	defaultTimeoutBooting      = time.Minute * 10
	defaultTimeoutProbe        = time.Minute * 10
	defaultTimeoutShutdown     = time.Second * 10
	defaultTimeoutTERM         = time.Minute * 2
	defaultTimeoutSignal       = time.Second * 5
	defaultTimeoutStaleRunLock = time.Second * 5

	// Time after a quota error to try again anyway, even if no
	// instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
)
// duration returns conf as a time.Duration, falling back to def when
// conf is zero (i.e., not set in the cluster configuration).
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}
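
// For example (an illustrative sketch):
//
//	duration(arvados.Duration(0), defaultTimeoutIdle)              // => time.Minute (the default)
//	duration(arvados.Duration(90*time.Second), defaultTimeoutIdle) // => 90 seconds (from config)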
// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:                         logger,
		arvClient:                      arvClient,
		instanceSetID:                  instanceSetID,
		instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:                    newExecutor,
		bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
		runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes:                  cluster.InstanceTypes,
		maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
		maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
		probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
		systemRootToken:                cluster.SystemRootToken,
		installPublicKey:               installPublicKey,
		tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
		runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
		runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
		stop:                           make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}
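
// A hedged construction sketch (myInstanceSet, newSSHExecutor, and
// pubKey stand in for caller-provided values):
//
//	wp := NewPool(logger, arvClient, prometheus.NewRegistry(),
//		cloud.InstanceSetID("example-set-id"),
//		myInstanceSet, newSSHExecutor, pubKey, cluster)
//	defer wp.Stop()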
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger                         logrus.FieldLogger
	arvClient                      *arvados.Client
	instanceSetID                  cloud.InstanceSetID
	instanceSet                    *throttledInstanceSet
	newExecutor                    func(cloud.Instance) Executor
	bootProbeCommand               string
	runnerSource                   string
	imageID                        cloud.ImageID
	instanceTypes                  map[string]arvados.InstanceType
	syncInterval                   time.Duration
	probeInterval                  time.Duration
	maxProbesPerSecond             int
	maxConcurrentInstanceCreateOps int
	timeoutIdle                    time.Duration
	timeoutBooting                 time.Duration
	timeoutProbe                   time.Duration
	timeoutShutdown                time.Duration
	timeoutTERM                    time.Duration
	timeoutSignal                  time.Duration
	timeoutStaleRunLock            time.Duration
	systemRootToken                string
	installPublicKey               ssh.PublicKey
	tagKeyPrefix                   string
	runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
	runnerArgs                     []string // extra args passed to crunch-run

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once
	runnerData   []byte
	runnerMD5    [md5.Size]byte
	runnerCmd    string

	mContainersRunning        prometheus.Gauge
	mInstances                *prometheus.GaugeVec
	mInstancesPrice           *prometheus.GaugeVec
	mVCPUs                    *prometheus.GaugeVec
	mMemory                   *prometheus.GaugeVec
	mBootOutcomes             *prometheus.CounterVec
	mDisappearances           *prometheus.CounterVec
	mTimeToSSH                prometheus.Summary
	mTimeToReadyForContainer  prometheus.Summary
	mTimeFromShutdownToGone   prometheus.Summary
	mTimeFromQueueToCrunchRun prometheus.Summary
	mRunProbeDuration         *prometheus.SummaryVec
}
type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}
// CheckHealth reports whether the pool is functioning, i.e., its
// runner binary (if one is configured) has loaded successfully.
func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
	}
	return nil
}
// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		// ...try scheduling some work...
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}
// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}
// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}
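
// A hedged sketch of how a scheduler might consume these counts
// (illustrative; queueLen is a placeholder for the caller's queue
// statistics):
//
//	for it, n := range wp.Unallocated() {
//		if n < queueLen[it] {
//			wp.Create(it)
//		}
//	}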
// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
		return false
	}
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret, nil}.InitCommand()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	return true
}
// AtQuota returns true if Create is not expected to work at the
// current quota usage level.
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
}
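
// A hedged sketch of the typical caller-side flow (illustrative):
//
//	if !wp.AtQuota() && !wp.Create(it) {
//		// creation is blocked by an earlier error; wait for
//		// the channel from wp.Subscribe() before retrying
//	}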
// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.setIdleBehavior(idleBehavior)
	return nil
}
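
// For example, to drain an instance so it shuts down once idle (a
// hedged sketch; IdleBehaviorDrain is assumed to be one of the
// package's defined idle behaviors):
//
//	if err := wp.SetIdleBehavior(instID, IdleBehaviorDrain); err != nil {
//		logger.WithError(err).Warn("cannot drain instance")
//	}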
// reportSSHConnected records the first successful connection to an
// instance's SSH daemon and updates the mTimeToSSH metric.
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr := wp.workers[inst.ID()]
	if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
		// the node is not in booting state (can happen if a-d-c is restarted) OR
		// this is not the first SSH connection
		return
	}

	wkr.firstSSHConnection = time.Now()
	if wp.mTimeToSSH != nil {
		wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
	}
}
// updateWorker adds or updates the worker attached to the given
// instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}
// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.reportBootOutcome(BootOutcomeAborted)
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}
// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.waitUntilLoaded()
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}
// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}
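
// A hedged caller-side sketch of the garbage collection described
// above (illustrative):
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() {
//			// final state has (presumably) been recorded elsewhere
//			wp.ForgetContainer(uuid)
//		}
//	}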
// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}
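
// Hedged usage sketch (illustrative):
//
//	if !wp.StartContainer(it, ctr) {
//		// no idle worker of this type right now; retry after
//		// the channel from wp.Subscribe() becomes ready
//	}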
// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Reason":        reason,
	})
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
		if rr == nil {
			rr = wkr.starting[uuid]
		}
		if rr != nil {
			rr.Kill(reason)
			return true
		}
	}
	logger.Debug("cannot kill: already disappeared")
	return false
}
// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	}
}
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price",
		Help:      "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "boot_outcomes",
		Help:      "Boot outcomes by type.",
	}, []string{"outcome"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	}
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_disappeared",
		Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	}
	reg.MustRegister(wp.mDisappearances)
	wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ssh_seconds",
		Help:       "Number of seconds between instance creation and the first successful SSH connection.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToSSH)
	wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ready_for_container_seconds",
		Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToReadyForContainer)
	wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
		Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromShutdownToGone)
	wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "containers_time_from_queue_to_crunch_run_seconds",
		Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
	wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_run_probe_duration_seconds",
		Help:       "Number of seconds per runProbe call.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	}, []string{"outcome"})
	reg.MustRegister(wp.mRunProbeDuration)
}
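
// A hedged sketch of exposing these metrics over HTTP (assumes the
// standard promhttp package; not part of this file):
//
//	reg := prometheus.NewRegistry()
//	wp := NewPool(logger, arvClient, reg, instanceSetID, instanceSet, newExecutor, pubKey, cluster)
//	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))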
func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}
func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	type entKey struct {
		cat      string
		instType string
	}
	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var running int64
	for _, wkr := range wp.workers {
		var cat string
		switch {
		case len(wkr.running)+len(wkr.starting) > 0:
			cat = "inuse"
		case wkr.idleBehavior == IdleBehaviorHold:
			cat = "hold"
		case wkr.state == StateBooting:
			cat = "booting"
		case wkr.state == StateUnknown:
			cat = "unknown"
		default:
			cat = "idle"
		}
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
	}
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for non-existing category/nodetype combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
			}
		}
	}
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	}
	wp.mContainersRunning.Set(float64(running))
}
func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing
				// others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}
func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}
// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}
// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Address:              w.instance.Address(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}
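
// Hedged usage sketch (illustrative), e.g. when rendering the
// instance list for a management API response:
//
//	for _, iv := range wp.Instances() {
//		fmt.Printf("%s\t%s\t%s\n", iv.Instance, iv.WorkerState, iv.IdleBehavior)
//	}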
// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return nil
}
func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}
// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
		return nil
	} else if wp.runnerSource == "" {
		wp.runnerCmd = wp.runnerCmdDefault
		wp.runnerData = []byte{}
		return nil
	}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
	if err != nil {
		logger.WithError(err).Error("failed to load runner program")
		return err
	}
	wp.runnerData = buf
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
	return nil
}
func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}
func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}
// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		// wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
		if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
			wp.mTimeFromShutdownToGone.Observe(time.Since(wkr.destroyed).Seconds())
		}
		delete(wp.workers, id)
		go wkr.Close()
		notify = true
	}

	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}
func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for !wp.loaded {
		wp.mtx.RUnlock()
		<-ch
		wp.mtx.RLock()
	}
}
// gatewayAuthSecret returns the gateway authorization secret for the
// given container UUID: an HMAC-SHA256 of the UUID, keyed with the
// cluster's SystemRootToken.
func (wp *Pool) gatewayAuthSecret(uuid string) string {
	h := hmac.New(sha256.New, []byte(wp.systemRootToken))
	fmt.Fprint(h, uuid)
	return fmt.Sprintf("%x", h.Sum(nil))
}
// randomHex returns a random string of n hexadecimal digits (n*4
// random bits). n must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}
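
// For example (illustrative): randomHex(6) reads 3 random bytes and
// formats them as 6 hex digits, e.g. "a3f09c". Each byte contributes
// two hex digits, so n digits carry n/2*8 = n*4 random bits.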