// Copyright (C) The Arvados Authors. All rights reserved.
// SPDX-License-Identifier: AGPL-3.0

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"

	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`

// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)
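
	// A minimal implementation sketch (not part of this package; the
	// production executor is SSH-based): any type satisfying Executor
	// only needs to run commands on its current target and accept
	// retargeting at any time. The name nullExecutor is hypothetical.
	//
	//	type nullExecutor struct{ target cloud.ExecutorTarget }
	//
	//	func (e *nullExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
	//		return nil, nil, errors.New("nullExecutor: not implemented")
	//	}
	//
	//	func (e *nullExecutor) SetTarget(t cloud.ExecutorTarget) { e.target = t }
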
	defaultSyncInterval        = time.Minute
	defaultProbeInterval       = time.Second * 10
	defaultMaxProbesPerSecond  = 10
	defaultTimeoutIdle         = time.Minute
	defaultTimeoutBooting      = time.Minute * 10
	defaultTimeoutProbe        = time.Minute * 10
	defaultTimeoutShutdown     = time.Second * 10
	defaultTimeoutTERM         = time.Minute * 2
	defaultTimeoutSignal       = time.Second * 5
	defaultTimeoutStaleRunLock = time.Second * 5

	// Time after a quota error to try again anyway, even if no
	// instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10

func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
		arvClient:                      arvClient,
		instanceSetID:                  instanceSetID,
		instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:                    newExecutor,
		bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
		runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes:                  cluster.InstanceTypes,
		maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
		maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
		probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
		installPublicKey:               installPublicKey,
		tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
		stop:                           make(chan bool),
	wp.registerMetrics(reg)
	wp.setupOnce.Do(wp.setup)
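
// A typical lifecycle, sketched under the assumption that the caller already
// has the arguments shown in the NewPool signature above:
//
//	wp := NewPool(logger, arvClient, reg, instanceSetID, instanceSet, newExecutor, installPublicKey, cluster)
//	defer wp.Stop()
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
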
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
	logger                         logrus.FieldLogger
	arvClient                      *arvados.Client
	instanceSetID                  cloud.InstanceSetID
	instanceSet                    *throttledInstanceSet
	newExecutor                    func(cloud.Instance) Executor
	bootProbeCommand               string
	imageID                        cloud.ImageID
	instanceTypes                  map[string]arvados.InstanceType
	syncInterval                   time.Duration
	probeInterval                  time.Duration
	maxProbesPerSecond             int
	maxConcurrentInstanceCreateOps int
	timeoutIdle                    time.Duration
	timeoutBooting                 time.Duration
	timeoutProbe                   time.Duration
	timeoutShutdown                time.Duration
	timeoutTERM                    time.Duration
	timeoutSignal                  time.Duration
	timeoutStaleRunLock            time.Duration
	installPublicKey               ssh.PublicKey

	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	runnerMD5    [md5.Size]byte

	mContainersRunning        prometheus.Gauge
	mInstances                *prometheus.GaugeVec
	mInstancesPrice           *prometheus.GaugeVec
	mVCPUs                    *prometheus.GaugeVec
	mMemory                   *prometheus.GaugeVec
	mBootOutcomes             *prometheus.CounterVec
	mDisappearances           *prometheus.CounterVec
	mTimeToSSH                prometheus.Summary
	mTimeToReadyForContainer  prometheus.Summary
	mTimeFromShutdownToGone   prometheus.Summary
	mTimeFromQueueToCrunchRun prometheus.Summary
	mRunProbeDuration         *prometheus.SummaryVec

type createCall struct {
	instanceType arvados.InstanceType

func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
	for it, c := range creating {

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	tags := cloud.InstanceTags{
		wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
		wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
		wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
		wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
	initCmd := TagVerifier{nil, secret, nil}.InitCommand()
	inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
	defer wp.mtx.Unlock()
	// delete() is deferred so the updateWorker() call
	// below knows to use StateBooting when adding a new
	// worker.
	defer delete(wp.creating, secret)

	if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
		wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
		time.AfterFunc(quotaErrorTTL, wp.notify)
	logger.WithError(err).Error("create failed")
	wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
	wp.updateWorker(inst, it)

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
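
// A dispatcher-side sketch (not part of this file) of how Create and AtQuota
// are meant to be combined, assuming a *Pool named wp and an
// arvados.InstanceType named it:
//
//	if !wp.AtQuota() && !wp.Create(it) {
//		// Create already logged the reason; wait for the next
//		// wp.Subscribe() notification before trying again.
//	}
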
// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
		return errors.New("requested instance does not exist")
	wkr.setIdleBehavior(idleBehavior)

// Successful connection to the SSH daemon; update the mTimeToSSH metric.
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
	defer wp.mtx.Unlock()
	wkr := wp.workers[inst.ID()]
	if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
		// the node is not in booting state (can happen if a-d-c is restarted) OR
		// this is not the first SSH connection
	wkr.firstSSHConnection = time.Now()
	if wp.mTimeToSSH != nil {
		wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())

// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.updated = time.Now()
	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	logger.WithFields(logrus.Fields{
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
		executor:     wp.newExecutor(inst),
		idleBehavior: idleBehavior,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.reportBootOutcome(BootOutcomeAborted)

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.Unlock()
	for _, w := range wp.workers {

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
	for uuid, exited := range wp.exited {
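
// A caller-side sketch (not from this file) of the intended pattern for
// garbage-collecting exited containers, assuming a *Pool named wp and a
// hypothetical gracePeriod duration:
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() && time.Since(exited) > gracePeriod {
//			wp.ForgetContainer(uuid)
//		}
//	}
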
// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.Unlock()
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
	wkr.startContainer(ctr)

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
			rr = wkr.starting[uuid]
	logger.Debug("cannot kill: already disappeared")
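
// Caller-side sketch (not from this file), assuming a container UUID in hand;
// the reason string here is only illustrative:
//
//	if !wp.KillContainer(uuid, "priority changed") {
//		// The crunch-run process had already ended; nothing left to kill.
//	}
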
// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	reg.MustRegister(wp.mContainersRunning)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price",
		Help:      "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Help:      "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "boot_outcomes",
		Help:      "Boot outcomes by type.",
	}, []string{"outcome"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_disappeared",
		Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	reg.MustRegister(wp.mDisappearances)
	wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ssh_seconds",
		Help:       "Number of seconds between instance creation and the first successful SSH connection.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	reg.MustRegister(wp.mTimeToSSH)
	wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ready_for_container_seconds",
		Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	reg.MustRegister(wp.mTimeToReadyForContainer)
	wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
		Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	reg.MustRegister(wp.mTimeFromShutdownToGone)
	wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "containers_time_from_queue_to_crunch_run_seconds",
		Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
	wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_run_probe_duration_seconds",
		Help:       "Number of seconds per runProbe call.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	}, []string{"outcome"})
	reg.MustRegister(wp.mRunProbeDuration)
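
// With the Namespace/Subsystem values above, these metrics are exposed using
// Prometheus's Namespace_Subsystem_Name convention; for example (label values
// here are illustrative only):
//
//	arvados_dispatchcloud_instances_total{category="idle",instance_type="Standard_D2s_v3"}
//	arvados_dispatchcloud_containers_running
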
func (wp *Pool) runMetrics() {
	defer wp.Unsubscribe(ch)

func (wp *Pool) updateMetrics() {
	defer wp.mtx.RUnlock()

	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	for _, wkr := range wp.workers {
		case len(wkr.running)+len(wkr.starting) > 0:
		case wkr.idleBehavior == IdleBehaviorHold:
		case wkr.state == StateBooting:
		case wkr.state == StateUnknown:
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for non-existing category/nodetype combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	wp.mContainersRunning.Set(float64(running))

func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
		maxPPS = defaultMaxProbesPerSecond
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
			workers = append(workers, id)
		for _, id := range workers {
			wkr, ok := wp.workers[id]
				// Deleted while we were probing
			go wkr.ProbeAndUpdate()
			case <-limitticker.C:

func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	timer := time.NewTimer(1)
		err := wp.getInstancesAndSync()
			wp.logger.WithError(err).Warn("sync failed")
		timer.Reset(wp.syncInterval)
		wp.logger.Debug("worker.Pool stopped")

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	wp.setupOnce.Do(wp.setup)
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Address:              w.instance.Address(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wkr, ok := wp.workers[id]
		return errors.New("instance not found")
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.reportBootOutcome(BootOutcomeAborted)

func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}

// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
	} else if wp.runnerSource == "" {
		wp.runnerCmd = "crunch-run"
		wp.runnerData = []byte{}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
		logger.WithError(err).Error("failed to load runner program")
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)

func (wp *Pool) notify() {
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		case send <- struct{}{}:

func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		// wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
		if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
			wp.mTimeFromShutdownToGone.Observe(time.Since(wkr.destroyed).Seconds())
		delete(wp.workers, id)

		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")

func (wp *Pool) waitUntilLoaded() {
	defer wp.mtx.RUnlock()

// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}