// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"crypto/md5"
	"crypto/rand"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"sort"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)

const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)
}

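// A minimal conforming implementation might look like the following
// sketch (illustrative only; sshExecutor here is a hypothetical name,
// and the real SSH-based executor lives elsewhere in this repository):
//
//	type sshExecutor struct{ target cloud.ExecutorTarget }
//
//	func (e *sshExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
//		// dial e.target.Address(), run cmd with env, return stdout/stderr
//		return nil, nil, errors.New("not implemented")
//	}
//
//	func (e *sshExecutor) SetTarget(t cloud.ExecutorTarget) { e.target = t }
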
const (
	defaultSyncInterval        = time.Minute
	defaultProbeInterval       = time.Second * 10
	defaultMaxProbesPerSecond  = 10
	defaultTimeoutIdle         = time.Minute
	defaultTimeoutBooting      = time.Minute * 10
	defaultTimeoutProbe        = time.Minute * 10
	defaultTimeoutShutdown     = time.Second * 10
	defaultTimeoutTERM         = time.Minute * 2
	defaultTimeoutSignal       = time.Second * 5
	defaultTimeoutStaleRunLock = time.Second * 5

	// Time after a quota error to try again anyway, even if no
	// instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
)

// duration returns conf as a time.Duration, falling back to def when
// conf is unset.
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}

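// For example (illustrative): with ProbeInterval left unset in the
// cluster config, duration(cluster.Containers.CloudVMs.ProbeInterval,
// defaultProbeInterval) returns 10 * time.Second.
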
// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:                         logger,
		arvClient:                      arvClient,
		instanceSetID:                  instanceSetID,
		instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:                    newExecutor,
		bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
		runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes:                  cluster.InstanceTypes,
		maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
		maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
		probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
		installPublicKey:               installPublicKey,
		tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
		stop:                           make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger                         logrus.FieldLogger
	arvClient                      *arvados.Client
	instanceSetID                  cloud.InstanceSetID
	instanceSet                    *throttledInstanceSet
	newExecutor                    func(cloud.Instance) Executor
	bootProbeCommand               string
	runnerSource                   string
	imageID                        cloud.ImageID
	instanceTypes                  map[string]arvados.InstanceType
	syncInterval                   time.Duration
	probeInterval                  time.Duration
	maxProbesPerSecond             int
	maxConcurrentInstanceCreateOps int
	timeoutIdle                    time.Duration
	timeoutBooting                 time.Duration
	timeoutProbe                   time.Duration
	timeoutShutdown                time.Duration
	timeoutTERM                    time.Duration
	timeoutSignal                  time.Duration
	timeoutStaleRunLock            time.Duration
	installPublicKey               ssh.PublicKey
	tagKeyPrefix                   string

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once
	runnerData   []byte
	runnerMD5    [md5.Size]byte
	runnerCmd    string

	mContainersRunning        prometheus.Gauge
	mInstances                *prometheus.GaugeVec
	mInstancesPrice           *prometheus.GaugeVec
	mVCPUs                    *prometheus.GaugeVec
	mMemory                   *prometheus.GaugeVec
	mBootOutcomes             *prometheus.CounterVec
	mDisappearances           *prometheus.CounterVec
	mTimeToSSH                prometheus.Summary
	mTimeToReadyForContainer  prometheus.Summary
	mTimeFromShutdownToGone   prometheus.Summary
	mTimeFromQueueToCrunchRun prometheus.Summary
	mRunProbeDuration         *prometheus.SummaryVec
}

type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}

// CheckHealth reports an error if the runner binary cannot be loaded.
func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
	}
	return nil
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		tryScheduling(wp)
//		if done {
//			break
//		}
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}

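// Worked example (illustrative, not from the original source): suppose
// creating[T] == 2 while three type-T workers are in StateUnknown, all
// of which appeared after the oldest pending Create call. The loop
// above counts all three workers in unalloc[T] and decrements
// creating[T] to 0 for the first two, so the final total is
// unalloc[T] == 3: pending Create calls and the instances they
// produced are never double-counted.
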
// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
		return false
	}
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret, nil}.InitCommand()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	return true
}

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.setIdleBehavior(idleBehavior)
	return nil
}

// reportSSHConnected updates the mTimeToSSH metric on the first
// successful SSH connection to an instance.
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr := wp.workers[inst.ID()]
	if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
		// the node is not in booting state (can happen if a-d-c is restarted) OR
		// this is not the first SSH connection
		return
	}

	wkr.firstSSHConnection = time.Now()
	if wp.mTimeToSSH != nil {
		wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
	}
}

// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.reportBootOutcome(BootOutcomeAborted)
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.waitUntilLoaded()
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}

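// Illustrative usage sketch (not part of this file): a scheduler might
// poll Running() and, once it has handled a container's final state,
// garbage-collect the placeholder:
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() {
//			// ...finalize the container's state, then:
//			wp.ForgetContainer(uuid)
//		}
//	}
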
// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Reason":        reason,
	})
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
		if rr == nil {
			rr = wkr.starting[uuid]
		}
		if rr != nil {
			rr.Kill(reason)
			return true
		}
	}
	logger.Debug("cannot kill: already disappeared")
	return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	}
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price",
		Help:      "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "boot_outcomes",
		Help:      "Boot outcomes by type.",
	}, []string{"outcome"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	}
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_disappeared",
		Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	}
	reg.MustRegister(wp.mDisappearances)
	wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ssh_seconds",
		Help:       "Number of seconds between instance creation and the first successful SSH connection.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToSSH)
	wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ready_for_container_seconds",
		Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToReadyForContainer)
	wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
		Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromShutdownToGone)
	wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "containers_time_from_queue_to_crunch_run_seconds",
		Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
	wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_run_probe_duration_seconds",
		Help:       "Number of seconds per runProbe call.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	}, []string{"outcome"})
	for _, v := range []string{"success", "fail"} {
		wp.mRunProbeDuration.WithLabelValues(v).Observe(0)
	}
	reg.MustRegister(wp.mRunProbeDuration)
}

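// The pool does not expose these metrics itself; the caller serves the
// registry. A minimal sketch, assuming the standard promhttp handler:
//
//	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
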
func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	type entKey struct {
		cat      string
		instType string
	}
	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var running int64
	for _, wkr := range wp.workers {
		var cat string
		switch {
		case len(wkr.running)+len(wkr.starting) > 0:
			cat = "inuse"
		case wkr.idleBehavior == IdleBehaviorHold:
			cat = "hold"
		case wkr.state == StateBooting:
			cat = "booting"
		case wkr.state == StateUnknown:
			cat = "unknown"
		default:
			cat = "idle"
		}
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
	}
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for non-existing category/nodetype combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
			}
		}
	}
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	}
	wp.mContainersRunning.Set(float64(running))
}

// runProbes periodically probes all workers, rate-limited to maxPPS
// probes per second, until the pool is stopped.
func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing
				// others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Address:              w.instance.Address(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return nil
}

// setup initializes the pool's maps. It runs exactly once, via
// setupOnce.
func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}

// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
		return nil
	} else if wp.runnerSource == "" {
		wp.runnerCmd = "crunch-run"
		wp.runnerData = []byte{}
		return nil
	}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
	if err != nil {
		logger.WithError(err).Error("failed to load runner program")
		return err
	}
	wp.runnerData = buf
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
	return nil
}

// notify wakes all subscribers with a non-blocking send.
func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

// getInstancesAndSync fetches the current instance list from the
// cloud provider and reconciles the worker pool with it.
func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		// wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
		if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
			wp.mTimeFromShutdownToGone.Observe(time.Since(wkr.destroyed).Seconds())
		}
		delete(wp.workers, id)
		go wkr.Close()
		notify = true
	}

	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}

// waitUntilLoaded blocks until the initial instance list has been
// loaded from the cloud provider.
func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for !wp.loaded {
		wp.mtx.RUnlock()
		<-ch
		wp.mtx.RLock()
	}
}

// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}
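
// For example (illustrative): randomHex(6) reads 3 random bytes and
// returns a string like "a3f01b".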