// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0
22 "git.arvados.org/arvados.git/lib/cloud"
23 "git.arvados.org/arvados.git/sdk/go/arvados"
24 "github.com/prometheus/client_golang/prometheus"
25 "github.com/sirupsen/logrus"
26 "golang.org/x/crypto/ssh"

const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	Close()
}
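
// The following is an illustrative sketch, not part of the original
// file: a minimal in-memory Executor stub of the kind a test might
// use. It ignores its target and returns canned output; a real
// implementation would run cmd on the remote host over SSH.
type exampleExecutor struct {
	stdout, stderr []byte
	err            error
}

// Execute returns the stub's canned output regardless of env, cmd, or stdin.
func (e *exampleExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
	return e.stdout, e.stderr, e.err
}

// SetTarget is a no-op: the stub has no connection state to update. A
// real implementation would record the new address and host key for
// subsequent Execute calls, without blocking on in-flight ones.
func (e *exampleExecutor) SetTarget(cloud.ExecutorTarget) {}

// Close is a no-op: the stub holds no connections to release.
func (e *exampleExecutor) Close() {}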

const (
	defaultSyncInterval        = time.Minute
	defaultProbeInterval       = time.Second * 10
	defaultMaxProbesPerSecond  = 10
	defaultTimeoutIdle         = time.Minute
	defaultTimeoutBooting      = time.Minute * 10
	defaultTimeoutProbe        = time.Minute * 10
	defaultTimeoutShutdown     = time.Second * 10
	defaultTimeoutTERM         = time.Minute * 2
	defaultTimeoutSignal       = time.Second * 5
	defaultTimeoutStaleRunLock = time.Second * 5

	// Time after a quota error to try again anyway, even if no
	// instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time after a capacity error to try again.
	capacityErrorTTL = time.Minute

	// Time between "X failed because rate limiting" messages.
	logRateLimitErrorInterval = time.Second * 10
)

// duration returns conf as a time.Duration, or def if conf is zero.
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:                         logger,
		arvClient:                      arvClient,
		instanceSetID:                  instanceSetID,
		instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:                    newExecutor,
		cluster:                        cluster,
		bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
		instanceInitCommand:            cloud.InitCommand(cluster.Containers.CloudVMs.InstanceInitCommand),
		runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes:                  cluster.InstanceTypes,
		maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
		maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
		maxInstances:                   cluster.Containers.CloudVMs.MaxInstances,
		probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
		systemRootToken:                cluster.SystemRootToken,
		installPublicKey:               installPublicKey,
		tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
		runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
		runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
		stop:                           make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger                         logrus.FieldLogger
	arvClient                      *arvados.Client
	instanceSetID                  cloud.InstanceSetID
	instanceSet                    *throttledInstanceSet
	newExecutor                    func(cloud.Instance) Executor
	cluster                        *arvados.Cluster
	bootProbeCommand               string
	instanceInitCommand            cloud.InitCommand
	runnerSource                   string
	imageID                        cloud.ImageID
	instanceTypes                  map[string]arvados.InstanceType
	syncInterval                   time.Duration
	probeInterval                  time.Duration
	maxProbesPerSecond             int
	maxConcurrentInstanceCreateOps int
	maxInstances                   int
	timeoutIdle                    time.Duration
	timeoutBooting                 time.Duration
	timeoutProbe                   time.Duration
	timeoutShutdown                time.Duration
	timeoutTERM                    time.Duration
	timeoutSignal                  time.Duration
	timeoutStaleRunLock            time.Duration
	systemRootToken                string
	installPublicKey               ssh.PublicKey
	tagKeyPrefix                   string
	runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
	runnerArgs                     []string // extra args passed to crunch-run

	// private state
	subscribers                map[<-chan struct{}]chan<- struct{}
	creating                   map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers                    map[cloud.InstanceID]*worker
	loaded                     bool                 // loaded list of instances from InstanceSet at least once
	exited                     map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntilFewerInstances int
	atQuotaUntil               time.Time
	atQuotaErr                 cloud.QuotaError
	atCapacityUntil            map[string]time.Time
	stop                       chan bool
	mtx                        sync.RWMutex
	setupOnce                  sync.Once
	runnerData                 []byte
	runnerMD5                  [md5.Size]byte
	runnerCmd                  string

	mContainersRunning        prometheus.Gauge
	mInstances                *prometheus.GaugeVec
	mInstancesPrice           *prometheus.GaugeVec
	mVCPUs                    *prometheus.GaugeVec
	mMemory                   *prometheus.GaugeVec
	mBootOutcomes             *prometheus.CounterVec
	mDisappearances           *prometheus.CounterVec
	mTimeToSSH                prometheus.Summary
	mTimeToReadyForContainer  prometheus.Summary
	mTimeFromShutdownToGone   prometheus.Summary
	mTimeFromQueueToCrunchRun prometheus.Summary
	mRunProbeDuration         *prometheus.SummaryVec
	mProbeAgeMax              prometheus.Gauge
	mProbeAgeMedian           prometheus.Gauge
}

// A createCall tracks an unfinished (cloud.InstanceSet)Create call.
type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}

// CheckHealth returns an error if the Pool is not ready to create
// instances, e.g., the runner binary could not be loaded.
func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
	}
	return nil
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		tryScheduling(wp)
//		if done {
//			break
//		}
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error or a configuration
// setting prevents it from even attempting to create a new
// instance. Those errors are logged by the Pool, so the caller does
// not need to log anything in such cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	if wp.AtCapacity(it) || wp.AtQuota() || wp.instanceSet.throttleCreate.Error() != nil {
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code treats node creation, from Azure's perspective, as lasting until
	// the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret, nil}.InitCommand() + "\n" + wp.instanceInitCommand
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				n := len(wp.workers) + len(wp.creating) - 1
				if n < 1 {
					// Quota error with no
					// instances running --
					// nothing to do but wait
					wp.atQuotaUntilFewerInstances = 0
					wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
					time.AfterFunc(quotaErrorTTL, wp.notify)
					logger.WithField("atQuotaUntil", wp.atQuotaUntil).Info("quota error with 0 running -- waiting for quotaErrorTTL")
				} else if n < wp.atQuotaUntilFewerInstances || wp.atQuotaUntilFewerInstances == 0 {
					// Quota error with N
					// instances running -- report
					// AtQuota until some
					// instances shut down
					wp.atQuotaUntilFewerInstances = n
					wp.atQuotaUntil = time.Time{}
					logger.WithField("atQuotaUntilFewerInstances", n).Info("quota error -- waiting for next instance shutdown")
				}
			}
			if err, ok := err.(cloud.CapacityError); ok && err.IsCapacityError() {
				capKey := it.ProviderType
				if !err.IsInstanceTypeSpecific() {
					// set capacity flag for all
					// instance types
					capKey = ""
				}
				if wp.atCapacityUntil == nil {
					wp.atCapacityUntil = map[string]time.Time{}
				}
				wp.atCapacityUntil[capKey] = time.Now().Add(capacityErrorTTL)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	if len(wp.creating)+len(wp.workers) == wp.maxInstances {
		logger.Infof("now at MaxInstances limit of %d instances", wp.maxInstances)
	}
	return true
}

// AtCapacity returns true if Create() is currently expected to fail
// for the given instance type.
func (wp *Pool) AtCapacity(it arvados.InstanceType) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if t, ok := wp.atCapacityUntil[it.ProviderType]; ok && time.Now().Before(t) {
		// at capacity for this instance type
		return true
	}
	if t, ok := wp.atCapacityUntil[""]; ok && time.Now().Before(t) {
		// at capacity for all instance types
		return true
	}
	return false
}

// AtQuota returns true if Create is not expected to work at the
// moment (e.g., cloud provider has reported quota errors, or we are
// already at our own configured quota).
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return wp.atQuotaUntilFewerInstances > 0 ||
		time.Now().Before(wp.atQuotaUntil) ||
		(wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.setIdleBehavior(idleBehavior)
	return nil
}

// reportSSHConnected records a successful connection to the SSH
// daemon and updates the mTimeToSSH metric.
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[inst.ID()]
	if !ok {
		// race: inst was removed from the pool
		return
	}
	if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
		// the node is not in booting state (can happen if
		// a-d-c is restarted) OR this is not the first SSH
		// connection
		return
	}

	wkr.firstSSHConnection = time.Now()
	if wp.mTimeToSSH != nil {
		wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
	}
}

// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.reportBootOutcome(BootOutcomeAborted)
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.waitUntilLoaded()
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Reason":        reason,
	})
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
		if rr == nil {
			rr = wkr.starting[uuid]
		}
		if rr != nil {
			rr.Kill(reason)
			return true
		}
	}
	logger.Debug("cannot kill: already disappeared")
	return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	}
}
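
// Illustrative sketch, not part of the original file: a
// scheduler-style cleanup pass using Running and ForgetContainer. The
// grace argument is a hypothetical delay that gives the scheduler's
// queue cache time to observe each container's final state.
func exampleForgetExited(wp *Pool, grace time.Duration) {
	for uuid, exited := range wp.Running() {
		if exited.IsZero() {
			// container is still starting or running
			continue
		}
		if time.Since(exited) > grace {
			wp.ForgetContainer(uuid)
		}
	}
}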

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mProbeAgeMax = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "probe_age_seconds_max",
		Help:      "Maximum number of seconds since an instance's most recent successful probe.",
	})
	reg.MustRegister(wp.mProbeAgeMax)
	wp.mProbeAgeMedian = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "probe_age_seconds_median",
		Help:      "Median number of seconds since an instance's most recent successful probe.",
	})
	reg.MustRegister(wp.mProbeAgeMedian)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price",
		Help:      "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "boot_outcomes",
		Help:      "Boot outcomes by type.",
	}, []string{"outcome"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	}
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_disappeared",
		Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	}
	reg.MustRegister(wp.mDisappearances)
	wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ssh_seconds",
		Help:       "Number of seconds between instance creation and the first successful SSH connection.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToSSH)
	wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ready_for_container_seconds",
		Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToReadyForContainer)
	wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
		Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromShutdownToGone)
	wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "containers_time_from_queue_to_crunch_run_seconds",
		Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
	wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_run_probe_duration_seconds",
		Help:       "Number of seconds per runProbe call.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	}, []string{"outcome"})
	reg.MustRegister(wp.mRunProbeDuration)
}
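
// Illustrative sketch, not part of the original file: the registry
// passed to NewPool (and from there to registerMetrics) can be served
// over HTTP with the standard promhttp adapter. Here mux, logger, and
// the remaining NewPool arguments are assumed to be supplied by the
// caller.
//
//	import "github.com/prometheus/client_golang/prometheus/promhttp"
//
//	reg := prometheus.NewRegistry()
//	pool := NewPool(logger, arvClient, reg, instanceSetID, instanceSet,
//		newExecutor, installPublicKey, cluster)
//	mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))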

func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	type entKey struct {
		cat      string
		instType string
	}
	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var running int64
	now := time.Now()
	var probed []time.Time
	for _, wkr := range wp.workers {
		var cat string
		switch {
		case len(wkr.running)+len(wkr.starting) > 0:
			cat = "inuse"
		case wkr.idleBehavior == IdleBehaviorHold:
			cat = "hold"
		case wkr.state == StateBooting:
			cat = "booting"
		case wkr.state == StateUnknown:
			cat = "unknown"
		default:
			cat = "idle"
		}
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
		probed = append(probed, wkr.probed)
	}
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for non-existing category/nodetype combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
			}
		}
	}
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	}
	wp.mContainersRunning.Set(float64(running))

	if len(probed) == 0 {
		wp.mProbeAgeMax.Set(0)
		wp.mProbeAgeMedian.Set(0)
	} else {
		sort.Slice(probed, func(i, j int) bool { return probed[i].Before(probed[j]) })
		wp.mProbeAgeMax.Set(now.Sub(probed[0]).Seconds())
		wp.mProbeAgeMedian.Set(now.Sub(probed[len(probed)/2]).Seconds())
	}
}

func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		// Add some jitter. Without this, if probeInterval is
		// a multiple of syncInterval and sync is
		// instantaneous (as with the loopback driver), the
		// first few probes race with sync operations and
		// don't update the workers.
		time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))

		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing
				// others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Address:              w.instance.Address(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return nil
}

func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}

// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
		return nil
	} else if wp.runnerSource == "" {
		wp.runnerCmd = wp.runnerCmdDefault
		wp.runnerData = []byte{}
		return nil
	}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
	if err != nil {
		logger.WithError(err).Error("failed to load runner program")
		return err
	}
	wp.runnerData = buf
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
	return nil
}

func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		// wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
		if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
			wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
		}
		delete(wp.workers, id)
		go wkr.Close()
		notify = true
	}

	if wp.atQuotaUntilFewerInstances > len(wp.workers)+len(wp.creating) {
		// After syncing, there are fewer instances (including
		// pending creates) than there were last time we saw a
		// quota error. This might mean it's now possible to
		// create new instances. Reset our "at quota" state.
		wp.atQuotaUntilFewerInstances = 0
	}

	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}

// waitUntilLoaded blocks until the initial instance list has been
// loaded from the cloud provider.
func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for !wp.loaded {
		wp.mtx.RUnlock()
		<-ch
		wp.mtx.RLock()
	}
}

// gatewayAuthSecret returns the container gateway auth secret for the
// given container UUID: an HMAC-SHA256 of the UUID, keyed with the
// system root token.
func (wp *Pool) gatewayAuthSecret(uuid string) string {
	h := hmac.New(sha256.New, []byte(wp.systemRootToken))
	fmt.Fprint(h, uuid)
	return fmt.Sprintf("%x", h.Sum(nil))
}

// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}