// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"crypto/md5"
	"crypto/rand"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"sort"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)

const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
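//
// For example (an illustrative sketch, not part of the interface
// contract), a caller might run a command and collect its output:
//
//	stdout, stderr, err := exec.Execute(nil, "crunch-run --list", nil)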
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)
}

const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = time.Minute * 10
	defaultTimeoutProbe       = time.Minute * 10
	defaultTimeoutShutdown    = time.Second * 10
	defaultTimeoutTERM        = time.Minute * 2
	defaultTimeoutSignal      = time.Second * 5

	// Time to wait after a quota error before trying again
	// anyway, even if no instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
)

// duration returns conf as a time.Duration if it is nonzero, and the
// default def otherwise.
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:                         logger,
		arvClient:                      arvClient,
		instanceSetID:                  instanceSetID,
		instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:                    newExecutor,
		bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
		runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes:                  cluster.InstanceTypes,
		maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
		maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
		probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		installPublicKey:               installPublicKey,
		tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
		stop:                           make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger                         logrus.FieldLogger
	arvClient                      *arvados.Client
	instanceSetID                  cloud.InstanceSetID
	instanceSet                    *throttledInstanceSet
	newExecutor                    func(cloud.Instance) Executor
	bootProbeCommand               string
	runnerSource                   string
	imageID                        cloud.ImageID
	instanceTypes                  map[string]arvados.InstanceType
	syncInterval                   time.Duration
	probeInterval                  time.Duration
	maxProbesPerSecond             int
	maxConcurrentInstanceCreateOps int
	timeoutIdle                    time.Duration
	timeoutBooting                 time.Duration
	timeoutProbe                   time.Duration
	timeoutShutdown                time.Duration
	timeoutTERM                    time.Duration
	timeoutSignal                  time.Duration
	installPublicKey               ssh.PublicKey
	tagKeyPrefix                   string

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once
	runnerData   []byte
	runnerMD5    [md5.Size]byte
	runnerCmd    string

	mContainersRunning        prometheus.Gauge
	mInstances                *prometheus.GaugeVec
	mInstancesPrice           *prometheus.GaugeVec
	mVCPUs                    *prometheus.GaugeVec
	mMemory                   *prometheus.GaugeVec
	mBootOutcomes             *prometheus.CounterVec
	mDisappearances           *prometheus.CounterVec
	mTimeToSSH                prometheus.Summary
	mTimeToReadyForContainer  prometheus.Summary
	mTimeFromShutdownToGone   prometheus.Summary
	mTimeFromQueueToCrunchRun prometheus.Summary
}

// createCall records an unfinished (cloud.InstanceSet)Create call.
type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}

// CheckHealth reports an error if the Pool cannot load the runner
// program it deploys to worker nodes.
func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
	}
	return nil
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		tryScheduling(wp)
//		if done {
//			break
//		}
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
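//
// Illustrative use (sketch; "needed" is a hypothetical demand count
// computed by the caller):
//
//	if wp.Unallocated()[it] < needed {
//		wp.Create(it)
//	}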
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
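//
// Illustrative use (sketch):
//
//	if !wp.Create(it) {
//		// Wait for the next Subscribe() event, then retry.
//	}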
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
		return false
	}
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret, nil}.InitCommand()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	return true
}

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
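//
// For example, to keep an instance alive for debugging (illustrative;
// id identifies an existing worker):
//
//	err := wp.SetIdleBehavior(id, IdleBehaviorHold)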
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.setIdleBehavior(idleBehavior)
	return nil
}

// reportSSHConnected records a successful connection to the SSH
// daemon and updates the mTimeToSSH metric.
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr := wp.workers[inst.ID()]
	if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
		// the node is not in booting state (can happen if
		// a-d-c is restarted) OR this is not the first SSH
		// connection
		return
	}

	wkr.firstSSHConnection = time.Now()
	if wp.mTimeToSSH != nil {
		wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
	}
}

// updateWorker adds or updates the worker attached to the given
// instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
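//
// Illustrative use (sketch; a caller at quota might trade an idle
// instance of one type for a needed instance of another,
// "unneededType" and "neededType" being hypothetical):
//
//	if wp.AtQuota() && wp.Shutdown(unneededType) {
//		wp.Create(neededType)
//	}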
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.reportBootOutcome(BootOutcomeAborted)
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
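//
// Illustrative use (sketch):
//
//	if wp.CountWorkers()[StateBooting] == 0 {
//		// no instances are still waiting to finish booting
//	}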
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.waitUntilLoaded()
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
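//
// Illustrative cleanup loop (sketch; keepExited is a hypothetical
// grace period chosen by the caller):
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() && time.Since(exited) > keepExited {
//			wp.ForgetContainer(uuid)
//		}
//	}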
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
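//
// Illustrative call (sketch; reason is free-form text that appears in
// logs):
//
//	if !wp.KillContainer(uuid, "priority changed") {
//		// Already gone; the caller can ForgetContainer(uuid).
//	}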
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Reason":        reason,
	})
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
		if rr == nil {
			rr = wkr.starting[uuid]
		}
		if rr != nil {
			go rr.Kill(reason)
			return true
		}
	}
	logger.Debug("cannot kill: already disappeared")
	return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	}
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price",
		Help:      "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "boot_outcomes",
		Help:      "Boot outcomes by type.",
	}, []string{"outcome"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	}
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_disappeared",
		Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	}
	reg.MustRegister(wp.mDisappearances)
	wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ssh_seconds",
		Help:       "Number of seconds between instance creation and the first successful SSH connection.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToSSH)
	wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ready_for_container_seconds",
		Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToReadyForContainer)
	wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
		Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromShutdownToGone)
	wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "containers_time_from_queue_to_crunch_run_seconds",
		Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
}

func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	type entKey struct {
		cat      string
		instType string
	}
	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var running int64
	for _, wkr := range wp.workers {
		var cat string
		switch {
		case len(wkr.running)+len(wkr.starting) > 0:
			cat = "inuse"
		case wkr.idleBehavior == IdleBehaviorHold:
			cat = "hold"
		case wkr.state == StateBooting:
			cat = "booting"
		case wkr.state == StateUnknown:
			cat = "unknown"
		default:
			cat = "idle"
		}
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
	}
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for non-existing category/nodetype combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
			}
		}
	}
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	}
	wp.mContainersRunning.Set(float64(running))
}

func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Address:              w.instance.Address(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return nil
}

func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}

// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
		return nil
	} else if wp.runnerSource == "" {
		wp.runnerCmd = "crunch-run"
		wp.runnerData = []byte{}
		return nil
	}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
	if err != nil {
		logger.WithError(err).Error("failed to load runner program")
		return err
	}
	wp.runnerData = buf
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
	return nil
}

func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		// wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
		if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
			wp.mTimeFromShutdownToGone.Observe(time.Since(wkr.destroyed).Seconds())
		}
		delete(wp.workers, id)
		go wkr.Close()
		notify = true
	}

	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}

func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for !wp.loaded {
		wp.mtx.RUnlock()
		<-ch
		wp.mtx.RLock()
	}
}

// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
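//
// For example (illustrative):
//
//	randomHex(6) // yields something like "a3f09c"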
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}