// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"crypto/hmac"
	"crypto/md5"
	"crypto/rand"
	"crypto/sha256"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	mathrand "math/rand"
	"sort"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)

const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

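// Illustration (hypothetical values) of the JSON rendering:
//
//	{"instance":"i-0abc...","address":"10.0.0.5","price":0.24,
//	 "arvados_instance_type":"Standard2Core",
//	 "provider_instance_type":"t3.medium", ...}
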
// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	Close()
}

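// A minimal usage sketch (illustrative; the concrete SSH-based
// implementation lives in a separate package, and inst/newExecutor
// are assumed to be in scope as in NewPool below):
//
//	exr := newExecutor(inst)
//	exr.SetTarget(inst) // cheap; called on every sync
//	stdout, stderr, err := exr.Execute(nil, wp.bootProbeCommand, nil)
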
const (
	defaultSyncInterval        = time.Minute
	defaultProbeInterval       = time.Second * 10
	defaultMaxProbesPerSecond  = 10
	defaultTimeoutIdle         = time.Minute
	defaultTimeoutBooting      = time.Minute * 10
	defaultTimeoutProbe        = time.Minute * 10
	defaultTimeoutShutdown     = time.Second * 10
	defaultTimeoutTERM         = time.Minute * 2
	defaultTimeoutSignal       = time.Second * 5
	defaultTimeoutStaleRunLock = time.Second * 5

	// Time after a quota error to try again anyway, even if no
	// instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time after a capacity error to try again.
	capacityErrorTTL = time.Minute

	// Time between "X failed because rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
)

// duration returns conf converted to a time.Duration, or def if conf
// is zero.
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}

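// For example (illustrative): duration(arvados.Duration(0), time.Minute)
// returns time.Minute, while duration(arvados.Duration(5*time.Second),
// time.Minute) returns 5 seconds.
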
// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:                         logger,
		arvClient:                      arvClient,
		instanceSetID:                  instanceSetID,
		instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:                    newExecutor,
		cluster:                        cluster,
		bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
		instanceInitCommand:            cloud.InitCommand(cluster.Containers.CloudVMs.InstanceInitCommand),
		runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes:                  cluster.InstanceTypes,
		maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
		maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
		maxInstances:                   cluster.Containers.CloudVMs.MaxInstances,
		probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
		systemRootToken:                cluster.SystemRootToken,
		installPublicKey:               installPublicKey,
		tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
		runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
		runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
		stop:                           make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger                         logrus.FieldLogger
	arvClient                      *arvados.Client
	instanceSetID                  cloud.InstanceSetID
	instanceSet                    *throttledInstanceSet
	newExecutor                    func(cloud.Instance) Executor
	cluster                        *arvados.Cluster
	bootProbeCommand               string
	instanceInitCommand            cloud.InitCommand
	runnerSource                   string
	imageID                        cloud.ImageID
	instanceTypes                  map[string]arvados.InstanceType
	syncInterval                   time.Duration
	probeInterval                  time.Duration
	maxProbesPerSecond             int
	maxConcurrentInstanceCreateOps int
	maxInstances                   int
	timeoutIdle                    time.Duration
	timeoutBooting                 time.Duration
	timeoutProbe                   time.Duration
	timeoutShutdown                time.Duration
	timeoutTERM                    time.Duration
	timeoutSignal                  time.Duration
	timeoutStaleRunLock            time.Duration
	systemRootToken                string
	installPublicKey               ssh.PublicKey
	tagKeyPrefix                   string
	runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
	runnerArgs                     []string // extra args passed to crunch-run

	// private state
	subscribers                map[<-chan struct{}]chan<- struct{}
	creating                   map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers                    map[cloud.InstanceID]*worker
	loaded                     bool                 // loaded list of instances from InstanceSet at least once
	exited                     map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntilFewerInstances int
	atQuotaUntil               time.Time
	atQuotaErr                 cloud.QuotaError
	atCapacityUntil            map[string]time.Time
	stop                       chan bool
	mtx                        sync.RWMutex
	setupOnce                  sync.Once
	runnerData                 []byte
	runnerMD5                  [md5.Size]byte
	runnerCmd                  string

	mContainersRunning        prometheus.Gauge
	mInstances                *prometheus.GaugeVec
	mInstancesPrice           *prometheus.GaugeVec
	mVCPUs                    *prometheus.GaugeVec
	mMemory                   *prometheus.GaugeVec
	mBootOutcomes             *prometheus.CounterVec
	mDisappearances           *prometheus.CounterVec
	mTimeToSSH                prometheus.Summary
	mTimeToReadyForContainer  prometheus.Summary
	mTimeFromShutdownToGone   prometheus.Summary
	mTimeFromQueueToCrunchRun prometheus.Summary
	mRunProbeDuration         *prometheus.SummaryVec
	mProbeAgeMax              prometheus.Gauge
	mProbeAgeMedian           prometheus.Gauge
}

type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}

// CheckHealth returns an error if the pool cannot be expected to
// work, i.e., its runner program failed to load; otherwise nil.
func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
	}
	return nil
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		tryScheduling(wp)
//		if done {
//			break
//		}
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}

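// Illustrative use (hypothetical scheduler code): compare
// Unallocated() against the queue to decide how many more instances
// of each type to Create():
//
//	unalloc := wp.Unallocated()
//	for _, it := range neededTypes {
//		if unalloc[it] == 0 && !wp.AtQuota() {
//			wp.Create(it)
//		}
//	}
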
// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error or a configuration
// setting prevents it from even attempting to create a new
// instance. Those errors are logged by the Pool, so the caller does
// not need to log anything in such cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	if wp.AtCapacity(it) || wp.AtQuota() || wp.instanceSet.throttleCreate.Error() != nil {
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret, nil}.InitCommand() + "\n" + wp.instanceInitCommand
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				n := len(wp.workers) + len(wp.creating) - 1
				if n < 1 {
					// Quota error with no
					// instances running --
					// nothing to do but wait
					wp.atQuotaUntilFewerInstances = 0
					wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
					time.AfterFunc(quotaErrorTTL, wp.notify)
					logger.WithField("atQuotaUntil", wp.atQuotaUntil).Info("quota error with 0 running -- waiting for quotaErrorTTL")
				} else if n < wp.atQuotaUntilFewerInstances || wp.atQuotaUntilFewerInstances == 0 {
					// Quota error with N
					// instances running -- report
					// AtQuota until some
					// instances shut down
					wp.atQuotaUntilFewerInstances = n
					wp.atQuotaUntil = time.Time{}
					logger.WithField("atQuotaUntilFewerInstances", n).Info("quota error -- waiting for next instance shutdown")
				}
			}
			if err, ok := err.(cloud.CapacityError); ok && err.IsCapacityError() {
				capKey := it.ProviderType
				if !err.IsInstanceTypeSpecific() {
					// set capacity flag for all
					// instance types
					capKey = ""
				}
				if wp.atCapacityUntil == nil {
					wp.atCapacityUntil = map[string]time.Time{}
				}
				wp.atCapacityUntil[capKey] = time.Now().Add(capacityErrorTTL)
				time.AfterFunc(capacityErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	if len(wp.creating)+len(wp.workers) == wp.maxInstances {
		logger.Infof("now at MaxInstances limit of %d instances", wp.maxInstances)
	}
	return true
}

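// Illustrative note: the InstanceSecret tag written above is what
// lets updateWorker() (below) match a newly appearing instance back
// to its pending createCall, roughly:
//
//	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
//	if _, ok := wp.creating[secret]; ok {
//		// treat the new worker as StateBooting
//	}
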
// AtCapacity returns true if Create() is currently expected to fail
// for the given instance type.
func (wp *Pool) AtCapacity(it arvados.InstanceType) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if t, ok := wp.atCapacityUntil[it.ProviderType]; ok && time.Now().Before(t) {
		// at capacity for this instance type
		return true
	}
	if t, ok := wp.atCapacityUntil[""]; ok && time.Now().Before(t) {
		// at capacity for all instance types
		return true
	}
	return false
}

// AtQuota returns true if Create is not expected to work at the
// moment (e.g., cloud provider has reported quota errors, or we are
// already at our own configured quota).
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return wp.atQuotaUntilFewerInstances > 0 ||
		time.Now().Before(wp.atQuotaUntil) ||
		(wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
}

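// Both guards are cheap, and Create() consults them itself; an
// illustrative external caller can do the same before queuing work:
//
//	if wp.AtQuota() || wp.AtCapacity(it) {
//		return // try again after the TTL expires
//	}
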
// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.setIdleBehavior(idleBehavior)
	return nil
}

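// For example (illustrative), a management handler might drain a
// worker before maintenance and re-enable it afterwards:
//
//	err := wp.SetIdleBehavior(id, IdleBehaviorDrain)
//	// ...maintenance...
//	err = wp.SetIdleBehavior(id, IdleBehaviorRun)
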
// Successful connection to the SSH daemon; update the mTimeToSSH metric.
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[inst.ID()]
	if !ok {
		// race: inst was removed from the pool
		return
	}
	if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
		// the node is not in booting state (can happen if
		// a-d-c is restarted) OR this is not the first SSH
		// connection
		return
	}

	wkr.firstSSHConnection = time.Now()
	if wp.mTimeToSSH != nil {
		wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
	}
}

// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.reportBootOutcome(BootOutcomeAborted)
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.waitUntilLoaded()
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}

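// Illustrative caller pattern (queue.Finalized is hypothetical): a
// scheduler can treat a nonzero timestamp as "exited, safe to forget
// once finalized":
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() && queue.Finalized(uuid) {
//			wp.ForgetContainer(uuid)
//		}
//	}
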
// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Reason":        reason,
	})
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
		if rr == nil {
			rr = wkr.starting[uuid]
		}
		if rr != nil {
			rr.Kill(reason)
			return true
		}
	}
	logger.Debug("cannot kill: already disappeared")
	return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	}
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mProbeAgeMax = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "probe_age_seconds_max",
		Help:      "Maximum number of seconds since an instance's most recent successful probe.",
	})
	reg.MustRegister(wp.mProbeAgeMax)
	wp.mProbeAgeMedian = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "probe_age_seconds_median",
		Help:      "Median number of seconds since an instance's most recent successful probe.",
	})
	reg.MustRegister(wp.mProbeAgeMedian)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price",
		Help:      "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "boot_outcomes",
		Help:      "Boot outcomes by type.",
	}, []string{"outcome"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	}
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_disappeared",
		Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	}
	reg.MustRegister(wp.mDisappearances)
	wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ssh_seconds",
		Help:       "Number of seconds between instance creation and the first successful SSH connection.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToSSH)
	wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ready_for_container_seconds",
		Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToReadyForContainer)
	wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
		Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromShutdownToGone)
	wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "containers_time_from_queue_to_crunch_run_seconds",
		Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
	wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_run_probe_duration_seconds",
		Help:       "Number of seconds per runProbe call.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	}, []string{"outcome"})
	reg.MustRegister(wp.mRunProbeDuration)
}

func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	type entKey struct {
		cat      string
		instType string
	}
	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var running int64
	now := time.Now()
	var probed []time.Time
	for _, wkr := range wp.workers {
		var cat string
		switch {
		case len(wkr.running)+len(wkr.starting) > 0:
			cat = "inuse"
		case wkr.idleBehavior == IdleBehaviorHold:
			cat = "hold"
		case wkr.state == StateBooting:
			cat = "booting"
		case wkr.state == StateUnknown:
			cat = "unknown"
		default:
			cat = "idle"
		}
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
		probed = append(probed, wkr.probed)
	}
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for non-existing category/nodetype combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
			}
		}
	}
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	}
	wp.mContainersRunning.Set(float64(running))

	if len(probed) == 0 {
		wp.mProbeAgeMax.Set(0)
		wp.mProbeAgeMedian.Set(0)
	} else {
		// probed is sorted oldest-first, so probed[0] gives the
		// maximum age and the middle element gives the median.
		sort.Slice(probed, func(i, j int) bool { return probed[i].Before(probed[j]) })
		wp.mProbeAgeMax.Set(now.Sub(probed[0]).Seconds())
		wp.mProbeAgeMedian.Set(now.Sub(probed[len(probed)/2]).Seconds())
	}
}

func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		// Add some jitter. Without this, if probeInterval is
		// a multiple of syncInterval and sync is
		// instantaneous (as with the loopback driver), the
		// first few probes race with sync operations and
		// don't update the workers.
		time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))

		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing
				// others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Address:              w.instance.Address(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return nil
}

func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}

// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
		return nil
	} else if wp.runnerSource == "" {
		wp.runnerCmd = wp.runnerCmdDefault
		wp.runnerData = []byte{}
		return nil
	}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
	if err != nil {
		logger.WithError(err).Error("failed to load runner program")
		return err
	}
	wp.runnerData = buf
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
	return nil
}

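// Illustration (hypothetical digest): the deployed runner ends up at
// a content-addressed path such as
// /tmp/arvados-crunch-run/crunch-run~d41d8cd98f00b204e9800998ecf8427e,
// so redeploying an identical binary is effectively a no-op.
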
func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		// wkr.destroyed can be zero if the instance disappeared
		// even though we never requested shutdown; skip the
		// metric in that case.
		if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
			wp.mTimeFromShutdownToGone.Observe(time.Since(wkr.destroyed).Seconds())
		}
		delete(wp.workers, id)
		notify = true
	}

	if wp.atQuotaUntilFewerInstances > len(wp.workers)+len(wp.creating) {
		// After syncing, there are fewer instances (including
		// pending creates) than there were last time we saw a
		// quota error. This might mean it's now possible to
		// create new instances. Reset our "at quota" state.
		wp.atQuotaUntilFewerInstances = 0
	}

	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}

func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for !wp.loaded {
		wp.mtx.RUnlock()
		<-ch
		wp.mtx.RLock()
	}
}

// gatewayAuthSecret returns the container gateway auth secret for the
// given container UUID: an HMAC-SHA256 of the UUID, keyed with
// SystemRootToken.
func (wp *Pool) gatewayAuthSecret(uuid string) string {
	h := hmac.New(sha256.New, []byte(wp.systemRootToken))
	fmt.Fprint(h, uuid)
	return fmt.Sprintf("%x", h.Sum(nil))
}

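// Illustrative property (hypothetical UUID): the secret is
// deterministic per UUID, so a dispatcher restart yields the same
// value, always 64 hex digits (SHA-256):
//
//	s1 := wp.gatewayAuthSecret("zzzzz-dz642-xxxxxxxxxxxxxxx")
//	s2 := wp.gatewayAuthSecret("zzzzz-dz642-xxxxxxxxxxxxxxx")
//	// s1 == s2, len(s1) == 64
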
// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}
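
// For example (illustrative): randomHex(6) returns a 6-digit hex
// string such as "a1b2c3"; each call draws fresh bytes from
// crypto/rand, so values differ between calls.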