1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
22 "git.arvados.org/arvados.git/lib/cloud"
23 "git.arvados.org/arvados.git/sdk/go/arvados"
24 "github.com/prometheus/client_golang/prometheus"
25 "github.com/sirupsen/logrus"
26 "golang.org/x/crypto/ssh"
30 tagKeyInstanceType = "InstanceType"
31 tagKeyIdleBehavior = "IdleBehavior"
32 tagKeyInstanceSecret = "InstanceSecret"
33 tagKeyInstanceSetID = "InstanceSetID"
36 // An InstanceView shows a worker's current state and recent activity.
37 type InstanceView struct {
38 Instance cloud.InstanceID `json:"instance"`
39 Address string `json:"address"`
40 Price float64 `json:"price"`
41 ArvadosInstanceType string `json:"arvados_instance_type"`
42 ProviderInstanceType string `json:"provider_instance_type"`
43 LastContainerUUID string `json:"last_container_uuid"`
44 LastBusy time.Time `json:"last_busy"`
45 WorkerState string `json:"worker_state"`
46 IdleBehavior IdleBehavior `json:"idle_behavior"`
49 // An Executor executes shell commands on a remote host.
50 type Executor interface {
51 // Run cmd on the current target.
52 Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
54 // Use the given target for subsequent operations. The new
55 // target is the same host as the previous target, but it
56 // might return a different address and verify a different
59 // SetTarget is called frequently, and in most cases the new
60 // target will behave exactly the same as the old one. An
61 // implementation should optimize accordingly.
63 // SetTarget must not block on concurrent Execute calls.
64 SetTarget(cloud.ExecutorTarget)
70 defaultSyncInterval = time.Minute
71 defaultProbeInterval = time.Second * 10
72 defaultMaxProbesPerSecond = 10
73 defaultTimeoutIdle = time.Minute
74 defaultTimeoutBooting = time.Minute * 10
75 defaultTimeoutProbe = time.Minute * 10
76 defaultTimeoutShutdown = time.Second * 10
77 defaultTimeoutTERM = time.Minute * 2
78 defaultTimeoutSignal = time.Second * 5
79 defaultTimeoutStaleRunLock = time.Second * 5
81 // Time after a quota error to try again anyway, even if no
82 // instances have been shut down.
83 quotaErrorTTL = time.Minute
85 // Time after a capacity error to try again
86 capacityErrorTTL = time.Minute
88 // Time between "X failed because of rate limiting" log messages
89 logRateLimitErrorInterval = time.Second * 10
92 func duration(conf arvados.Duration, def time.Duration) time.Duration {
94 return time.Duration(conf)
99 // NewPool creates a Pool of workers backed by instanceSet.
101 // New instances are configured and set up according to the given
102 // cluster configuration.
103 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
106 arvClient: arvClient,
107 instanceSetID: instanceSetID,
108 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
109 newExecutor: newExecutor,
111 bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
112 instanceInitCommand: cloud.InitCommand(cluster.Containers.CloudVMs.InstanceInitCommand),
113 runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
114 imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
115 instanceTypes: cluster.InstanceTypes,
116 maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
117 maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
118 maxInstances: cluster.Containers.CloudVMs.MaxInstances,
119 probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
120 syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
121 timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
122 timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
123 timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
124 timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
125 timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
126 timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
127 timeoutStaleRunLock: duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
128 systemRootToken: cluster.SystemRootToken,
129 installPublicKey: installPublicKey,
130 tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
131 runnerCmdDefault: cluster.Containers.CrunchRunCommand,
132 runnerArgs: append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
133 stop: make(chan bool),
135 wp.registerMetrics(reg)
137 wp.setupOnce.Do(wp.setup)
145 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
146 // zero Pool should not be used. Call NewPool to create a new Pool.
149 logger logrus.FieldLogger
150 arvClient *arvados.Client
151 instanceSetID cloud.InstanceSetID
152 instanceSet *throttledInstanceSet
153 newExecutor func(cloud.Instance) Executor
154 cluster *arvados.Cluster
155 bootProbeCommand string
156 instanceInitCommand cloud.InitCommand
158 imageID cloud.ImageID
159 instanceTypes map[string]arvados.InstanceType
160 syncInterval time.Duration
161 probeInterval time.Duration
162 maxProbesPerSecond int
163 maxConcurrentInstanceCreateOps int
165 timeoutIdle time.Duration
166 timeoutBooting time.Duration
167 timeoutProbe time.Duration
168 timeoutShutdown time.Duration
169 timeoutTERM time.Duration
170 timeoutSignal time.Duration
171 timeoutStaleRunLock time.Duration
172 systemRootToken string
173 installPublicKey ssh.PublicKey
175 runnerCmdDefault string // crunch-run command to use if not deploying a binary
176 runnerArgs []string // extra args passed to crunch-run
179 subscribers map[<-chan struct{}]chan<- struct{}
180 creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
181 workers map[cloud.InstanceID]*worker
182 loaded bool // loaded list of instances from InstanceSet at least once
183 exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
184 atQuotaUntilFewerInstances int
185 atQuotaUntil time.Time
186 atQuotaErr cloud.QuotaError
187 atCapacityUntil map[interface{}]time.Time
192 runnerMD5 [md5.Size]byte
195 mContainersRunning prometheus.Gauge
196 mInstances *prometheus.GaugeVec
197 mInstancesPrice *prometheus.GaugeVec
198 mVCPUs *prometheus.GaugeVec
199 mMemory *prometheus.GaugeVec
200 mBootOutcomes *prometheus.CounterVec
201 mDisappearances *prometheus.CounterVec
202 mTimeToSSH prometheus.Summary
203 mTimeToReadyForContainer prometheus.Summary
204 mTimeFromShutdownToGone prometheus.Summary
205 mTimeFromQueueToCrunchRun prometheus.Summary
206 mRunProbeDuration *prometheus.SummaryVec
207 mProbeAgeMax prometheus.Gauge
208 mProbeAgeMedian prometheus.Gauge
211 type createCall struct {
213 instanceType arvados.InstanceType
216 func (wp *Pool) CheckHealth() error {
217 wp.setupOnce.Do(wp.setup)
218 if err := wp.loadRunnerData(); err != nil {
219 return fmt.Errorf("error loading runner binary: %s", err)
224 // Subscribe returns a buffered channel that becomes ready after any
225 // change to the pool's state that could have scheduling implications:
226 // a worker's state changes, a new worker appears, the cloud
227 // provider's API rate limiting period ends, etc.
229 // Additional events that occur while the channel is already ready
230 // will be dropped, so it is OK if the caller services the channel
235 // ch := wp.Subscribe()
236 // defer wp.Unsubscribe(ch)
243 func (wp *Pool) Subscribe() <-chan struct{} {
244 wp.setupOnce.Do(wp.setup)
246 defer wp.mtx.Unlock()
247 ch := make(chan struct{}, 1)
248 wp.subscribers[ch] = ch
252 // Unsubscribe stops sending updates to the given channel.
253 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
254 wp.setupOnce.Do(wp.setup)
256 defer wp.mtx.Unlock()
257 delete(wp.subscribers, ch)
260 // Unallocated returns the number of unallocated (creating + booting +
261 // idle + unknown) workers for each instance type. Workers in
262 // hold/drain mode are not included.
263 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
264 wp.setupOnce.Do(wp.setup)
266 defer wp.mtx.RUnlock()
267 unalloc := map[arvados.InstanceType]int{}
268 creating := map[arvados.InstanceType]int{}
269 oldestCreate := map[arvados.InstanceType]time.Time{}
270 for _, cc := range wp.creating {
271 it := cc.instanceType
273 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
274 oldestCreate[it] = cc.time
277 for _, wkr := range wp.workers {
278 // Skip workers that are not expected to become
279 // available soon. Note len(wkr.running)>0 is not
280 // redundant here: it can be true even in
282 if wkr.state == StateShutdown ||
283 wkr.state == StateRunning ||
284 wkr.idleBehavior != IdleBehaviorRun ||
285 len(wkr.running) > 0 {
290 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
291 // If up to N new workers appear in
292 // Instances() while we are waiting for N
293 // Create() calls to complete, we assume we're
294 // just seeing a race between Instances() and
295 // Create() responses.
297 // The other common reason why nodes have
298 // state==Unknown is that they appeared at
299 // startup, before any Create calls. They
300 // don't match the above timing condition, so
301 // we never mistakenly attribute them to
302 // pending Create calls.
306 for it, c := range creating {
312 // Create a new instance with the given type, and add it to the worker
313 // pool. The worker is added immediately; instance creation runs in
316 // Create returns false if a pre-existing error or a configuration
317 // setting prevents it from even attempting to create a new
318 // instance. Those errors are logged by the Pool, so the caller does
319 // not need to log anything in such cases.
320 func (wp *Pool) Create(it arvados.InstanceType) bool {
321 logger := wp.logger.WithField("InstanceType", it.Name)
322 wp.setupOnce.Do(wp.setup)
323 if wp.loadRunnerData() != nil {
324 // Boot probe is certain to fail.
327 if wp.AtCapacity(it) || wp.AtQuota() || wp.instanceSet.throttleCreate.Error() != nil {
331 defer wp.mtx.Unlock()
332 // The maxConcurrentInstanceCreateOps knob throttles the number of node create
333 // requests in flight. It was added to work around a limitation in Azure's
334 // managed disks, which support no more than 20 concurrent node creation
335 // requests from a single disk image (cf.
336 // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
337 // The code assumes that node creation, from Azure's perspective, means the
338 // period until the instance appears in the "get all instances" list.
339 if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
340 logger.Info("reached MaxConcurrentInstanceCreateOps")
341 wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
345 secret := randomHex(instanceSecretLength)
346 wp.creating[secret] = createCall{time: now, instanceType: it}
349 tags := cloud.InstanceTags{
350 wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID),
351 wp.tagKeyPrefix + tagKeyInstanceType: it.Name,
352 wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
353 wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
355 initCmd := TagVerifier{nil, secret, nil}.InitCommand() + "\n" + wp.instanceInitCommand
356 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
358 defer wp.mtx.Unlock()
359 // delete() is deferred so the updateWorker() call
360 // below knows to use StateBooting when adding a new
362 defer delete(wp.creating, secret)
364 if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
366 n := len(wp.workers) + len(wp.creating) - 1
368 // Quota error with no
369 // instances running --
370 // nothing to do but wait
371 wp.atQuotaUntilFewerInstances = 0
372 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
373 time.AfterFunc(quotaErrorTTL, wp.notify)
374 logger.WithField("atQuotaUntil", wp.atQuotaUntil).Info("quota error with 0 running -- waiting for quotaErrorTTL")
375 } else if n < wp.atQuotaUntilFewerInstances || wp.atQuotaUntilFewerInstances == 0 {
376 // Quota error with N
377 // instances running -- report
378 // AtQuota until some
379 // instances shut down
380 wp.atQuotaUntilFewerInstances = n
381 wp.atQuotaUntil = time.Time{}
382 logger.WithField("atQuotaUntilFewerInstances", n).Info("quota error -- waiting for next instance shutdown")
385 if err, ok := err.(cloud.CapacityError); ok && err.IsCapacityError() {
386 var capKey interface{} = it.ProviderType
387 if err.IsInstanceTypeSpecific() {
388 capKey = it.ProviderType
389 } else if err.IsInstanceFamilySpecific() {
390 capKey = wp.instanceSet.InstanceFamily(it)
394 if wp.atCapacityUntil == nil {
395 wp.atCapacityUntil = map[interface{}]time.Time{}
397 wp.atCapacityUntil[capKey] = time.Now().Add(capacityErrorTTL)
398 time.AfterFunc(capacityErrorTTL, wp.notify)
400 logger.WithError(err).Error("create failed")
401 wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
404 wp.updateWorker(inst, it)
406 if len(wp.creating)+len(wp.workers) == wp.maxInstances {
407 logger.Infof("now at MaxInstances limit of %d instances", wp.maxInstances)
412 // AtCapacity returns true if Create() is currently expected to fail
413 // for the given instance type.
414 func (wp *Pool) AtCapacity(it arvados.InstanceType) bool {
416 defer wp.mtx.Unlock()
417 for _, capKey := range []interface{}{
418 "", // all instance types
419 wp.instanceSet.InstanceFamily(it), // instance family
420 it.ProviderType, // just this instance type
422 if t, ok := wp.atCapacityUntil[capKey]; ok && time.Now().Before(t) {
429 // AtQuota returns true if Create is not expected to work at the
430 // moment (e.g., cloud provider has reported quota errors, or we are
431 // already at our own configured quota).
432 func (wp *Pool) AtQuota() bool {
434 defer wp.mtx.Unlock()
435 return wp.atQuotaUntilFewerInstances > 0 ||
436 time.Now().Before(wp.atQuotaUntil) ||
437 (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
440 // SetIdleBehavior determines how the indicated instance will behave
441 // when it has no containers running.
442 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
444 defer wp.mtx.Unlock()
445 wkr, ok := wp.workers[id]
447 return errors.New("requested instance does not exist")
449 wkr.setIdleBehavior(idleBehavior)
453 // reportSSHConnected handles a successful connection to the SSH daemon by updating the mTimeToSSH metric.
454 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
456 defer wp.mtx.Unlock()
457 wkr, ok := wp.workers[inst.ID()]
459 // race: inst was removed from the pool
462 if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
463 // the node is not in booting state (can happen if
464 // arvados-dispatch-cloud is restarted) OR this is not the first SSH
469 wkr.firstSSHConnection = time.Now()
470 if wp.mTimeToSSH != nil {
471 wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
475 // Add or update worker attached to the given instance.
477 // The second return value is true if a new worker is created.
479 // A newly added instance has state=StateBooting if its tags match an
480 // entry in wp.creating, otherwise StateUnknown.
482 // Caller must have lock.
483 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
484 secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
485 inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
487 if wkr := wp.workers[id]; wkr != nil {
488 wkr.executor.SetTarget(inst)
490 wkr.updated = time.Now()
495 state := StateUnknown
496 if _, ok := wp.creating[secret]; ok {
500 // If an instance has a valid IdleBehavior tag when it first
501 // appears, initialize the new worker accordingly (this is how
502 // we restore IdleBehavior that was set by a prior dispatch
503 // process); otherwise, default to "run". After this,
504 // wkr.idleBehavior is the source of truth, and will only be
505 // changed via SetIdleBehavior().
506 idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
507 if !validIdleBehavior[idleBehavior] {
508 idleBehavior = IdleBehaviorRun
511 logger := wp.logger.WithFields(logrus.Fields{
512 "InstanceType": it.Name,
513 "Instance": inst.ID(),
514 "Address": inst.Address(),
516 logger.WithFields(logrus.Fields{
518 "IdleBehavior": idleBehavior,
519 }).Infof("instance appeared in cloud")
525 executor: wp.newExecutor(inst),
527 idleBehavior: idleBehavior,
534 running: make(map[string]*remoteRunner),
535 starting: make(map[string]*remoteRunner),
536 probing: make(chan struct{}, 1),
542 // Shutdown shuts down a worker with the given type, or returns false
543 // if all workers with the given type are busy.
544 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
545 wp.setupOnce.Do(wp.setup)
547 defer wp.mtx.Unlock()
548 logger := wp.logger.WithField("InstanceType", it.Name)
549 logger.Info("shutdown requested")
550 for _, tryState := range []State{StateBooting, StateIdle} {
551 // TODO: shut down the worker with the longest idle
552 // time (Idle) or the earliest create time (Booting)
553 for _, wkr := range wp.workers {
554 if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
555 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
556 wkr.reportBootOutcome(BootOutcomeAborted)
565 // CountWorkers returns the current number of workers in each state.
567 // CountWorkers blocks, if necessary, until the initial instance list
568 // has been loaded from the cloud provider.
569 func (wp *Pool) CountWorkers() map[State]int {
570 wp.setupOnce.Do(wp.setup)
573 defer wp.mtx.Unlock()
575 for _, w := range wp.workers {
581 // Running returns the container UUIDs being prepared/run on workers.
583 // In the returned map, the time value indicates when the Pool
584 // observed that the container process had exited. A container that
585 // has not yet exited has a zero time value. The caller should use
586 // ForgetContainer() to garbage-collect the entries for exited
588 func (wp *Pool) Running() map[string]time.Time {
589 wp.setupOnce.Do(wp.setup)
591 defer wp.mtx.Unlock()
592 r := map[string]time.Time{}
593 for _, wkr := range wp.workers {
594 for uuid := range wkr.running {
595 r[uuid] = time.Time{}
597 for uuid := range wkr.starting {
598 r[uuid] = time.Time{}
601 for uuid, exited := range wp.exited {
607 // StartContainer starts a container on an idle worker immediately if
608 // possible, otherwise returns false.
609 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
610 wp.setupOnce.Do(wp.setup)
612 defer wp.mtx.Unlock()
614 for _, w := range wp.workers {
615 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
616 if wkr == nil || w.busy.After(wkr.busy) {
624 wkr.startContainer(ctr)
628 // KillContainer kills the crunch-run process for the given container
629 // UUID, if it's running on any worker.
631 // KillContainer returns immediately; the act of killing the container
632 // takes some time, and runs in the background.
634 // KillContainer returns false if the container has already ended.
635 func (wp *Pool) KillContainer(uuid string, reason string) bool {
637 defer wp.mtx.Unlock()
638 logger := wp.logger.WithFields(logrus.Fields{
639 "ContainerUUID": uuid,
642 for _, wkr := range wp.workers {
643 rr := wkr.running[uuid]
645 rr = wkr.starting[uuid]
652 logger.Debug("cannot kill: already disappeared")
656 // ForgetContainer clears the placeholder for the given exited
657 // container, so it isn't returned by subsequent calls to Running().
659 // ForgetContainer has no effect if the container has not yet exited.
661 // The "container exited at time T" placeholder (which necessitates
662 // ForgetContainer) exists to make it easier for the caller
663 // (scheduler) to distinguish a container that exited without
664 // finalizing its state from a container that exited too recently for
665 // its final state to have appeared in the scheduler's queue cache.
666 func (wp *Pool) ForgetContainer(uuid string) {
668 defer wp.mtx.Unlock()
669 if _, ok := wp.exited[uuid]; ok {
670 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
671 delete(wp.exited, uuid)
675 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
677 reg = prometheus.NewRegistry()
679 wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
680 Namespace: "arvados",
681 Subsystem: "dispatchcloud",
682 Name: "containers_running",
683 Help: "Number of containers reported running by cloud VMs.",
685 reg.MustRegister(wp.mContainersRunning)
686 wp.mProbeAgeMax = prometheus.NewGauge(prometheus.GaugeOpts{
687 Namespace: "arvados",
688 Subsystem: "dispatchcloud",
689 Name: "probe_age_seconds_max",
690 Help: "Maximum number of seconds since an instance's most recent successful probe.",
692 reg.MustRegister(wp.mProbeAgeMax)
693 wp.mProbeAgeMedian = prometheus.NewGauge(prometheus.GaugeOpts{
694 Namespace: "arvados",
695 Subsystem: "dispatchcloud",
696 Name: "probe_age_seconds_median",
697 Help: "Median number of seconds since an instance's most recent successful probe.",
699 reg.MustRegister(wp.mProbeAgeMedian)
700 wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
701 Namespace: "arvados",
702 Subsystem: "dispatchcloud",
703 Name: "instances_total",
704 Help: "Number of cloud VMs.",
705 }, []string{"category", "instance_type"})
706 reg.MustRegister(wp.mInstances)
707 wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
708 Namespace: "arvados",
709 Subsystem: "dispatchcloud",
710 Name: "instances_price",
711 Help: "Price of cloud VMs.",
712 }, []string{"category"})
713 reg.MustRegister(wp.mInstancesPrice)
714 wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
715 Namespace: "arvados",
716 Subsystem: "dispatchcloud",
718 Help: "Total VCPUs on all cloud VMs.",
719 }, []string{"category"})
720 reg.MustRegister(wp.mVCPUs)
721 wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
722 Namespace: "arvados",
723 Subsystem: "dispatchcloud",
724 Name: "memory_bytes_total",
725 Help: "Total memory on all cloud VMs.",
726 }, []string{"category"})
727 reg.MustRegister(wp.mMemory)
728 wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
729 Namespace: "arvados",
730 Subsystem: "dispatchcloud",
731 Name: "boot_outcomes",
732 Help: "Boot outcomes by type.",
733 }, []string{"outcome"})
734 for k := range validBootOutcomes {
735 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
737 reg.MustRegister(wp.mBootOutcomes)
738 wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
739 Namespace: "arvados",
740 Subsystem: "dispatchcloud",
741 Name: "instances_disappeared",
742 Help: "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
743 }, []string{"state"})
744 for _, v := range stateString {
745 wp.mDisappearances.WithLabelValues(v).Add(0)
747 reg.MustRegister(wp.mDisappearances)
748 wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
749 Namespace: "arvados",
750 Subsystem: "dispatchcloud",
751 Name: "instances_time_to_ssh_seconds",
752 Help: "Number of seconds between instance creation and the first successful SSH connection.",
753 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
755 reg.MustRegister(wp.mTimeToSSH)
756 wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
757 Namespace: "arvados",
758 Subsystem: "dispatchcloud",
759 Name: "instances_time_to_ready_for_container_seconds",
760 Help: "Number of seconds between the first successful SSH connection and ready to run a container.",
761 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
763 reg.MustRegister(wp.mTimeToReadyForContainer)
764 wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
765 Namespace: "arvados",
766 Subsystem: "dispatchcloud",
767 Name: "instances_time_from_shutdown_request_to_disappearance_seconds",
768 Help: "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
769 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
771 reg.MustRegister(wp.mTimeFromShutdownToGone)
772 wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
773 Namespace: "arvados",
774 Subsystem: "dispatchcloud",
775 Name: "containers_time_from_queue_to_crunch_run_seconds",
776 Help: "Number of seconds between the queuing of a container and the start of crunch-run.",
777 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
779 reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
780 wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
781 Namespace: "arvados",
782 Subsystem: "dispatchcloud",
783 Name: "instances_run_probe_duration_seconds",
784 Help: "Number of seconds per runProbe call.",
785 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
786 }, []string{"outcome"})
787 reg.MustRegister(wp.mRunProbeDuration)
790 func (wp *Pool) runMetrics() {
792 defer wp.Unsubscribe(ch)
799 func (wp *Pool) updateMetrics() {
801 defer wp.mtx.RUnlock()
807 instances := map[entKey]int64{}
808 price := map[string]float64{}
809 cpu := map[string]int64{}
810 mem := map[string]int64{}
813 var probed []time.Time
814 for _, wkr := range wp.workers {
817 case len(wkr.running)+len(wkr.starting) > 0:
819 case wkr.idleBehavior == IdleBehaviorHold:
821 case wkr.state == StateBooting:
823 case wkr.state == StateUnknown:
828 instances[entKey{cat, wkr.instType.Name}]++
829 price[cat] += wkr.instType.Price
830 cpu[cat] += int64(wkr.instType.VCPUs)
831 mem[cat] += int64(wkr.instType.RAM)
832 running += int64(len(wkr.running) + len(wkr.starting))
833 probed = append(probed, wkr.probed)
835 for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
836 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
837 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
838 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
839 // make sure to reset gauges for nonexistent category/instance-type combinations
840 for _, it := range wp.instanceTypes {
841 if _, ok := instances[entKey{cat, it.Name}]; !ok {
842 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
846 for k, v := range instances {
847 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
849 wp.mContainersRunning.Set(float64(running))
851 if len(probed) == 0 {
852 wp.mProbeAgeMax.Set(0)
853 wp.mProbeAgeMedian.Set(0)
855 sort.Slice(probed, func(i, j int) bool { return probed[i].Before(probed[j]) })
856 wp.mProbeAgeMax.Set(now.Sub(probed[0]).Seconds())
857 wp.mProbeAgeMedian.Set(now.Sub(probed[len(probed)/2]).Seconds())
861 func (wp *Pool) runProbes() {
862 maxPPS := wp.maxProbesPerSecond
864 maxPPS = defaultMaxProbesPerSecond
866 limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
867 defer limitticker.Stop()
869 probeticker := time.NewTicker(wp.probeInterval)
870 defer probeticker.Stop()
872 workers := []cloud.InstanceID{}
873 for range probeticker.C {
874 // Add some jitter. Without this, if probeInterval is
875 // a multiple of syncInterval and sync is
876 // instantaneous (as with the loopback driver), the
877 // first few probes race with sync operations and
878 // don't update the workers.
879 time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))
881 workers = workers[:0]
883 for id, wkr := range wp.workers {
884 if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
887 workers = append(workers, id)
891 for _, id := range workers {
893 wkr, ok := wp.workers[id]
896 // Deleted while we were probing
900 go wkr.ProbeAndUpdate()
904 case <-limitticker.C:
910 func (wp *Pool) runSync() {
911 // sync once immediately, then wait syncInterval, sync again,
913 timer := time.NewTimer(1)
917 err := wp.getInstancesAndSync()
919 wp.logger.WithError(err).Warn("sync failed")
921 timer.Reset(wp.syncInterval)
923 wp.logger.Debug("worker.Pool stopped")
929 // Stop synchronizing with the InstanceSet.
930 func (wp *Pool) Stop() {
931 wp.setupOnce.Do(wp.setup)
935 // Instances returns an InstanceView for each worker in the pool,
936 // summarizing its current state and recent activity.
937 func (wp *Pool) Instances() []InstanceView {
939 wp.setupOnce.Do(wp.setup)
941 for _, w := range wp.workers {
942 r = append(r, InstanceView{
943 Instance: w.instance.ID(),
944 Address: w.instance.Address(),
945 Price: w.instType.Price,
946 ArvadosInstanceType: w.instType.Name,
947 ProviderInstanceType: w.instType.ProviderType,
948 LastContainerUUID: w.lastUUID,
950 WorkerState: w.state.String(),
951 IdleBehavior: w.idleBehavior,
955 sort.Slice(r, func(i, j int) bool {
956 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
961 // KillInstance destroys a cloud VM instance. It returns an error if
962 // the given instance does not exist.
963 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
964 wp.setupOnce.Do(wp.setup)
966 defer wp.mtx.Unlock()
967 wkr, ok := wp.workers[id]
969 return errors.New("instance not found")
971 wkr.logger.WithField("Reason", reason).Info("shutting down")
972 wkr.reportBootOutcome(BootOutcomeAborted)
977 func (wp *Pool) setup() {
978 wp.creating = map[string]createCall{}
979 wp.exited = map[string]time.Time{}
980 wp.workers = map[cloud.InstanceID]*worker{}
981 wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
985 // Load the runner program to be deployed on worker nodes into
986 // wp.runnerData, if necessary. Errors are logged.
988 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
990 // Caller must not have lock.
991 func (wp *Pool) loadRunnerData() error {
993 defer wp.mtx.Unlock()
994 if wp.runnerData != nil {
996 } else if wp.runnerSource == "" {
997 wp.runnerCmd = wp.runnerCmdDefault
998 wp.runnerData = []byte{}
1001 logger := wp.logger.WithField("source", wp.runnerSource)
1002 logger.Debug("loading runner")
1003 buf, err := ioutil.ReadFile(wp.runnerSource)
1005 logger.WithError(err).Error("failed to load runner program")
1009 wp.runnerMD5 = md5.Sum(buf)
1010 wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
1014 func (wp *Pool) notify() {
1016 defer wp.mtx.RUnlock()
1017 for _, send := range wp.subscribers {
1019 case send <- struct{}{}:
1025 func (wp *Pool) getInstancesAndSync() error {
1026 wp.setupOnce.Do(wp.setup)
1027 if err := wp.instanceSet.throttleInstances.Error(); err != nil {
1030 wp.logger.Debug("getting instance list")
1031 threshold := time.Now()
1032 instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
1034 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
1037 wp.sync(threshold, instances)
1038 wp.logger.Debug("sync done")
1042 // Add/remove/update workers based on instances, which was obtained
1043 // from the instanceSet. However, don't clobber any other updates that
1044 // already happened after threshold.
1045 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
1047 defer wp.mtx.Unlock()
1048 wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
1051 for _, inst := range instances {
1052 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
1053 it, ok := wp.instanceTypes[itTag]
1055 wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
1058 if wkr, isNew := wp.updateWorker(inst, it); isNew {
1060 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
1061 wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
1066 for id, wkr := range wp.workers {
1067 if wkr.updated.After(threshold) {
1070 logger := wp.logger.WithFields(logrus.Fields{
1071 "Instance": wkr.instance.ID(),
1072 "WorkerState": wkr.state,
1074 logger.Info("instance disappeared in cloud")
1075 wkr.reportBootOutcome(BootOutcomeDisappeared)
1076 if wp.mDisappearances != nil {
1077 wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
1079 // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
1080 if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
1081 wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
1083 delete(wp.workers, id)
1088 if wp.atQuotaUntilFewerInstances > len(wp.workers)+len(wp.creating) {
1089 // After syncing, there are fewer instances (including
1090 // pending creates) than there were last time we saw a
1091 // quota error. This might mean it's now possible to
1092 // create new instances. Reset our "at quota" state.
1093 wp.atQuotaUntilFewerInstances = 0
1099 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
1107 func (wp *Pool) waitUntilLoaded() {
1108 ch := wp.Subscribe()
1110 defer wp.mtx.RUnlock()
1118 func (wp *Pool) gatewayAuthSecret(uuid string) string {
1119 h := hmac.New(sha256.New, []byte(wp.systemRootToken))
1121 return fmt.Sprintf("%x", h.Sum(nil))
1124 // Return a random string of n hexadecimal digits (n*4 random bits). n
1126 func randomHex(n int) string {
1127 buf := make([]byte, n/2)
1128 _, err := rand.Read(buf)
1132 return fmt.Sprintf("%x", buf)