// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"crypto/hmac"
	"crypto/md5"
	"crypto/rand"
	"crypto/sha256"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"sort"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)

const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
)
// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}
// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)
}
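
// A minimal Executor sketch suitable for tests, assuming no real
// remote host is involved (the production implementation speaks SSH
// and lives elsewhere; stubExecutor is hypothetical):
//
//	type stubExecutor struct{}
//
//	func (stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
//		return []byte("stub: " + cmd), nil, nil // pretend every command succeeds
//	}
//
//	// SetTarget is a no-op here; a real implementation would re-dial
//	// (or lazily reconnect to) the new target's address.
//	func (stubExecutor) SetTarget(cloud.ExecutorTarget) {}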
const (
	defaultSyncInterval        = time.Minute
	defaultProbeInterval       = time.Second * 10
	defaultMaxProbesPerSecond  = 10
	defaultTimeoutIdle         = time.Minute
	defaultTimeoutBooting      = time.Minute * 10
	defaultTimeoutProbe        = time.Minute * 10
	defaultTimeoutShutdown     = time.Second * 10
	defaultTimeoutTERM         = time.Minute * 2
	defaultTimeoutSignal       = time.Second * 5
	defaultTimeoutStaleRunLock = time.Second * 5

	// Time to wait after a quota error before trying again anyway,
	// even if no instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because rate limiting" log messages
	logRateLimitErrorInterval = time.Second * 10
)
// duration returns conf as a time.Duration, or def if conf is unset.
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}
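
// For example, if SyncInterval is left unset (zero) in the cluster
// configuration, duration(cluster.Containers.CloudVMs.SyncInterval,
// defaultSyncInterval) falls back to the one-minute default.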
// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:                         logger,
		arvClient:                      arvClient,
		instanceSetID:                  instanceSetID,
		instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:                    newExecutor,
		bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
		runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes:                  cluster.InstanceTypes,
		maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
		maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
		probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
		systemRootToken:                cluster.SystemRootToken,
		installPublicKey:               installPublicKey,
		tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
		stop:                           make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}
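
// A typical caller constructs one Pool per dispatcher process and
// watches it via Subscribe; a sketch (newSSHExecutor and the
// surrounding setup are assumptions, not part of this file):
//
//	wp := NewPool(logger, arvClient, prometheus.NewRegistry(),
//		instanceSetID, instanceSet, newSSHExecutor, installKey, cluster)
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		// react to pool state changes
//	}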
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger                         logrus.FieldLogger
	arvClient                      *arvados.Client
	instanceSetID                  cloud.InstanceSetID
	instanceSet                    *throttledInstanceSet
	newExecutor                    func(cloud.Instance) Executor
	bootProbeCommand               string
	runnerSource                   string
	imageID                        cloud.ImageID
	instanceTypes                  map[string]arvados.InstanceType
	syncInterval                   time.Duration
	probeInterval                  time.Duration
	maxProbesPerSecond             int
	maxConcurrentInstanceCreateOps int
	timeoutIdle                    time.Duration
	timeoutBooting                 time.Duration
	timeoutProbe                   time.Duration
	timeoutShutdown                time.Duration
	timeoutTERM                    time.Duration
	timeoutSignal                  time.Duration
	timeoutStaleRunLock            time.Duration
	systemRootToken                string
	installPublicKey               ssh.PublicKey
	tagKeyPrefix                   string

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once
	runnerData   []byte
	runnerMD5    [md5.Size]byte
	runnerCmd    string

	mContainersRunning        prometheus.Gauge
	mInstances                *prometheus.GaugeVec
	mInstancesPrice           *prometheus.GaugeVec
	mVCPUs                    *prometheus.GaugeVec
	mMemory                   *prometheus.GaugeVec
	mBootOutcomes             *prometheus.CounterVec
	mDisappearances           *prometheus.CounterVec
	mTimeToSSH                prometheus.Summary
	mTimeToReadyForContainer  prometheus.Summary
	mTimeFromShutdownToGone   prometheus.Summary
	mTimeFromQueueToCrunchRun prometheus.Summary
	mRunProbeDuration         *prometheus.SummaryVec
}
// createCall records an unfinished (cloud.InstanceSet)Create call.
type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}
// CheckHealth returns an error if the runner program cannot be
// loaded, since that would prevent any new instance from being set
// up.
func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
	}
	return nil
}
// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		tryScheduling(wp)
//		if done {
//			break
//		}
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}
// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}
// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}
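
// A scheduler loop might combine Unallocated with AtQuota and Create
// when deciding whether to bring up more instances; a sketch, where
// wanted is a hypothetical count of queued containers per type:
//
//	unalloc := wp.Unallocated()
//	for it, n := range wanted {
//		if unalloc[it] < n && !wp.AtQuota() {
//			wp.Create(it)
//		}
//	}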
// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
		return false
	}
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret, nil}.InitCommand()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	return true
}
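
// The InstanceSecret tag written above is what ties the eventual
// cloud instance back to this createCall: when an instance carrying a
// matching secret shows up in the provider's listing, updateWorker
// (below) starts it in StateBooting instead of StateUnknown.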
// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
}
// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.setIdleBehavior(idleBehavior)
	return nil
}
// reportSSHConnected records the first successful connection to an
// instance's SSH daemon and updates the mTimeToSSH metric.
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr := wp.workers[inst.ID()]
	if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
		// the node is not in booting state (can happen if
		// a-d-c is restarted) OR this is not the first SSH
		// connection
		return
	}

	wkr.firstSSHConnection = time.Now()
	if wp.mTimeToSSH != nil {
		wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
	}
}
// updateWorker adds or updates the worker attached to the given
// instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Info("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}
// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.reportBootOutcome(BootOutcomeAborted)
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}
// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.waitUntilLoaded()
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}
// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}
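
// Typical caller pattern, as a sketch (queueHasFinalState is a
// hypothetical check against the scheduler's queue cache): a zero
// time value means "still running", and exited entries are
// garbage-collected once their final state is known elsewhere.
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() && queueHasFinalState(uuid) {
//			wp.ForgetContainer(uuid)
//		}
//	}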
// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}
// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Reason":        reason,
	})
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
		if rr == nil {
			rr = wkr.starting[uuid]
		}
		if rr != nil {
			rr.Kill(reason)
			return true
		}
	}
	logger.Debug("cannot kill: already disappeared")
	return false
}
// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	}
}
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price",
		Help:      "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "boot_outcomes",
		Help:      "Boot outcomes by type.",
	}, []string{"outcome"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	}
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_disappeared",
		Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	}
	reg.MustRegister(wp.mDisappearances)
	wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ssh_seconds",
		Help:       "Number of seconds between instance creation and the first successful SSH connection.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToSSH)
	wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ready_for_container_seconds",
		Help:       "Number of seconds between the first successful SSH connection and readiness to run a container.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToReadyForContainer)
	wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
		Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromShutdownToGone)
	wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "containers_time_from_queue_to_crunch_run_seconds",
		Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
	wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_run_probe_duration_seconds",
		Help:       "Number of seconds per runProbe call.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	}, []string{"outcome"})
	reg.MustRegister(wp.mRunProbeDuration)
}
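
// When scraped, the gauges and counters registered above appear along
// these lines (illustrative names and values only; the instance_type
// label comes from the cluster's own InstanceTypes config):
//
//	arvados_dispatchcloud_containers_running 3
//	arvados_dispatchcloud_instances_total{category="idle",instance_type="small"} 2
//	arvados_dispatchcloud_boot_outcomes{outcome="success"} 17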
func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}
func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	type entKey struct {
		cat      string
		instType string
	}
	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var running int64
	for _, wkr := range wp.workers {
		var cat string
		switch {
		case len(wkr.running)+len(wkr.starting) > 0:
			cat = "inuse"
		case wkr.idleBehavior == IdleBehaviorHold:
			cat = "hold"
		case wkr.state == StateBooting:
			cat = "booting"
		case wkr.state == StateUnknown:
			cat = "unknown"
		default:
			cat = "idle"
		}
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
	}
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for non-existing category/nodetype combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
			}
		}
	}
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	}
	wp.mContainersRunning.Set(float64(running))
}
func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing
				// others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}
func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}
// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}
// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Address:              w.instance.Address(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}
// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return nil
}
func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
	wp.loadRunnerData()
}
// loadRunnerData loads the runner program to be deployed on worker
// nodes into wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
		return nil
	} else if wp.runnerSource == "" {
		wp.runnerCmd = "crunch-run"
		wp.runnerData = []byte{}
		return nil
	}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
	if err != nil {
		logger.WithError(err).Error("failed to load runner program")
		return err
	}
	wp.runnerData = buf
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
	return nil
}
func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}
func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}
// sync adds/removes/updates workers based on instances, which was
// obtained from the instanceSet. However, it doesn't clobber any
// other updates that already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		// wkr.destroyed.IsZero() can happen if the instance disappeared but we weren't trying to shut it down
		if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
			wp.mTimeFromShutdownToGone.Observe(time.Since(wkr.destroyed).Seconds())
		}
		delete(wp.workers, id)
		go wkr.Close()
		notify = true
	}

	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}
func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for !wp.loaded {
		wp.mtx.RUnlock()
		<-ch
		wp.mtx.RLock()
	}
}
// gatewayAuthSecret returns a per-container secret: the hex-encoded
// HMAC-SHA256 of the container UUID, keyed with SystemRootToken.
func (wp *Pool) gatewayAuthSecret(uuid string) string {
	h := hmac.New(sha256.New, []byte(wp.systemRootToken))
	fmt.Fprint(h, uuid)
	return fmt.Sprintf("%x", h.Sum(nil))
}
// randomHex returns a random string of n hexadecimal digits (n*4
// random bits). n must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}