// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"crypto/hmac"
	"crypto/md5"
	"crypto/rand"
	"crypto/sha256"
	"encoding/json" // used only by the illustrative example sketches below
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	mathrand "math/rand"
	"net/http" // used only by the illustrative example sketches below
	"sort"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp" // used only by the illustrative example sketches below
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)

const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}
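
// exampleInstanceViewJSON is an illustrative sketch, not part of the original
// implementation: it shows how the JSON tags above shape the wire format
// exposed by the management API. All field values here are made up.
func exampleInstanceViewJSON() ([]byte, error) {
	iv := InstanceView{
		Instance:            cloud.InstanceID("example-instance-id"),
		Price:               0.24,
		ArvadosInstanceType: "Standard2Core",
		WorkerState:         "idle",
	}
	// Marshals to {"instance":"example-instance-id",...,"worker_state":"idle",...}
	return json.Marshal(iv)
}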

// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)
}
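
// nopExecutor is an illustrative sketch, not part of the original
// implementation: a trivial Executor of the shape a test double for this
// interface might take. It records its target and executes nothing.
type nopExecutor struct {
	mtx    sync.Mutex
	target cloud.ExecutorTarget
}

// Execute does nothing and reports success.
func (e *nopExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
	return nil, nil, nil
}

// SetTarget only swaps a field under a mutex, so it never blocks on
// concurrent Execute calls, as the interface contract requires.
func (e *nopExecutor) SetTarget(t cloud.ExecutorTarget) {
	e.mtx.Lock()
	defer e.mtx.Unlock()
	e.target = t
}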

const (
	defaultSyncInterval        = time.Minute
	defaultProbeInterval       = time.Second * 10
	defaultMaxProbesPerSecond  = 10
	defaultTimeoutIdle         = time.Minute
	defaultTimeoutBooting      = time.Minute * 10
	defaultTimeoutProbe        = time.Minute * 10
	defaultTimeoutShutdown     = time.Second * 10
	defaultTimeoutTERM         = time.Minute * 2
	defaultTimeoutSignal       = time.Second * 5
	defaultTimeoutStaleRunLock = time.Second * 5

	// Time to wait after a quota error before trying to create
	// new instances anyway, even if no instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because of rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
)

// duration returns conf as a time.Duration, falling back to def when
// conf is zero/unset.
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:                         logger,
		arvClient:                      arvClient,
		instanceSetID:                  instanceSetID,
		instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:                    newExecutor,
		cluster:                        cluster,
		bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
		runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes:                  cluster.InstanceTypes,
		maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
		maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
		maxInstances:                   cluster.Containers.CloudVMs.MaxInstances,
		probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
		systemRootToken:                cluster.SystemRootToken,
		installPublicKey:               installPublicKey,
		tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
		runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
		runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
		stop:                           make(chan bool),
	}
	wp.registerMetrics(reg)
	wp.setupOnce.Do(wp.setup)
	return wp
}
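
// exampleNewPool is an illustrative sketch, not part of the original
// implementation: typical wiring of NewPool from a dispatcher, assuming a
// driver-provided InstanceSet and an executor constructor are already in
// hand. The instance set ID is made up, and the nil arvClient just keeps
// the sketch minimal; a real dispatcher passes a configured client.
func exampleNewPool(logger logrus.FieldLogger, cluster *arvados.Cluster, is cloud.InstanceSet, newExec func(cloud.Instance) Executor, pubKey ssh.PublicKey) *Pool {
	reg := prometheus.NewRegistry()
	return NewPool(logger, nil, reg, cloud.InstanceSetID("example-set-id"), is, newExec, pubKey, cluster)
}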

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger                         logrus.FieldLogger
	arvClient                      *arvados.Client
	instanceSetID                  cloud.InstanceSetID
	instanceSet                    *throttledInstanceSet
	newExecutor                    func(cloud.Instance) Executor
	cluster                        *arvados.Cluster
	bootProbeCommand               string
	runnerSource                   string
	imageID                        cloud.ImageID
	instanceTypes                  map[string]arvados.InstanceType
	syncInterval                   time.Duration
	probeInterval                  time.Duration
	maxProbesPerSecond             int
	maxConcurrentInstanceCreateOps int
	maxInstances                   int
	timeoutIdle                    time.Duration
	timeoutBooting                 time.Duration
	timeoutProbe                   time.Duration
	timeoutShutdown                time.Duration
	timeoutTERM                    time.Duration
	timeoutSignal                  time.Duration
	timeoutStaleRunLock            time.Duration
	systemRootToken                string
	installPublicKey               ssh.PublicKey
	tagKeyPrefix                   string
	runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
	runnerArgs                     []string // extra args passed to crunch-run

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once
	runnerData   []byte
	runnerMD5    [md5.Size]byte
	runnerCmd    string

	mContainersRunning        prometheus.Gauge
	mInstances                *prometheus.GaugeVec
	mInstancesPrice           *prometheus.GaugeVec
	mVCPUs                    *prometheus.GaugeVec
	mMemory                   *prometheus.GaugeVec
	mBootOutcomes             *prometheus.CounterVec
	mDisappearances           *prometheus.CounterVec
	mTimeToSSH                prometheus.Summary
	mTimeToReadyForContainer  prometheus.Summary
	mTimeFromShutdownToGone   prometheus.Summary
	mTimeFromQueueToCrunchRun prometheus.Summary
	mRunProbeDuration         *prometheus.SummaryVec
}

type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}

// CheckHealth reports an error if the runner binary cannot be loaded.
func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
	}
	return nil
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}
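
// exampleSubscribeLoop is an illustrative sketch, not part of the original
// implementation: the consumer pattern described in the Subscribe doc
// comment. Because the channel has capacity 1, any burst of pool events
// collapses into a single wakeup.
func exampleSubscribeLoop(wp *Pool, runQueue func()) {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	for range ch {
		runQueue() // service all pending scheduling work once per wakeup
	}
}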

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// states other than StateRunning.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}
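
// exampleWantNewInstance is an illustrative sketch, not part of the original
// implementation: how a scheduler might consult Unallocated() to decide
// whether to call Create for a given instance type without over-provisioning.
func exampleWantNewInstance(wp *Pool, it arvados.InstanceType, queued int) bool {
	// Only ask for another instance if the queue for this type exceeds
	// the workers already creating/booting/idle/unknown.
	return queued > wp.Unallocated()[it]
}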

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error or a configuration
// setting prevents it from even attempting to create a new
// instance. Those errors are logged by the Pool, so the caller does
// not need to log anything in such cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) ||
		wp.instanceSet.throttleCreate.Error() != nil ||
		(wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) {
		return false
	}
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret, nil}.InitCommand()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	if len(wp.creating)+len(wp.workers) == wp.maxInstances {
		logger.Infof("now at MaxInstances limit of %d instances", wp.maxInstances)
	}
	return true
}
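
// exampleCreateWithBackoff is an illustrative sketch, not part of the
// original implementation: a caller-side retry pattern around Create. When
// Create returns false (quota, rate limiting, or MaxInstances), it has
// already logged why, so the caller can simply wait for a Subscribe wakeup
// (quota TTL expiry, an instance shutdown, etc.) and try again.
func exampleCreateWithBackoff(wp *Pool, it arvados.InstanceType) {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	for !wp.Create(it) {
		<-ch // pool state changed; worth retrying
	}
}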

// AtQuota returns true if Create is not expected to work at the
// moment (e.g., cloud provider has reported quota errors, or we are
// already at our own configured quota).
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil) || (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.setIdleBehavior(idleBehavior)
	return nil
}
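
// exampleDrainInstance is an illustrative sketch, not part of the original
// implementation: using SetIdleBehavior to take an instance out of service.
// It assumes an IdleBehaviorDrain value exists alongside the
// IdleBehaviorRun/IdleBehaviorHold values that appear in this file (the
// "hold/drain mode" wording in the Unallocated doc comment suggests it does).
func exampleDrainInstance(wp *Pool, id cloud.InstanceID) error {
	return wp.SetIdleBehavior(id, IdleBehaviorDrain)
}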

// reportSSHConnected records the first successful connection to an
// instance's SSH daemon and updates the mTimeToSSH metric.
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[inst.ID()]
	if !ok {
		// race: inst was removed from the pool
		return
	}
	if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
		// the node is not in booting state (can happen if
		// a-d-c is restarted) OR this is not the first SSH
		// connection
		return
	}

	wkr.firstSSHConnection = time.Now()
	if wp.mTimeToSSH != nil {
		wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
	}
}

// Add or update a worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Info("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.reportBootOutcome(BootOutcomeAborted)
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.waitUntilLoaded()
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Reason":        reason,
	})
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
		if rr == nil {
			rr = wkr.starting[uuid]
		}
		if rr != nil {
			rr.Kill(reason)
			return true
		}
	}
	logger.Debug("cannot kill: already disappeared")
	return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	}
}
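
// exampleSweepExited is an illustrative sketch, not part of the original
// implementation: the scheduler-side pattern the comment above describes.
// Entries with a nonzero time are exited containers; once the caller has
// recorded the exit, ForgetContainer drops the placeholder so Running()
// stops reporting it.
func exampleSweepExited(wp *Pool, recordExit func(uuid string, exited time.Time)) {
	for uuid, exited := range wp.Running() {
		if exited.IsZero() {
			continue // still starting or running
		}
		recordExit(uuid, exited)
		wp.ForgetContainer(uuid)
	}
}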

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price",
		Help:      "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "boot_outcomes",
		Help:      "Boot outcomes by type.",
	}, []string{"outcome"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	}
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_disappeared",
		Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	}
	reg.MustRegister(wp.mDisappearances)
	wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ssh_seconds",
		Help:       "Number of seconds between instance creation and the first successful SSH connection.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToSSH)
	wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ready_for_container_seconds",
		Help:       "Number of seconds between the first successful SSH connection and readiness to run a container.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToReadyForContainer)
	wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
		Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromShutdownToGone)
	wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "containers_time_from_queue_to_crunch_run_seconds",
		Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
	wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_run_probe_duration_seconds",
		Help:       "Number of seconds per runProbe call.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	}, []string{"outcome"})
	reg.MustRegister(wp.mRunProbeDuration)
}
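
// exampleServeMetrics is an illustrative sketch, not part of the original
// implementation: exposing the registry passed to NewPool over HTTP with
// the standard promhttp handler, so the gauges, counters, and summaries
// registered above become scrapeable. The listen address is made up.
func exampleServeMetrics(reg *prometheus.Registry) error {
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	return http.ListenAndServe(":9006", mux)
}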

func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	type entKey struct {
		cat      string
		instType string
	}
	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var running int64
	for _, wkr := range wp.workers {
		var cat string
		switch {
		case len(wkr.running)+len(wkr.starting) > 0:
			cat = "inuse"
		case wkr.idleBehavior == IdleBehaviorHold:
			cat = "hold"
		case wkr.state == StateBooting:
			cat = "booting"
		case wkr.state == StateUnknown:
			cat = "unknown"
		default:
			cat = "idle"
		}
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
	}
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for non-existent category/nodetype combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
			}
		}
	}
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	}
	wp.mContainersRunning.Set(float64(running))
}

func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		// Add some jitter. Without this, if probeInterval is
		// a multiple of syncInterval and sync is
		// instantaneous (as with the loopback driver), the
		// first few probes race with sync operations and
		// don't update the workers.
		time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))

		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing
				// others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// and so on.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Address:              w.instance.Address(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return nil
}

func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}

// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
		return nil
	} else if wp.runnerSource == "" {
		wp.runnerCmd = wp.runnerCmdDefault
		wp.runnerData = []byte{}
		return nil
	}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
	if err != nil {
		logger.WithError(err).Error("failed to load runner program")
		return err
	}
	wp.runnerData = buf
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
	return nil
}

func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		// wkr.destroyed.IsZero() can happen if the instance disappeared but we weren't trying to shut it down
		if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
			wp.mTimeFromShutdownToGone.Observe(time.Since(wkr.destroyed).Seconds())
		}
		delete(wp.workers, id)
		notify = true
	}

	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}

// waitUntilLoaded blocks until the initial instance list has been
// loaded from the cloud provider.
func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for !wp.loaded {
		wp.mtx.RUnlock()
		<-ch
		wp.mtx.RLock()
	}
}

// gatewayAuthSecret returns the container gateway auth secret for the
// given container UUID: an HMAC-SHA256 of the UUID, keyed with the
// cluster's SystemRootToken.
func (wp *Pool) gatewayAuthSecret(uuid string) string {
	h := hmac.New(sha256.New, []byte(wp.systemRootToken))
	fmt.Fprint(h, uuid)
	return fmt.Sprintf("%x", h.Sum(nil))
}
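
// exampleCheckAuthSecret is an illustrative sketch, not part of the original
// implementation: how a gateway-side check of the per-container secret might
// look. hmac.Equal compares in constant time, which is the appropriate way
// to compare MAC values.
func (wp *Pool) exampleCheckAuthSecret(uuid, presented string) bool {
	return hmac.Equal([]byte(presented), []byte(wp.gatewayAuthSecret(uuid)))
}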

// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}