// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0
22 "git.arvados.org/arvados.git/lib/cloud"
23 "git.arvados.org/arvados.git/sdk/go/arvados"
24 "github.com/prometheus/client_golang/prometheus"
25 "github.com/sirupsen/logrus"
26 "golang.org/x/crypto/ssh"
	tagKeyInstanceType = "InstanceType"
	tagKeyIdleBehavior = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID = "InstanceSetID"
// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance cloud.InstanceID `json:"instance"`
	Address string `json:"address"`
	Price float64 `json:"price"`
	ArvadosInstanceType string `json:"arvados_instance_type"`
	ProviderInstanceType string `json:"provider_instance_type"`
	LastContainerUUID string `json:"last_container_uuid"`
	LastBusy time.Time `json:"last_busy"`
	WorkerState string `json:"worker_state"`
	IdleBehavior IdleBehavior `json:"idle_behavior"`
// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)
}
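
// stubExecutor is a minimal sketch of an Executor implementation,
// added here for illustration only: it is not used by the Pool, and
// the real dispatcher supplies an SSH-based executor via the
// newExecutor callback passed to NewPool. It satisfies the contract
// above: SetTarget never blocks on Execute, and Execute simply
// refuses to run anything.
type stubExecutor struct {
	mtx sync.Mutex
	target cloud.ExecutorTarget
}

func (e *stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
	// A real implementation would connect to the current target
	// and run cmd there.
	return nil, nil, errors.New("stubExecutor: remote execution not implemented")
}

func (e *stubExecutor) SetTarget(t cloud.ExecutorTarget) {
	e.mtx.Lock()
	defer e.mtx.Unlock()
	e.target = t
}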
	defaultSyncInterval = time.Minute
	defaultProbeInterval = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle = time.Minute
	defaultTimeoutBooting = time.Minute * 10
	defaultTimeoutProbe = time.Minute * 10
	defaultTimeoutShutdown = time.Second * 10
	defaultTimeoutTERM = time.Minute * 2
	defaultTimeoutSignal = time.Second * 5
	defaultTimeoutStaleRunLock = time.Second * 5

	// Time after a quota error to try again anyway, even if no
	// instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time after a capacity error to try again
	capacityErrorTTL = time.Minute

	// Time between "X failed because of rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}
// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
		arvClient: arvClient,
		instanceSetID: instanceSetID,
		instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor: newExecutor,
		bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
		instanceInitCommand: cloud.InitCommand(cluster.Containers.CloudVMs.InstanceInitCommand),
		runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
		runnerDeployDirectory: cluster.Containers.CloudVMs.DeployRunnerDirectory,
		imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes: cluster.InstanceTypes,
		maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
		maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
		maxInstances: cluster.Containers.CloudVMs.MaxInstances,
		probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		timeoutStaleRunLock: duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
		systemRootToken: cluster.SystemRootToken,
		installPublicKey: installPublicKey,
		tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
		runnerCmdDefault: cluster.Containers.CrunchRunCommand,
		runnerArgs: append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
		stop: make(chan bool),
	wp.registerMetrics(reg)
	wp.setupOnce.Do(wp.setup)
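
// newPoolExample is a hypothetical construction sketch, not part of
// the real dispatcher (which lives elsewhere in lib/dispatchcloud and
// also supplies the SSH-based Executor factory). It only illustrates
// the NewPool call shape; in particular, deriving the InstanceSetID
// from cluster.ClusterID here is an arbitrary illustrative choice.
func newPoolExample(logger logrus.FieldLogger, ac *arvados.Client, cluster *arvados.Cluster, is cloud.InstanceSet, key ssh.PublicKey, newExec func(cloud.Instance) Executor) *Pool {
	reg := prometheus.NewRegistry()
	return NewPool(logger, ac, reg, cloud.InstanceSetID(cluster.ClusterID), is, newExec, key, cluster)
}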
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
	logger logrus.FieldLogger
	arvClient *arvados.Client
	instanceSetID cloud.InstanceSetID
	instanceSet *throttledInstanceSet
	newExecutor func(cloud.Instance) Executor
	cluster *arvados.Cluster
	bootProbeCommand string
	instanceInitCommand cloud.InitCommand
	runnerDeployDirectory string
	imageID cloud.ImageID
	instanceTypes map[string]arvados.InstanceType
	syncInterval time.Duration
	probeInterval time.Duration
	maxProbesPerSecond int
	maxConcurrentInstanceCreateOps int
	timeoutIdle time.Duration
	timeoutBooting time.Duration
	timeoutProbe time.Duration
	timeoutShutdown time.Duration
	timeoutTERM time.Duration
	timeoutSignal time.Duration
	timeoutStaleRunLock time.Duration
	systemRootToken string
	installPublicKey ssh.PublicKey
	runnerCmdDefault string // crunch-run command to use if not deploying a binary
	runnerArgs []string // extra args passed to crunch-run
	subscribers map[<-chan struct{}]chan<- struct{}
	creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers map[cloud.InstanceID]*worker
	loaded bool // loaded list of instances from InstanceSet at least once
	exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntilFewerInstances int
	atQuotaUntil time.Time
	atQuotaErr cloud.QuotaError
	atCapacityUntil map[interface{}]time.Time
	runnerMD5 [md5.Size]byte
	mContainersRunning prometheus.Gauge
	mInstances *prometheus.GaugeVec
	mInstancesPrice *prometheus.GaugeVec
	mVCPUs *prometheus.GaugeVec
	mMemory *prometheus.GaugeVec
	mBootOutcomes *prometheus.CounterVec
	mDisappearances *prometheus.CounterVec
	mTimeToSSH prometheus.Summary
	mTimeToReadyForContainer prometheus.Summary
	mTimeFromShutdownToGone prometheus.Summary
	mTimeFromQueueToCrunchRun prometheus.Summary
	mRunProbeDuration *prometheus.SummaryVec
	mProbeAgeMax prometheus.Gauge
	mProbeAgeMedian prometheus.Gauge
type createCall struct {
	instanceType arvados.InstanceType

func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
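//	// ...caller's scheduling loop; tryScheduling is a
//	// hypothetical helper, shown only to sketch typical usage.
//	for range ch {
//		tryScheduling(wp)
//	}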
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
	for it, c := range creating {
// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error or a configuration
// setting prevents it from even attempting to create a new
// instance. Those errors are logged by the Pool, so the caller does
// not need to log anything in such cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
	if wp.AtCapacity(it) || wp.AtQuota() || wp.instanceSet.throttleCreate.Error() != nil {
	defer wp.mtx.Unlock()
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	tags := cloud.InstanceTags{
		wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID),
		wp.tagKeyPrefix + tagKeyInstanceType: it.Name,
		wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
		wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
	initCmd := TagVerifier{nil, secret, nil}.InitCommand() + "\n" + wp.instanceInitCommand
	inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
	defer wp.mtx.Unlock()
	// delete() is deferred so the updateWorker() call
	// below knows to use StateBooting when adding a new
	// worker.
	defer delete(wp.creating, secret)
	if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
		n := len(wp.workers) + len(wp.creating) - 1
			// Quota error with no instances running --
			// nothing to do but wait.
			wp.atQuotaUntilFewerInstances = 0
			wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
			time.AfterFunc(quotaErrorTTL, wp.notify)
			logger.WithField("atQuotaUntil", wp.atQuotaUntil).Info("quota error with 0 running -- waiting for quotaErrorTTL")
		} else if n < wp.atQuotaUntilFewerInstances || wp.atQuotaUntilFewerInstances == 0 {
			// Quota error with N instances running -- report
			// AtQuota until some instances shut down.
			wp.atQuotaUntilFewerInstances = n
			wp.atQuotaUntil = time.Time{}
			logger.WithField("atQuotaUntilFewerInstances", n).Info("quota error -- waiting for next instance shutdown")
	if err, ok := err.(cloud.CapacityError); ok && err.IsCapacityError() {
		var capKey interface{} = "" // all instance types
		if err.IsInstanceTypeSpecific() {
			capKey = it.ProviderType
		} else if err.IsInstanceQuotaGroupSpecific() {
			capKey = wp.instanceSet.InstanceQuotaGroup(it)
		if wp.atCapacityUntil == nil {
			wp.atCapacityUntil = map[interface{}]time.Time{}
		wp.atCapacityUntil[capKey] = time.Now().Add(capacityErrorTTL)
		time.AfterFunc(capacityErrorTTL, wp.notify)
	logger.WithError(err).Error("create failed")
	wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
	wp.updateWorker(inst, it)
	if len(wp.creating)+len(wp.workers) == wp.maxInstances {
		logger.Infof("now at MaxInstances limit of %d instances", wp.maxInstances)
// AtCapacity returns true if Create() is currently expected to fail
// for the given instance type.
func (wp *Pool) AtCapacity(it arvados.InstanceType) bool {
	defer wp.mtx.Unlock()
	for _, capKey := range []interface{}{
		"", // all instance types
		wp.instanceSet.InstanceQuotaGroup(it), // instance quota group
		it.ProviderType, // just this instance type
		if t, ok := wp.atCapacityUntil[capKey]; ok && time.Now().Before(t) {
// AtQuota returns true if Create is not expected to work at the
// moment (e.g., cloud provider has reported quota errors, or we are
// already at our own configured quota).
func (wp *Pool) AtQuota() bool {
	defer wp.mtx.Unlock()
	return wp.atQuotaUntilFewerInstances > 0 ||
		time.Now().Before(wp.atQuotaUntil) ||
		(wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
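
// ensureUnallocated is a hypothetical caller-side sketch (the real
// logic lives in the dispatchcloud scheduler, not in this package)
// showing how AtQuota, AtCapacity, Unallocated, and Create are meant
// to be combined: skip the attempt entirely when the pool already
// knows it would fail, and stop as soon as Create declines.
func ensureUnallocated(wp *Pool, it arvados.InstanceType, want int) {
	if wp.AtQuota() || wp.AtCapacity(it) {
		return
	}
	for have := wp.Unallocated()[it]; have < want; have++ {
		if !wp.Create(it) {
			break
		}
	}
}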
// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
		return errors.New("requested instance does not exist")
	wkr.setIdleBehavior(idleBehavior)
// reportSSHConnected records a successful connection to the SSH
// daemon and updates the mTimeToSSH metric.
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[inst.ID()]
		// race: inst was removed from the pool
	if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
		// the node is not in booting state (can happen if
		// a-d-c is restarted) OR this is not the first SSH
		// connection
	wkr.firstSSHConnection = time.Now()
	if wp.mTimeToSSH != nil {
		wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.updated = time.Now()
	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance": inst.ID(),
		"Address": inst.Address(),
	logger.WithFields(logrus.Fields{
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
		executor: wp.newExecutor(inst),
		idleBehavior: idleBehavior,
		running: make(map[string]*remoteRunner),
		starting: make(map[string]*remoteRunner),
		probing: make(chan struct{}, 1),
// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.reportBootOutcome(BootOutcomeAborted)
// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.Unlock()
	for _, w := range wp.workers {
// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
	for uuid, exited := range wp.exited {
// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.Unlock()
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
	wkr.startContainer(ctr)
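
// tryRunContainer is a hypothetical sketch of the caller-side
// pattern (the real version lives in the scheduler): start the
// container on an idle worker if one exists, otherwise ask the pool
// to create a suitable instance so a later attempt can succeed.
func tryRunContainer(wp *Pool, it arvados.InstanceType, ctr arvados.Container) bool {
	if wp.StartContainer(it, ctr) {
		return true
	}
	if wp.Unallocated()[it] == 0 {
		wp.Create(it)
	}
	return false
}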
// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
			rr = wkr.starting[uuid]
	logger.Debug("cannot kill: already disappeared")
// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
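
// sweepExited is a hypothetical sketch (the real logic lives in the
// scheduler) of how Running and ForgetContainer fit together: map
// entries with a non-zero time are exited crunch-run processes, and
// the caller clears each one once it has finalized that container's
// state. The finalized callback is an assumed caller-side helper.
func sweepExited(wp *Pool, finalized func(uuid string) bool) {
	for uuid, exited := range wp.Running() {
		if !exited.IsZero() && finalized(uuid) {
			wp.ForgetContainer(uuid)
		}
	}
}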
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
		reg = prometheus.NewRegistry()
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "containers_running",
		Help: "Number of containers reported running by cloud VMs.",
	reg.MustRegister(wp.mContainersRunning)
	wp.mProbeAgeMax = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "probe_age_seconds_max",
		Help: "Maximum number of seconds since an instance's most recent successful probe.",
	reg.MustRegister(wp.mProbeAgeMax)
	wp.mProbeAgeMedian = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "probe_age_seconds_median",
		Help: "Median number of seconds since an instance's most recent successful probe.",
	reg.MustRegister(wp.mProbeAgeMedian)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "instances_total",
		Help: "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "instances_price",
		Help: "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Help: "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "memory_bytes_total",
		Help: "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "boot_outcomes",
		Help: "Boot outcomes by type.",
	}, []string{"outcome"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "instances_disappeared",
		Help: "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	reg.MustRegister(wp.mDisappearances)
	wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "instances_time_to_ssh_seconds",
		Help: "Number of seconds between instance creation and the first successful SSH connection.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	reg.MustRegister(wp.mTimeToSSH)
	wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "instances_time_to_ready_for_container_seconds",
		Help: "Number of seconds between the first successful SSH connection and ready to run a container.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	reg.MustRegister(wp.mTimeToReadyForContainer)
	wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "instances_time_from_shutdown_request_to_disappearance_seconds",
		Help: "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	reg.MustRegister(wp.mTimeFromShutdownToGone)
	wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "containers_time_from_queue_to_crunch_run_seconds",
		Help: "Number of seconds between the queuing of a container and the start of crunch-run.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
	wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name: "instances_run_probe_duration_seconds",
		Help: "Number of seconds per runProbe call.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	}, []string{"outcome"})
	reg.MustRegister(wp.mRunProbeDuration)
func (wp *Pool) runMetrics() {
	defer wp.Unsubscribe(ch)

func (wp *Pool) updateMetrics() {
	defer wp.mtx.RUnlock()

	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var probed []time.Time
	for _, wkr := range wp.workers {
		case len(wkr.running)+len(wkr.starting) > 0:
		case wkr.idleBehavior == IdleBehaviorHold:
		case wkr.state == StateBooting:
		case wkr.state == StateUnknown:
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
		probed = append(probed, wkr.probed)
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for non-existing category/nodetype combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	wp.mContainersRunning.Set(float64(running))

	if len(probed) == 0 {
		wp.mProbeAgeMax.Set(0)
		wp.mProbeAgeMedian.Set(0)
	sort.Slice(probed, func(i, j int) bool { return probed[i].Before(probed[j]) })
	wp.mProbeAgeMax.Set(now.Sub(probed[0]).Seconds())
	wp.mProbeAgeMedian.Set(now.Sub(probed[len(probed)/2]).Seconds())
func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
		maxPPS = defaultMaxProbesPerSecond
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		// Add some jitter. Without this, if probeInterval is
		// a multiple of syncInterval and sync is
		// instantaneous (as with the loopback driver), the
		// first few probes race with sync operations and
		// don't update the workers.
		time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))

		workers = workers[:0]
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
			workers = append(workers, id)

		for _, id := range workers {
			wkr, ok := wp.workers[id]
				// Deleted while we were probing
			go wkr.ProbeAndUpdate()
			case <-limitticker.C:
func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
		err := wp.getInstancesAndSync()
			wp.logger.WithError(err).Warn("sync failed")
		timer.Reset(wp.syncInterval)
		wp.logger.Debug("worker.Pool stopped")
// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	wp.setupOnce.Do(wp.setup)
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance: w.instance.ID(),
			Address: w.instance.Address(),
			Price: w.instType.Price,
			ArvadosInstanceType: w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID: w.lastUUID,
			WorkerState: w.state.String(),
			IdleBehavior: w.idleBehavior,
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
		return errors.New("instance not found")
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.reportBootOutcome(BootOutcomeAborted)
func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
	} else if wp.runnerSource == "" {
		wp.runnerCmd = wp.runnerCmdDefault
		wp.runnerData = []byte{}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
		logger.WithError(err).Error("failed to load runner program")
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("%s/crunch-run~%x", wp.runnerDeployDirectory, wp.runnerMD5)
func (wp *Pool) notify() {
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		case send <- struct{}{}:
func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance": wkr.instance.ID(),
			"WorkerState": wkr.state,
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		// wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
		if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
			wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
		delete(wp.workers, id)

	if wp.atQuotaUntilFewerInstances > len(wp.workers)+len(wp.creating) {
		// After syncing, there are fewer instances (including
		// pending creates) than there were last time we saw a
		// quota error. This might mean it's now possible to
		// create new instances. Reset our "at quota" state.
		wp.atQuotaUntilFewerInstances = 0

	wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	defer wp.mtx.RUnlock()
func (wp *Pool) gatewayAuthSecret(uuid string) string {
	h := hmac.New(sha256.New, []byte(wp.systemRootToken))
	return fmt.Sprintf("%x", h.Sum(nil))
// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)