1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
19 "git.arvados.org/arvados.git/lib/cloud"
20 "git.arvados.org/arvados.git/sdk/go/arvados"
21 "github.com/prometheus/client_golang/prometheus"
22 "github.com/sirupsen/logrus"
23 "golang.org/x/crypto/ssh"
// Tag keys applied to cloud instances managed by this pool. Each key
// is combined with wp.tagKeyPrefix when tagging/reading instances
// (see Create, updateWorker, getInstancesAndSync below).
27 tagKeyInstanceType = "InstanceType"
28 tagKeyIdleBehavior = "IdleBehavior"
29 tagKeyInstanceSecret = "InstanceSecret"
30 tagKeyInstanceSetID = "InstanceSetID"
33 // An InstanceView shows a worker's current state and recent activity.
// InstanceView is the JSON-serializable snapshot returned by
// Pool.Instances(); the json tags are part of the reporting API.
34 type InstanceView struct {
35 Instance cloud.InstanceID `json:"instance"`
36 Address string `json:"address"`
37 Price float64 `json:"price"`
38 ArvadosInstanceType string `json:"arvados_instance_type"`
39 ProviderInstanceType string `json:"provider_instance_type"`
40 LastContainerUUID string `json:"last_container_uuid"`
41 LastBusy time.Time `json:"last_busy"`
42 WorkerState string `json:"worker_state"`
43 IdleBehavior IdleBehavior `json:"idle_behavior"`
46 // An Executor executes shell commands on a remote host.
// NOTE(review): the fused line numbering jumps here (50, 54-55, 59
// absent), so parts of the SetTarget doc comment are elided in this
// extract — confirm against the full source.
47 type Executor interface {
48 // Run cmd on the current target.
49 Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
51 // Use the given target for subsequent operations. The new
52 // target is the same host as the previous target, but it
53 // might return a different address and verify a different
56 // SetTarget is called frequently, and in most cases the new
57 // target will behave exactly the same as the old one. An
58 // implementation should optimize accordingly.
60 // SetTarget must not block on concurrent Execute calls.
61 SetTarget(cloud.ExecutorTarget)
// Default tuning values, applied by NewPool (via the duration()
// helper) when the corresponding cluster config entries are unset.
67 defaultSyncInterval = time.Minute
68 defaultProbeInterval = time.Second * 10
69 defaultMaxProbesPerSecond = 10
70 defaultTimeoutIdle = time.Minute
71 defaultTimeoutBooting = time.Minute * 10
72 defaultTimeoutProbe = time.Minute * 10
73 defaultTimeoutShutdown = time.Second * 10
74 defaultTimeoutTERM = time.Minute * 2
75 defaultTimeoutSignal = time.Second * 5
77 // Time after a quota error to try again anyway, even if no
78 // instances have been shutdown.
79 quotaErrorTTL = time.Minute
81 // Time between "X failed because rate limiting" messages
82 logRateLimitErrorInterval = time.Second * 10
85 func duration(conf arvados.Duration, def time.Duration) time.Duration {
87 return time.Duration(conf)
93 // NewPool creates a Pool of workers backed by instanceSet.
95 // New instances are configured and set up according to the given
96 // cluster configuration.
//
// NewPool also registers the pool's Prometheus metrics on reg and
// runs one-time setup immediately (wp.setupOnce.Do below).
//
// NOTE(review): this extract elides the composite-literal opener
// (`wp := &Pool{`), the logger field, the closing brace, and the
// final `return wp` — the fused line numbers jump at 98-99, 121,
// 123 and after 124. Do not edit this block without the full source.
97 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
100 arvClient: arvClient,
101 instanceSetID: instanceSetID,
102 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
103 newExecutor: newExecutor,
104 bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
105 runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
106 imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
107 instanceTypes: cluster.InstanceTypes,
108 maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
109 maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
110 probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
111 syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
112 timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
113 timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
114 timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
115 timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
116 timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
117 timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
118 installPublicKey: installPublicKey,
119 tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
120 stop: make(chan bool),
122 wp.registerMetrics(reg)
124 wp.setupOnce.Do(wp.setup)
132 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
133 // zero Pool should not be used. Call NewPool to create a new Pool.
// NOTE(review): the struct opener, logger field initializer context,
// and several fields (runnerSource, tagKeyPrefix, stop, mtx,
// setupOnce, runnerData, runnerCmd, ...) are on lines elided from
// this extract — the fused numbering jumps at 134-135, 142, 156-158,
// 166-169, 171-172.
// Configuration fields below are set once by NewPool.
136 logger logrus.FieldLogger
137 arvClient *arvados.Client
138 instanceSetID cloud.InstanceSetID
139 instanceSet *throttledInstanceSet
140 newExecutor func(cloud.Instance) Executor
141 bootProbeCommand string
143 imageID cloud.ImageID
144 instanceTypes map[string]arvados.InstanceType
145 syncInterval time.Duration
146 probeInterval time.Duration
147 maxProbesPerSecond int
148 maxConcurrentInstanceCreateOps int
149 timeoutIdle time.Duration
150 timeoutBooting time.Duration
151 timeoutProbe time.Duration
152 timeoutShutdown time.Duration
153 timeoutTERM time.Duration
154 timeoutSignal time.Duration
155 installPublicKey ssh.PublicKey
// Mutable state; methods below acquire wp.mtx before touching these
// (see the Lock/RLock pattern throughout this file).
159 subscribers map[<-chan struct{}]chan<- struct{}
160 creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
161 workers map[cloud.InstanceID]*worker
162 loaded bool // loaded list of instances from InstanceSet at least once
163 exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
164 atQuotaUntil time.Time
165 atQuotaErr cloud.QuotaError
170 runnerMD5 [md5.Size]byte
// Prometheus metrics, registered by registerMetrics().
173 mContainersRunning prometheus.Gauge
174 mInstances *prometheus.GaugeVec
175 mInstancesPrice *prometheus.GaugeVec
176 mVCPUs *prometheus.GaugeVec
177 mMemory *prometheus.GaugeVec
178 mBootOutcomes *prometheus.CounterVec
179 mDisappearances *prometheus.CounterVec
180 mTimeToSSH prometheus.Summary
181 mTimeToReadyForContainer prometheus.Summary
182 mTimeFromShutdownToGone prometheus.Summary
// createCall records one in-flight (cloud.InstanceSet)Create request;
// Pool.creating maps instance secret -> createCall. A `time` field is
// also populated at creation (see Create: createCall{time: now, ...})
// but its declaration is on a line elided from this extract.
185 type createCall struct {
187 instanceType arvados.InstanceType
190 func (wp *Pool) CheckHealth() error {
191 wp.setupOnce.Do(wp.setup)
192 if err := wp.loadRunnerData(); err != nil {
193 return fmt.Errorf("error loading runner binary: %s", err)
198 // Subscribe returns a buffered channel that becomes ready after any
199 // change to the pool's state that could have scheduling implications:
200 // a worker's state changes, a new worker appears, the cloud
201 // provider's API rate limiting period ends, etc.
203 // Additional events that occur while the channel is already ready
204 // will be dropped, so it is OK if the caller services the channel
209 // ch := wp.Subscribe()
210 // defer wp.Unsubscribe(ch)
217 func (wp *Pool) Subscribe() <-chan struct{} {
218 wp.setupOnce.Do(wp.setup)
220 defer wp.mtx.Unlock()
221 ch := make(chan struct{}, 1)
222 wp.subscribers[ch] = ch
226 // Unsubscribe stops sending updates to the given channel.
227 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
228 wp.setupOnce.Do(wp.setup)
230 defer wp.mtx.Unlock()
231 delete(wp.subscribers, ch)
234 // Unallocated returns the number of unallocated (creating + booting +
235 // idle + unknown) workers for each instance type. Workers in
236 // hold/drain mode are not included.
// NOTE(review): several lines are elided in this extract (the RLock
// matching the deferred RUnlock, parts of the loop bodies including
// the increments of creating/unalloc, the Unknown-state adjustment,
// and the return) — the fused numbering jumps at 239, 246, 249-250,
// 255, 260-263, 270, 277-279, and after 280. Do not restructure
// without the full source.
237 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
238 wp.setupOnce.Do(wp.setup)
240 defer wp.mtx.RUnlock()
241 unalloc := map[arvados.InstanceType]int{}
242 creating := map[arvados.InstanceType]int{}
243 oldestCreate := map[arvados.InstanceType]time.Time{}
// First pass: count pending Create calls per type, and remember the
// oldest pending call's start time for the race heuristic below.
244 for _, cc := range wp.creating {
245 it := cc.instanceType
247 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
248 oldestCreate[it] = cc.time
251 for _, wkr := range wp.workers {
252 // Skip workers that are not expected to become
253 // available soon. Note len(wkr.running)>0 is not
254 // redundant here: it can be true even in
256 if wkr.state == StateShutdown ||
257 wkr.state == StateRunning ||
258 wkr.idleBehavior != IdleBehaviorRun ||
259 len(wkr.running) > 0 {
264 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
265 // If up to N new workers appear in
266 // Instances() while we are waiting for N
267 // Create() calls to complete, we assume we're
268 // just seeing a race between Instances() and
269 // Create() responses.
271 // The other common reason why nodes have
272 // state==Unknown is that they appeared at
273 // startup, before any Create calls. They
274 // don't match the above timing condition, so
275 // we never mistakenly attribute them to
276 // pending Create calls.
280 for it, c := range creating {
286 // Create a new instance with the given type, and add it to the worker
287 // pool. The worker is added immediately; instance creation runs in
290 // Create returns false if a pre-existing error state prevents it from
291 // even attempting to create a new instance. Those errors are logged
292 // by the Pool, so the caller does not need to log anything in such
// NOTE(review): this extract elides a number of lines (fused
// numbering jumps at 288-289, 293, 299-301, 304-305, 316-318, 321-322,
// 328, 331, 335, 337, 339, 342, 345-346, and after 347): the early
// returns, the Lock calls paired with the deferred Unlocks, the `now`
// assignment used at line 320, the goroutine wrapper around the
// cloud Create call, and the final return are not visible here.
294 func (wp *Pool) Create(it arvados.InstanceType) bool {
295 logger := wp.logger.WithField("InstanceType", it.Name)
296 wp.setupOnce.Do(wp.setup)
297 if wp.loadRunnerData() != nil {
298 // Boot probe is certain to fail.
302 defer wp.mtx.Unlock()
// Refuse to create while in a quota-error or throttled state.
303 if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
306 // The maxConcurrentInstanceCreateOps knob throttles the number of node create
307 // requests in flight. It was added to work around a limitation in Azure's
308 // managed disks, which support no more than 20 concurrent node creation
309 // requests from a single disk image (cf.
310 // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
311 // The code assumes that node creation, from Azure's perspective, means the
312 // period until the instance appears in the "get all instances" list.
313 if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
314 logger.Info("reached MaxConcurrentInstanceCreateOps")
315 wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
// The per-instance secret is stored as a tag and verified on first
// contact (TagVerifier) so we can match the booted VM to this call.
319 secret := randomHex(instanceSecretLength)
320 wp.creating[secret] = createCall{time: now, instanceType: it}
323 tags := cloud.InstanceTags{
324 wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID),
325 wp.tagKeyPrefix + tagKeyInstanceType: it.Name,
326 wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
327 wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
329 initCmd := TagVerifier{nil, secret, nil}.InitCommand()
330 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
332 defer wp.mtx.Unlock()
333 // delete() is deferred so the updateWorker() call
334 // below knows to use StateBooting when adding a new
336 defer delete(wp.creating, secret)
338 if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
340 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
341 time.AfterFunc(quotaErrorTTL, wp.notify)
343 logger.WithError(err).Error("create failed")
344 wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
347 wp.updateWorker(inst, it)
352 // AtQuota returns true if Create is not expected to work at the
354 func (wp *Pool) AtQuota() bool {
356 defer wp.mtx.Unlock()
357 return time.Now().Before(wp.atQuotaUntil)
360 // SetIdleBehavior determines how the indicated instance will behave
361 // when it has no containers running.
362 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
364 defer wp.mtx.Unlock()
365 wkr, ok := wp.workers[id]
367 return errors.New("requested instance does not exist")
369 wkr.setIdleBehavior(idleBehavior)
373 // Successful connection to the SSH daemon, update the mTimeToSSH metric
374 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
376 defer wp.mtx.Unlock()
377 wkr := wp.workers[inst.ID()]
378 if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
379 // the node is not in booting state (can happen if a-d-c is restarted) OR
380 // this is not the first SSH connection
384 wkr.firstSSHConnection = time.Now()
385 if wp.mTimeToSSH != nil {
386 wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
390 // Add or update worker attached to the given instance.
392 // The second return value is true if a new worker is created.
394 // A newly added instance has state=StateBooting if its tags match an
395 // entry in wp.creating, otherwise StateUnknown.
397 // Caller must have lock.
// NOTE(review): the fused numbering jumps throughout (401, 404,
// 406-409, 412-414, 424-425, 430, 432, 435-439, 441, 443-448, and
// after 451): the `id` assignment, the existing-worker return, the
// StateBooting assignment, most fields of the new worker literal,
// and the final return are elided. Do not restructure without the
// full source.
398 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
399 secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
// Wrap the instance so the secret tag is verified on first contact,
// reporting success via reportSSHConnected (feeds mTimeToSSH).
400 inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
402 if wkr := wp.workers[id]; wkr != nil {
403 wkr.executor.SetTarget(inst)
405 wkr.updated = time.Now()
410 state := StateUnknown
411 if _, ok := wp.creating[secret]; ok {
415 // If an instance has a valid IdleBehavior tag when it first
416 // appears, initialize the new worker accordingly (this is how
417 // we restore IdleBehavior that was set by a prior dispatch
418 // process); otherwise, default to "run". After this,
419 // wkr.idleBehavior is the source of truth, and will only be
420 // changed via SetIdleBehavior().
421 idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
422 if !validIdleBehavior[idleBehavior] {
423 idleBehavior = IdleBehaviorRun
426 logger := wp.logger.WithFields(logrus.Fields{
427 "InstanceType": it.Name,
428 "Instance": inst.ID(),
429 "Address": inst.Address(),
431 logger.WithFields(logrus.Fields{
433 "IdleBehavior": idleBehavior,
434 }).Infof("instance appeared in cloud")
440 executor: wp.newExecutor(inst),
442 idleBehavior: idleBehavior,
449 running: make(map[string]*remoteRunner),
450 starting: make(map[string]*remoteRunner),
451 probing: make(chan struct{}, 1),
457 // Shutdown shuts down a worker with the given type, or returns false
458 // if all workers with the given type are busy.
// NOTE(review): elided here (numbering jumps at 461, 472-479): the
// Lock paired with the deferred Unlock, the wkr.shutdown()/return
// true inside the match, and the final return false.
459 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
460 wp.setupOnce.Do(wp.setup)
462 defer wp.mtx.Unlock()
463 logger := wp.logger.WithField("InstanceType", it.Name)
464 logger.Info("shutdown requested")
// Prefer shutting down a worker that never got to Idle (Booting)
// before one that is Idle — hence the two-pass state order.
465 for _, tryState := range []State{StateBooting, StateIdle} {
466 // TODO: shutdown the worker with the longest idle
467 // time (Idle) or the earliest create time (Booting)
468 for _, wkr := range wp.workers {
469 if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
470 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
471 wkr.reportBootOutcome(BootOutcomeAborted)
480 // CountWorkers returns the current number of workers in each state.
482 // CountWorkers blocks, if necessary, until the initial instance list
483 // has been loaded from the cloud provider.
// NOTE(review): elided (numbering jumps at 486-487, 489, and after
// 491): presumably the waitUntilLoaded call, the Lock, the result-map
// initialization, the per-worker increment, and the return — confirm
// against the full source.
484 func (wp *Pool) CountWorkers() map[State]int {
485 wp.setupOnce.Do(wp.setup)
488 defer wp.mtx.Unlock()
490 for _, w := range wp.workers {
496 // Running returns the container UUIDs being prepared/run on workers.
498 // In the returned map, the time value indicates when the Pool
499 // observed that the container process had exited. A container that
500 // has not yet exited has a zero time value. The caller should use
501 // ForgetContainer() to garbage-collect the entries for exited
503 func (wp *Pool) Running() map[string]time.Time {
504 wp.setupOnce.Do(wp.setup)
506 defer wp.mtx.Unlock()
507 r := map[string]time.Time{}
508 for _, wkr := range wp.workers {
509 for uuid := range wkr.running {
510 r[uuid] = time.Time{}
512 for uuid := range wkr.starting {
513 r[uuid] = time.Time{}
516 for uuid, exited := range wp.exited {
522 // StartContainer starts a container on an idle worker immediately if
523 // possible, otherwise returns false.
// NOTE(review): elided (numbering jumps at 526, 528, 532-538, and
// after 539): the Lock, the `var wkr *worker` declaration implied by
// the nil check at line 531, the wkr assignment, the nil/return-false
// path, and the return true — confirm against the full source.
524 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
525 wp.setupOnce.Do(wp.setup)
527 defer wp.mtx.Unlock()
529 for _, w := range wp.workers {
// Among eligible idle workers of the right type, pick the one that
// was busy most recently (w.busy.After(wkr.busy)).
530 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
531 if wkr == nil || w.busy.After(wkr.busy) {
539 wkr.startContainer(ctr)
543 // KillContainer kills the crunch-run process for the given container
544 // UUID, if it's running on any worker.
546 // KillContainer returns immediately; the act of killing the container
547 // takes some time, and runs in the background.
549 // KillContainer returns false if the container has already ended.
// NOTE(review): elided (numbering jumps at 551, 555-556, 559, 561-566,
// and after 567): the Lock, the "Reason" log field, the rr != nil
// branch that calls rr.Kill and returns true, and the final return
// false — confirm against the full source.
550 func (wp *Pool) KillContainer(uuid string, reason string) bool {
552 defer wp.mtx.Unlock()
553 logger := wp.logger.WithFields(logrus.Fields{
554 "ContainerUUID": uuid,
557 for _, wkr := range wp.workers {
// Check both the running and starting maps for the runner.
558 rr := wkr.running[uuid]
560 rr = wkr.starting[uuid]
567 logger.Debug("cannot kill: already disappeared")
571 // ForgetContainer clears the placeholder for the given exited
572 // container, so it isn't returned by subsequent calls to Running().
574 // ForgetContainer has no effect if the container has not yet exited.
576 // The "container exited at time T" placeholder (which necessitates
577 // ForgetContainer) exists to make it easier for the caller
578 // (scheduler) to distinguish a container that exited without
579 // finalizing its state from a container that exited too recently for
580 // its final state to have appeared in the scheduler's queue cache.
581 func (wp *Pool) ForgetContainer(uuid string) {
583 defer wp.mtx.Unlock()
584 if _, ok := wp.exited[uuid]; ok {
585 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
586 delete(wp.exited, uuid)
// registerMetrics creates the pool's Prometheus collectors and
// registers them on reg. Counter vectors are pre-seeded with 0 for
// every known label value so the series exist before any event.
// NOTE(review): elided (numbering jumps at 591, 593, 599, 618, 637,
// 647, 655, 663, 671, and after 672): apparently the `if reg == nil`
// guard around the NewRegistry fallback, several closing `})` lines,
// the "vcpus_total" Name field, and the closing brace — confirm
// against the full source.
590 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
592 reg = prometheus.NewRegistry()
594 wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
595 Namespace: "arvados",
596 Subsystem: "dispatchcloud",
597 Name: "containers_running",
598 Help: "Number of containers reported running by cloud VMs.",
600 reg.MustRegister(wp.mContainersRunning)
601 wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
602 Namespace: "arvados",
603 Subsystem: "dispatchcloud",
604 Name: "instances_total",
605 Help: "Number of cloud VMs.",
606 }, []string{"category", "instance_type"})
607 reg.MustRegister(wp.mInstances)
608 wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
609 Namespace: "arvados",
610 Subsystem: "dispatchcloud",
611 Name: "instances_price",
612 Help: "Price of cloud VMs.",
613 }, []string{"category"})
614 reg.MustRegister(wp.mInstancesPrice)
615 wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
616 Namespace: "arvados",
617 Subsystem: "dispatchcloud",
619 Help: "Total VCPUs on all cloud VMs.",
620 }, []string{"category"})
621 reg.MustRegister(wp.mVCPUs)
622 wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
623 Namespace: "arvados",
624 Subsystem: "dispatchcloud",
625 Name: "memory_bytes_total",
626 Help: "Total memory on all cloud VMs.",
627 }, []string{"category"})
628 reg.MustRegister(wp.mMemory)
629 wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
630 Namespace: "arvados",
631 Subsystem: "dispatchcloud",
632 Name: "boot_outcomes",
633 Help: "Boot outcomes by type.",
634 }, []string{"outcome"})
635 for k := range validBootOutcomes {
636 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
638 reg.MustRegister(wp.mBootOutcomes)
639 wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
640 Namespace: "arvados",
641 Subsystem: "dispatchcloud",
642 Name: "instances_disappeared",
643 Help: "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
644 }, []string{"state"})
645 for _, v := range stateString {
646 wp.mDisappearances.WithLabelValues(v).Add(0)
648 reg.MustRegister(wp.mDisappearances)
649 wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
650 Namespace: "arvados",
651 Subsystem: "dispatchcloud",
652 Name: "instances_time_to_ssh_seconds",
653 Help: "Number of seconds between instance creation and the first successful SSH connection.",
654 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
656 reg.MustRegister(wp.mTimeToSSH)
657 wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
658 Namespace: "arvados",
659 Subsystem: "dispatchcloud",
660 Name: "instances_time_to_ready_for_container_seconds",
661 Help: "Number of seconds between the first successful SSH connection and ready to run a container.",
662 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
664 reg.MustRegister(wp.mTimeToReadyForContainer)
665 wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
666 Namespace: "arvados",
667 Subsystem: "dispatchcloud",
668 Name: "instances_time_from_shutdown_request_to_disappearance_seconds",
669 Help: "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
670 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
672 reg.MustRegister(wp.mTimeFromShutdownToGone)
// runMetrics subscribes to pool updates and refreshes the metrics on
// each notification; the loop body (numbering jumps at 676, 678+) is
// elided from this extract.
675 func (wp *Pool) runMetrics() {
677 defer wp.Unsubscribe(ch)
// updateMetrics recomputes all gauge values from the current worker
// set, bucketing each worker into one of the categories "inuse",
// "hold", "booting", "unknown", "idle" (see the switch below).
// NOTE(review): elided (numbering jumps at 685, 687-691, 696,
// 698-699, 701, 703, 705, 707-710, 716, 725-727, 730, and after 731):
// the RLock, the entKey type, the `cat` switch arms' assignments, the
// `running` declaration, and closing braces — confirm against the
// full source.
684 func (wp *Pool) updateMetrics() {
686 defer wp.mtx.RUnlock()
692 instances := map[entKey]int64{}
693 price := map[string]float64{}
694 cpu := map[string]int64{}
695 mem := map[string]int64{}
697 for _, wkr := range wp.workers {
700 case len(wkr.running)+len(wkr.starting) > 0:
702 case wkr.idleBehavior == IdleBehaviorHold:
704 case wkr.state == StateBooting:
706 case wkr.state == StateUnknown:
711 instances[entKey{cat, wkr.instType.Name}]++
712 price[cat] += wkr.instType.Price
713 cpu[cat] += int64(wkr.instType.VCPUs)
714 mem[cat] += int64(wkr.instType.RAM)
715 running += int64(len(wkr.running) + len(wkr.starting))
717 for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
718 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
719 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
720 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
721 // make sure to reset gauges for non-existing category/nodetype combinations
722 for _, it := range wp.instanceTypes {
723 if _, ok := instances[entKey{cat, it.Name}]; !ok {
724 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
728 for k, v := range instances {
729 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
731 wp.mContainersRunning.Set(float64(running))
// runProbes periodically probes all workers, rate-limited to maxPPS
// probes per second overall (limitticker) with one sweep per
// probeInterval (probeticker).
// NOTE(review): elided (numbering jumps at 736, 738, 741, 744, 748,
// 751-752, 754-756, 758, 760-761, 763-765, 767-769, 771-775): the
// maxPPS==0 guard condition, the mtx Lock/Unlock around the worker
// scans, the stop-channel select cases, and closing braces — confirm
// against the full source.
734 func (wp *Pool) runProbes() {
735 maxPPS := wp.maxProbesPerSecond
737 maxPPS = defaultMaxProbesPerSecond
739 limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
740 defer limitticker.Stop()
742 probeticker := time.NewTicker(wp.probeInterval)
743 defer probeticker.Stop()
// The workers slice is reused across sweeps (workers[:0]) to avoid
// reallocating every tick.
745 workers := []cloud.InstanceID{}
746 for range probeticker.C {
747 workers = workers[:0]
749 for id, wkr := range wp.workers {
750 if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
753 workers = append(workers, id)
757 for _, id := range workers {
759 wkr, ok := wp.workers[id]
762 // Deleted while we were probing
766 go wkr.ProbeAndUpdate()
770 case <-limitticker.C:
// runSync syncs with the cloud provider once immediately, then every
// syncInterval, until the pool is stopped.
// NOTE(review): elided (numbering jumps at 778, 780-782, 784, 786,
// 788): the for/select wrapper over timer.C and the stop channel —
// confirm against the full source.
776 func (wp *Pool) runSync() {
777 // sync once immediately, then wait syncInterval, sync again,
// A 1ns initial timer fires (effectively) immediately.
779 timer := time.NewTimer(1)
783 err := wp.getInstancesAndSync()
785 wp.logger.WithError(err).Warn("sync failed")
787 timer.Reset(wp.syncInterval)
789 wp.logger.Debug("worker.Pool stopped")
795 // Stop synchronizing with the InstanceSet.
// NOTE(review): the body after setupOnce (presumably closing/sending
// on wp.stop — see the `stop: make(chan bool)` field in NewPool) is
// on lines elided from this extract.
796 func (wp *Pool) Stop() {
797 wp.setupOnce.Do(wp.setup)
801 // Instances returns an InstanceView for each worker in the pool,
802 // summarizing its current state and recent activity.
// Results are sorted by instance ID for deterministic output.
// NOTE(review): elided (numbering jumps at 804, 806, 815, 818-820,
// 823-826): the `r := []InstanceView{}` declaration implied by the
// appends, the mtx Lock/Unlock, the LastBusy field, closing braces,
// and the return — confirm against the full source.
803 func (wp *Pool) Instances() []InstanceView {
805 wp.setupOnce.Do(wp.setup)
807 for _, w := range wp.workers {
808 r = append(r, InstanceView{
809 Instance: w.instance.ID(),
810 Address: w.instance.Address(),
811 Price: w.instType.Price,
812 ArvadosInstanceType: w.instType.Name,
813 ProviderInstanceType: w.instType.ProviderType,
814 LastContainerUUID: w.lastUUID,
816 WorkerState: w.state.String(),
817 IdleBehavior: w.idleBehavior,
821 sort.Slice(r, func(i, j int) bool {
822 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
827 // KillInstance destroys a cloud VM instance. It returns an error if
828 // the given instance does not exist.
// NOTE(review): elided (numbering jumps at 831, 833, 836-839): the
// mutex acquisition around the workers lookup, the closing brace of
// the not-found branch, the shutdown call, and the nil return —
// confirm against the full source.
829 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
830 wkr, ok := wp.workers[id]
832 return errors.New("instance not found")
834 wkr.logger.WithField("Reason", reason).Info("shutting down")
835 wkr.reportBootOutcome(BootOutcomeAborted)
840 func (wp *Pool) setup() {
841 wp.creating = map[string]createCall{}
842 wp.exited = map[string]time.Time{}
843 wp.workers = map[cloud.InstanceID]*worker{}
844 wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
848 // Load the runner program to be deployed on worker nodes into
849 // wp.runnerData, if necessary. Errors are logged.
851 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
853 // Caller must not have lock.
// NOTE(review): elided (numbering jumps at 855, 858, 862-863, 867,
// 869-871, 874-875): the Lock paired with the deferred Unlock, the
// early `return nil` branches, the error return, the runnerData
// assignment, and the final return — confirm against the full source.
854 func (wp *Pool) loadRunnerData() error {
856 defer wp.mtx.Unlock()
// Already loaded, or auto-deploy disabled: nothing to read.
857 if wp.runnerData != nil {
859 } else if wp.runnerSource == "" {
860 wp.runnerCmd = "crunch-run"
861 wp.runnerData = []byte{}
864 logger := wp.logger.WithField("source", wp.runnerSource)
865 logger.Debug("loading runner")
866 buf, err := ioutil.ReadFile(wp.runnerSource)
868 logger.WithError(err).Error("failed to load runner program")
// The MD5 of the binary becomes part of the deployed path, so a new
// binary gets a new path on the worker.
872 wp.runnerMD5 = md5.Sum(buf)
873 wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
// notify wakes all subscribers with a non-blocking send: a subscriber
// whose 1-slot buffer is already full keeps its pending notification
// (the select's default case is on a line elided from this extract —
// numbering jumps at 878, 881, 883-886).
877 func (wp *Pool) notify() {
879 defer wp.mtx.RUnlock()
880 for _, send := range wp.subscribers {
882 case send <- struct{}{}:
// getInstancesAndSync fetches the provider's instance list (filtered
// to this pool's InstanceSetID tag) and reconciles wp.workers with it
// via sync(). Rate-limit errors from the provider are converted into
// a throttle period with a wake-up via wp.notify.
// NOTE(review): elided (numbering jumps at 891-892, 896, 898-899,
// 902-903): the throttle-error return, the post-Instances error
// return, and the final `return nil` — confirm against the full
// source.
888 func (wp *Pool) getInstancesAndSync() error {
889 wp.setupOnce.Do(wp.setup)
890 if err := wp.instanceSet.throttleInstances.Error(); err != nil {
893 wp.logger.Debug("getting instance list")
// threshold is captured before the provider call so sync() can avoid
// clobbering worker updates that happened while the call was in
// flight.
894 threshold := time.Now()
895 instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
897 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
900 wp.sync(threshold, instances)
901 wp.logger.Debug("sync done")
905 // Add/remove/update workers based on instances, which was obtained
906 // from the instanceSet. However, don't clobber any other updates that
907 // already happened after threshold.
// NOTE(review): elided (numbering jumps at 909, 912-913, 917,
// 919-920, 922, 925-928, 931-932, 936, 941, 944, 946-952, 954-960):
// the Lock, a notify-tracking flag, the `continue` statements, the
// shutdown retry call, closing braces, and the first-load handling
// around line 953 — confirm against the full source.
908 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
910 defer wp.mtx.Unlock()
911 wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
// Pass 1: upsert a worker for every instance the provider reported.
914 for _, inst := range instances {
915 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
916 it, ok := wp.instanceTypes[itTag]
918 wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
921 if wkr, isNew := wp.updateWorker(inst, it); isNew {
923 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
924 wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
// Pass 2: drop workers whose instances disappeared from the
// provider's list — unless the worker was updated after threshold
// (i.e. while the Instances() call was in flight).
929 for id, wkr := range wp.workers {
930 if wkr.updated.After(threshold) {
933 logger := wp.logger.WithFields(logrus.Fields{
934 "Instance": wkr.instance.ID(),
935 "WorkerState": wkr.state,
937 logger.Info("instance disappeared in cloud")
938 wkr.reportBootOutcome(BootOutcomeDisappeared)
939 if wp.mDisappearances != nil {
940 wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
942 if wp.mTimeFromShutdownToGone != nil {
943 wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
945 delete(wp.workers, id)
953 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
// waitUntilLoaded blocks until the initial instance list has been
// loaded from the provider (wp.loaded — see the field at line 162 and
// CountWorkers). The loop body is on lines elided from this extract
// (numbering jumps at 962-963 and after 964).
961 func (wp *Pool) waitUntilLoaded() {
964 defer wp.mtx.RUnlock()
// randomHex returns a string of n random hexadecimal digits (n*4
// random bits). n must be even: the result is formatted from n/2
// bytes read from crypto/rand.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	// As presented, err was assigned but never checked; a failure of
	// the system randomness source is unrecoverable here, so panic.
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}