1 // Copyright (C) The Arvados Authors. All rights reserved.
// SPDX-License-Identifier: AGPL-3.0

package worker
22 "git.arvados.org/arvados.git/lib/cloud"
23 "git.arvados.org/arvados.git/sdk/go/arvados"
24 "github.com/prometheus/client_golang/prometheus"
25 "github.com/sirupsen/logrus"
26 "golang.org/x/crypto/ssh"
const (
tagKeyInstanceType = "InstanceType"
31 tagKeyIdleBehavior = "IdleBehavior"
32 tagKeyInstanceSecret = "InstanceSecret"
tagKeyInstanceSetID = "InstanceSetID"
)
36 // An InstanceView shows a worker's current state and recent activity.
37 type InstanceView struct {
38 Instance cloud.InstanceID `json:"instance"`
39 Address string `json:"address"`
40 Price float64 `json:"price"`
41 ArvadosInstanceType string `json:"arvados_instance_type"`
42 ProviderInstanceType string `json:"provider_instance_type"`
43 LastContainerUUID string `json:"last_container_uuid"`
44 LastBusy time.Time `json:"last_busy"`
45 WorkerState string `json:"worker_state"`
IdleBehavior IdleBehavior `json:"idle_behavior"`
}
49 // An Executor executes shell commands on a remote host.
50 type Executor interface {
51 // Run cmd on the current target.
52 Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
54 // Use the given target for subsequent operations. The new
55 // target is the same host as the previous target, but it
// might return a different address and verify a different
// host key.
59 // SetTarget is called frequently, and in most cases the new
60 // target will behave exactly the same as the old one. An
61 // implementation should optimize accordingly.
63 // SetTarget must not block on concurrent Execute calls.
64 SetTarget(cloud.ExecutorTarget)
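
// Example (a minimal sketch; the SSH-backed implementation and the
// probed command are illustrative, not required by this interface):
//
// exec.SetTarget(target)
// stdout, stderr, err := exec.Execute(nil, "uptime", nil)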
const (
defaultSyncInterval = time.Minute
71 defaultProbeInterval = time.Second * 10
72 defaultMaxProbesPerSecond = 10
73 defaultTimeoutIdle = time.Minute
74 defaultTimeoutBooting = time.Minute * 10
75 defaultTimeoutProbe = time.Minute * 10
76 defaultTimeoutShutdown = time.Second * 10
77 defaultTimeoutTERM = time.Minute * 2
78 defaultTimeoutSignal = time.Second * 5
79 defaultTimeoutStaleRunLock = time.Second * 5
81 // Time after a quota error to try again anyway, even if no
// instances have been shut down.
83 quotaErrorTTL = time.Minute
// Time between "X failed because of rate limiting" messages
logRateLimitErrorInterval = time.Second * 10
)
func duration(conf arvados.Duration, def time.Duration) time.Duration {
if conf > 0 {
return time.Duration(conf)
}
return def
}
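
// For example, the sync interval falls back to defaultSyncInterval
// when the cluster config leaves SyncInterval zero (this mirrors the
// usage in NewPool below):
//
// syncInterval := duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval)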
96 // NewPool creates a Pool of workers backed by instanceSet.
98 // New instances are configured and set up according to the given
99 // cluster configuration.
100 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
wp := &Pool{
logger: logger,
arvClient: arvClient,
104 instanceSetID: instanceSetID,
105 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
106 newExecutor: newExecutor,
cluster: cluster,
bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
109 runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
110 imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
111 instanceTypes: cluster.InstanceTypes,
112 maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
113 maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
114 probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
115 syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
116 timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
117 timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
118 timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
119 timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
120 timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
121 timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
122 timeoutStaleRunLock: duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
123 systemRootToken: cluster.SystemRootToken,
124 installPublicKey: installPublicKey,
125 tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
126 runnerCmdDefault: cluster.Containers.CrunchRunCommand,
127 runnerArgs: append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
stop: make(chan bool),
}
130 wp.registerMetrics(reg)
go func() {
wp.setupOnce.Do(wp.setup)
go wp.runMetrics()
}()
return wp
}
140 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
141 // zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
logger logrus.FieldLogger
145 arvClient *arvados.Client
146 instanceSetID cloud.InstanceSetID
147 instanceSet *throttledInstanceSet
148 newExecutor func(cloud.Instance) Executor
149 cluster *arvados.Cluster
150 bootProbeCommand string
152 imageID cloud.ImageID
153 instanceTypes map[string]arvados.InstanceType
154 syncInterval time.Duration
155 probeInterval time.Duration
156 maxProbesPerSecond int
157 maxConcurrentInstanceCreateOps int
158 timeoutIdle time.Duration
159 timeoutBooting time.Duration
160 timeoutProbe time.Duration
161 timeoutShutdown time.Duration
162 timeoutTERM time.Duration
163 timeoutSignal time.Duration
164 timeoutStaleRunLock time.Duration
165 systemRootToken string
166 installPublicKey ssh.PublicKey
168 runnerCmdDefault string // crunch-run command to use if not deploying a binary
runnerArgs []string // extra args passed to crunch-run
runnerSource string // path of crunch-run binary to deploy, if any
runnerCmd string
runnerData []byte
tagKeyPrefix string
stop chan bool
mtx sync.RWMutex
setupOnce sync.Once
172 subscribers map[<-chan struct{}]chan<- struct{}
173 creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
174 workers map[cloud.InstanceID]*worker
175 loaded bool // loaded list of instances from InstanceSet at least once
176 exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
177 atQuotaUntil time.Time
178 atQuotaErr cloud.QuotaError
183 runnerMD5 [md5.Size]byte
186 mContainersRunning prometheus.Gauge
187 mInstances *prometheus.GaugeVec
188 mInstancesPrice *prometheus.GaugeVec
189 mVCPUs *prometheus.GaugeVec
190 mMemory *prometheus.GaugeVec
191 mBootOutcomes *prometheus.CounterVec
192 mDisappearances *prometheus.CounterVec
193 mTimeToSSH prometheus.Summary
194 mTimeToReadyForContainer prometheus.Summary
195 mTimeFromShutdownToGone prometheus.Summary
196 mTimeFromQueueToCrunchRun prometheus.Summary
mRunProbeDuration *prometheus.SummaryVec
}
200 type createCall struct {
time time.Time
instanceType arvados.InstanceType
}
205 func (wp *Pool) CheckHealth() error {
206 wp.setupOnce.Do(wp.setup)
207 if err := wp.loadRunnerData(); err != nil {
return fmt.Errorf("error loading runner binary: %s", err)
}
return nil
}
213 // Subscribe returns a buffered channel that becomes ready after any
214 // change to the pool's state that could have scheduling implications:
215 // a worker's state changes, a new worker appears, the cloud
216 // provider's API rate limiting period ends, etc.
218 // Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
224 // ch := wp.Subscribe()
225 // defer wp.Unsubscribe(ch)
232 func (wp *Pool) Subscribe() <-chan struct{} {
233 wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
defer wp.mtx.Unlock()
236 ch := make(chan struct{}, 1)
wp.subscribers[ch] = ch
return ch
}
241 // Unsubscribe stops sending updates to the given channel.
242 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
243 wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
defer wp.mtx.Unlock()
delete(wp.subscribers, ch)
}
249 // Unallocated returns the number of unallocated (creating + booting +
250 // idle + unknown) workers for each instance type. Workers in
251 // hold/drain mode are not included.
252 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
253 wp.setupOnce.Do(wp.setup)
wp.mtx.RLock()
defer wp.mtx.RUnlock()
256 unalloc := map[arvados.InstanceType]int{}
257 creating := map[arvados.InstanceType]int{}
258 oldestCreate := map[arvados.InstanceType]time.Time{}
259 for _, cc := range wp.creating {
it := cc.instanceType
creating[it]++
262 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
oldestCreate[it] = cc.time
}
}
266 for _, wkr := range wp.workers {
267 // Skip workers that are not expected to become
268 // available soon. Note len(wkr.running)>0 is not
// redundant here: it can be true even in
// StateUnknown.
271 if wkr.state == StateShutdown ||
272 wkr.state == StateRunning ||
273 wkr.idleBehavior != IdleBehaviorRun ||
len(wkr.running) > 0 {
continue
}
it := wkr.instType
unalloc[it]++
279 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
280 // If up to N new workers appear in
281 // Instances() while we are waiting for N
282 // Create() calls to complete, we assume we're
283 // just seeing a race between Instances() and
284 // Create() responses.
286 // The other common reason why nodes have
287 // state==Unknown is that they appeared at
288 // startup, before any Create calls. They
289 // don't match the above timing condition, so
290 // we never mistakenly attribute them to
// pending Create calls.
creating[it]--
}
}
for it, c := range creating {
unalloc[it] += c
}
return unalloc
}
301 // Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
305 // Create returns false if a pre-existing error state prevents it from
306 // even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
309 func (wp *Pool) Create(it arvados.InstanceType) bool {
310 logger := wp.logger.WithField("InstanceType", it.Name)
311 wp.setupOnce.Do(wp.setup)
312 if wp.loadRunnerData() != nil {
// Boot probe is certain to fail.
return false
}
wp.mtx.Lock()
defer wp.mtx.Unlock()
if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
return false
}
321 // The maxConcurrentInstanceCreateOps knob throttles the number of node create
322 // requests in flight. It was added to work around a limitation in Azure's
323 // managed disks, which support no more than 20 concurrent node creation
324 // requests from a single disk image (cf.
325 // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
326 // The code assumes that node creation, from Azure's perspective, means the
327 // period until the instance appears in the "get all instances" list.
328 if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
329 logger.Info("reached MaxConcurrentInstanceCreateOps")
wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
return false
}
334 secret := randomHex(instanceSecretLength)
now := time.Now()
wp.creating[secret] = createCall{time: now, instanceType: it}
go func() {
defer wp.notify()
tags := cloud.InstanceTags{
339 wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID),
340 wp.tagKeyPrefix + tagKeyInstanceType: it.Name,
341 wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
342 wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
344 initCmd := TagVerifier{nil, secret, nil}.InitCommand()
345 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
wp.mtx.Lock()
defer wp.mtx.Unlock()
348 // delete() is deferred so the updateWorker() call
// below knows to use StateBooting when adding a new
// worker.
351 defer delete(wp.creating, secret)
if err != nil {
if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
wp.atQuotaErr = err
wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
time.AfterFunc(quotaErrorTTL, wp.notify)
}
358 logger.WithError(err).Error("create failed")
wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
return
}
wp.updateWorker(inst, it)
}()
return true
}
// AtQuota returns true if Create is not expected to work at the
// moment.
369 func (wp *Pool) AtQuota() bool {
wp.mtx.Lock()
defer wp.mtx.Unlock()
372 return time.Now().Before(wp.atQuotaUntil)
375 // SetIdleBehavior determines how the indicated instance will behave
376 // when it has no containers running.
377 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
wp.mtx.Lock()
defer wp.mtx.Unlock()
380 wkr, ok := wp.workers[id]
if !ok {
return errors.New("requested instance does not exist")
}
wkr.setIdleBehavior(idleBehavior)
return nil
}
// On the first successful connection to the SSH daemon, update the
// mTimeToSSH metric.
389 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
wp.mtx.Lock()
defer wp.mtx.Unlock()
392 wkr := wp.workers[inst.ID()]
393 if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
394 // the node is not in booting state (can happen if a-d-c is restarted) OR
// this is not the first SSH connection
return
}
399 wkr.firstSSHConnection = time.Now()
400 if wp.mTimeToSSH != nil {
401 wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
405 // Add or update worker attached to the given instance.
407 // The second return value is true if a new worker is created.
409 // A newly added instance has state=StateBooting if its tags match an
410 // entry in wp.creating, otherwise StateUnknown.
412 // Caller must have lock.
413 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
414 secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
415 inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
id := inst.ID()
if wkr := wp.workers[id]; wkr != nil {
418 wkr.executor.SetTarget(inst)
wkr.instance = inst
wkr.updated = time.Now()
return wkr, false
}
425 state := StateUnknown
if _, ok := wp.creating[secret]; ok {
state = StateBooting
}
430 // If an instance has a valid IdleBehavior tag when it first
431 // appears, initialize the new worker accordingly (this is how
432 // we restore IdleBehavior that was set by a prior dispatch
433 // process); otherwise, default to "run". After this,
434 // wkr.idleBehavior is the source of truth, and will only be
435 // changed via SetIdleBehavior().
436 idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
437 if !validIdleBehavior[idleBehavior] {
idleBehavior = IdleBehaviorRun
}
441 logger := wp.logger.WithFields(logrus.Fields{
442 "InstanceType": it.Name,
443 "Instance": inst.ID(),
444 "Address": inst.Address(),
446 logger.WithFields(logrus.Fields{
448 "IdleBehavior": idleBehavior,
449 }).Infof("instance appeared in cloud")
wkr := &worker{
logger: logger,
executor: wp.newExecutor(inst),
state: state,
instance: inst,
instType: it,
appeared: time.Now(),
457 idleBehavior: idleBehavior,
464 running: make(map[string]*remoteRunner),
465 starting: make(map[string]*remoteRunner),
probing: make(chan struct{}, 1),
}
wp.workers[id] = wkr
return wkr, true
}
472 // Shutdown shuts down a worker with the given type, or returns false
473 // if all workers with the given type are busy.
474 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
475 wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
defer wp.mtx.Unlock()
478 logger := wp.logger.WithField("InstanceType", it.Name)
479 logger.Info("shutdown requested")
480 for _, tryState := range []State{StateBooting, StateIdle} {
481 // TODO: shutdown the worker with the longest idle
482 // time (Idle) or the earliest create time (Booting)
483 for _, wkr := range wp.workers {
484 if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
485 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
wkr.reportBootOutcome(BootOutcomeAborted)
wkr.shutdown()
return true
}
}
}
return false
}
495 // CountWorkers returns the current number of workers in each state.
497 // CountWorkers blocks, if necessary, until the initial instance list
498 // has been loaded from the cloud provider.
499 func (wp *Pool) CountWorkers() map[State]int {
500 wp.setupOnce.Do(wp.setup)
wp.waitUntilLoaded()
wp.mtx.Lock()
defer wp.mtx.Unlock()
r := map[State]int{}
for _, w := range wp.workers {
r[w.state]++
}
return r
}
511 // Running returns the container UUIDs being prepared/run on workers.
513 // In the returned map, the time value indicates when the Pool
514 // observed that the container process had exited. A container that
515 // has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
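//
// A typical caller might poll and garbage-collect like this (a
// sketch; tidyDelay is a hypothetical threshold, not part of this
// package):
//
// for uuid, exited := range pool.Running() {
// 	if !exited.IsZero() && time.Since(exited) > tidyDelay {
// 		pool.ForgetContainer(uuid)
// 	}
// }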
518 func (wp *Pool) Running() map[string]time.Time {
519 wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
defer wp.mtx.Unlock()
522 r := map[string]time.Time{}
523 for _, wkr := range wp.workers {
524 for uuid := range wkr.running {
r[uuid] = time.Time{}
}
527 for uuid := range wkr.starting {
r[uuid] = time.Time{}
}
}
for uuid, exited := range wp.exited {
r[uuid] = exited
}
return r
}
537 // StartContainer starts a container on an idle worker immediately if
538 // possible, otherwise returns false.
539 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
540 wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
defer wp.mtx.Unlock()
var wkr *worker
for _, w := range wp.workers {
545 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
if wkr == nil || w.busy.After(wkr.busy) {
wkr = w
}
}
}
if wkr == nil {
return false
}
wkr.startContainer(ctr)
return true
}
558 // KillContainer kills the crunch-run process for the given container
559 // UUID, if it's running on any worker.
561 // KillContainer returns immediately; the act of killing the container
562 // takes some time, and runs in the background.
564 // KillContainer returns false if the container has already ended.
565 func (wp *Pool) KillContainer(uuid string, reason string) bool {
wp.mtx.Lock()
defer wp.mtx.Unlock()
568 logger := wp.logger.WithFields(logrus.Fields{
569 "ContainerUUID": uuid,
572 for _, wkr := range wp.workers {
573 rr := wkr.running[uuid]
if rr == nil {
rr = wkr.starting[uuid]
}
if rr != nil {
rr.Kill(reason)
return true
}
}
logger.Debug("cannot kill: already disappeared")
return false
}
586 // ForgetContainer clears the placeholder for the given exited
587 // container, so it isn't returned by subsequent calls to Running().
589 // ForgetContainer has no effect if the container has not yet exited.
591 // The "container exited at time T" placeholder (which necessitates
592 // ForgetContainer) exists to make it easier for the caller
593 // (scheduler) to distinguish a container that exited without
594 // finalizing its state from a container that exited too recently for
595 // its final state to have appeared in the scheduler's queue cache.
596 func (wp *Pool) ForgetContainer(uuid string) {
wp.mtx.Lock()
defer wp.mtx.Unlock()
599 if _, ok := wp.exited[uuid]; ok {
600 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
601 delete(wp.exited, uuid)
605 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
if reg == nil {
reg = prometheus.NewRegistry()
}
609 wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
610 Namespace: "arvados",
611 Subsystem: "dispatchcloud",
612 Name: "containers_running",
Help: "Number of containers reported running by cloud VMs.",
})
615 reg.MustRegister(wp.mContainersRunning)
616 wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
617 Namespace: "arvados",
618 Subsystem: "dispatchcloud",
619 Name: "instances_total",
620 Help: "Number of cloud VMs.",
621 }, []string{"category", "instance_type"})
622 reg.MustRegister(wp.mInstances)
623 wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
624 Namespace: "arvados",
625 Subsystem: "dispatchcloud",
626 Name: "instances_price",
627 Help: "Price of cloud VMs.",
628 }, []string{"category"})
629 reg.MustRegister(wp.mInstancesPrice)
630 wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
631 Namespace: "arvados",
632 Subsystem: "dispatchcloud",
Name: "vcpus_total",
Help: "Total VCPUs on all cloud VMs.",
635 }, []string{"category"})
636 reg.MustRegister(wp.mVCPUs)
637 wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
638 Namespace: "arvados",
639 Subsystem: "dispatchcloud",
640 Name: "memory_bytes_total",
641 Help: "Total memory on all cloud VMs.",
642 }, []string{"category"})
643 reg.MustRegister(wp.mMemory)
644 wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
645 Namespace: "arvados",
646 Subsystem: "dispatchcloud",
647 Name: "boot_outcomes",
648 Help: "Boot outcomes by type.",
649 }, []string{"outcome"})
650 for k := range validBootOutcomes {
651 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
653 reg.MustRegister(wp.mBootOutcomes)
654 wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
655 Namespace: "arvados",
656 Subsystem: "dispatchcloud",
657 Name: "instances_disappeared",
658 Help: "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
659 }, []string{"state"})
660 for _, v := range stateString {
661 wp.mDisappearances.WithLabelValues(v).Add(0)
663 reg.MustRegister(wp.mDisappearances)
664 wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
665 Namespace: "arvados",
666 Subsystem: "dispatchcloud",
667 Name: "instances_time_to_ssh_seconds",
668 Help: "Number of seconds between instance creation and the first successful SSH connection.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
})
671 reg.MustRegister(wp.mTimeToSSH)
672 wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
673 Namespace: "arvados",
674 Subsystem: "dispatchcloud",
675 Name: "instances_time_to_ready_for_container_seconds",
676 Help: "Number of seconds between the first successful SSH connection and ready to run a container.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
})
679 reg.MustRegister(wp.mTimeToReadyForContainer)
680 wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
681 Namespace: "arvados",
682 Subsystem: "dispatchcloud",
683 Name: "instances_time_from_shutdown_request_to_disappearance_seconds",
684 Help: "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
})
687 reg.MustRegister(wp.mTimeFromShutdownToGone)
688 wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
689 Namespace: "arvados",
690 Subsystem: "dispatchcloud",
691 Name: "containers_time_from_queue_to_crunch_run_seconds",
692 Help: "Number of seconds between the queuing of a container and the start of crunch-run.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
})
695 reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
696 wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
697 Namespace: "arvados",
698 Subsystem: "dispatchcloud",
699 Name: "instances_run_probe_duration_seconds",
700 Help: "Number of seconds per runProbe call.",
701 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
702 }, []string{"outcome"})
703 reg.MustRegister(wp.mRunProbeDuration)
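
// Typical wiring, done by the dispatcher rather than this package (a
// sketch; promhttp is the standard client_golang HTTP handler):
//
// reg := prometheus.NewRegistry()
// pool := NewPool(logger, arvClient, reg, instanceSetID, instanceSet, newExecutor, pubKey, cluster)
// http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))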
706 func (wp *Pool) runMetrics() {
ch := wp.Subscribe()
defer wp.Unsubscribe(ch)
wp.updateMetrics()
for range ch {
wp.updateMetrics()
}
}
715 func (wp *Pool) updateMetrics() {
wp.mtx.RLock()
defer wp.mtx.RUnlock()
type entKey struct {
cat string
instType string
}
instances := map[entKey]int64{}
724 price := map[string]float64{}
725 cpu := map[string]int64{}
mem := map[string]int64{}
var running int64
728 for _, wkr := range wp.workers {
var cat string
switch {
case len(wkr.running)+len(wkr.starting) > 0:
cat = "inuse"
case wkr.idleBehavior == IdleBehaviorHold:
cat = "hold"
case wkr.state == StateBooting:
cat = "booting"
case wkr.state == StateUnknown:
cat = "unknown"
default:
cat = "idle"
}
742 instances[entKey{cat, wkr.instType.Name}]++
743 price[cat] += wkr.instType.Price
744 cpu[cat] += int64(wkr.instType.VCPUs)
745 mem[cat] += int64(wkr.instType.RAM)
running += int64(len(wkr.running) + len(wkr.starting))
}
748 for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
749 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
750 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
751 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
// reset gauges for category/instance type combinations that
// currently have no instances
753 for _, it := range wp.instanceTypes {
754 if _, ok := instances[entKey{cat, it.Name}]; !ok {
wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
}
}
}
759 for k, v := range instances {
wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
}
762 wp.mContainersRunning.Set(float64(running))
765 func (wp *Pool) runProbes() {
766 maxPPS := wp.maxProbesPerSecond
if maxPPS < 1 {
maxPPS = defaultMaxProbesPerSecond
}
770 limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
771 defer limitticker.Stop()
773 probeticker := time.NewTicker(wp.probeInterval)
774 defer probeticker.Stop()
776 workers := []cloud.InstanceID{}
777 for range probeticker.C {
778 // Add some jitter. Without this, if probeInterval is
779 // a multiple of syncInterval and sync is
780 // instantaneous (as with the loopback driver), the
781 // first few probes race with sync operations and
782 // don't update the workers.
783 time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))
785 workers = workers[:0]
wp.mtx.Lock()
for id, wkr := range wp.workers {
if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
continue
}
workers = append(workers, id)
}
wp.mtx.Unlock()
795 for _, id := range workers {
wp.mtx.RLock()
wkr, ok := wp.workers[id]
wp.mtx.RUnlock()
if !ok {
// Deleted while we were probing
// others
continue
}
go wkr.ProbeAndUpdate()
select {
case <-wp.stop:
return
case <-limitticker.C:
}
814 func (wp *Pool) runSync() {
// sync once immediately, then wait syncInterval, sync again,
// and so on.
817 timer := time.NewTimer(1)
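// (a 1-nanosecond timer fires almost immediately, producing the
// initial sync)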
for {
select {
case <-timer.C:
err := wp.getInstancesAndSync()
if err != nil {
wp.logger.WithError(err).Warn("sync failed")
}
825 timer.Reset(wp.syncInterval)
case <-wp.stop:
wp.logger.Debug("worker.Pool stopped")
return
}
}
}
833 // Stop synchronizing with the InstanceSet.
834 func (wp *Pool) Stop() {
wp.setupOnce.Do(wp.setup)
close(wp.stop)
}
839 // Instances returns an InstanceView for each worker in the pool,
840 // summarizing its current state and recent activity.
841 func (wp *Pool) Instances() []InstanceView {
var r []InstanceView
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
845 for _, w := range wp.workers {
846 r = append(r, InstanceView{
847 Instance: w.instance.ID(),
848 Address: w.instance.Address(),
849 Price: w.instType.Price,
850 ArvadosInstanceType: w.instType.Name,
851 ProviderInstanceType: w.instType.ProviderType,
LastContainerUUID: w.lastUUID,
LastBusy: w.busy,
854 WorkerState: w.state.String(),
855 IdleBehavior: w.idleBehavior,
})
}
wp.mtx.Unlock()
sort.Slice(r, func(i, j int) bool {
return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
})
return r
}
865 // KillInstance destroys a cloud VM instance. It returns an error if
866 // the given instance does not exist.
867 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
868 wkr, ok := wp.workers[id]
if !ok {
return errors.New("instance not found")
}
872 wkr.logger.WithField("Reason", reason).Info("shutting down")
wkr.reportBootOutcome(BootOutcomeAborted)
wkr.shutdown()
return nil
}
878 func (wp *Pool) setup() {
879 wp.creating = map[string]createCall{}
880 wp.exited = map[string]time.Time{}
881 wp.workers = map[cloud.InstanceID]*worker{}
882 wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
886 // Load the runner program to be deployed on worker nodes into
887 // wp.runnerData, if necessary. Errors are logged.
889 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
891 // Caller must not have lock.
892 func (wp *Pool) loadRunnerData() error {
wp.mtx.Lock()
defer wp.mtx.Unlock()
if wp.runnerData != nil {
return nil
} else if wp.runnerSource == "" {
wp.runnerCmd = wp.runnerCmdDefault
wp.runnerData = []byte{}
return nil
}
logger := wp.logger.WithField("source", wp.runnerSource)
logger.Debug("loading runner")
buf, err := ioutil.ReadFile(wp.runnerSource)
if err != nil {
logger.WithError(err).Error("failed to load runner program")
return err
}
910 wp.runnerMD5 = md5.Sum(buf)
wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
wp.runnerData = buf
return nil
}
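
// Note: the deployed command path embeds the binary's MD5, so workers
// running different crunch-run builds use distinct, non-colliding
// paths.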
915 func (wp *Pool) notify() {
wp.mtx.RLock()
defer wp.mtx.RUnlock()
for _, send := range wp.subscribers {
select {
case send <- struct{}{}:
default:
}
}
}
926 func (wp *Pool) getInstancesAndSync() error {
927 wp.setupOnce.Do(wp.setup)
if err := wp.instanceSet.throttleInstances.Error(); err != nil {
return err
}
931 wp.logger.Debug("getting instance list")
932 threshold := time.Now()
933 instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
if err != nil {
wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
return err
}
938 wp.sync(threshold, instances)
wp.logger.Debug("sync done")
return nil
}
943 // Add/remove/update workers based on instances, which was obtained
944 // from the instanceSet. However, don't clobber any other updates that
945 // already happened after threshold.
946 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
wp.mtx.Lock()
defer wp.mtx.Unlock()
949 wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
952 for _, inst := range instances {
953 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
954 it, ok := wp.instanceTypes[itTag]
wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
continue
}
959 if wkr, isNew := wp.updateWorker(inst, it); isNew {
961 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
wkr.shutdown()
}
}
967 for id, wkr := range wp.workers {
if wkr.updated.After(threshold) {
continue
}
971 logger := wp.logger.WithFields(logrus.Fields{
972 "Instance": wkr.instance.ID(),
973 "WorkerState": wkr.state,
975 logger.Info("instance disappeared in cloud")
976 wkr.reportBootOutcome(BootOutcomeDisappeared)
977 if wp.mDisappearances != nil {
wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
}
980 // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
981 if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
wp.mTimeFromShutdownToGone.Observe(time.Since(wkr.destroyed).Seconds())
}
delete(wp.workers, id)
}
if !wp.loaded {
wp.loaded = true
wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
}
}
1000 func (wp *Pool) waitUntilLoaded() {
1001 ch := wp.Subscribe()
defer wp.Unsubscribe(ch)
wp.mtx.RLock()
defer wp.mtx.RUnlock()
for !wp.loaded {
wp.mtx.RUnlock()
<-ch
wp.mtx.RLock()
}
}
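
// gatewayAuthSecret returns the gateway auth secret for the given
// container UUID: the hex-encoded HMAC-SHA256 of the UUID, keyed by
// the cluster's SystemRootToken.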
1011 func (wp *Pool) gatewayAuthSecret(uuid string) string {
h := hmac.New(sha256.New, []byte(wp.systemRootToken))
fmt.Fprint(h, uuid)
1014 return fmt.Sprintf("%x", h.Sum(nil))
// Return a random string of n hexadecimal digits (n*4 random bits).
// n must be even.
1019 func randomHex(n int) string {
1020 buf := make([]byte, n/2)
_, err := rand.Read(buf)
if err != nil {
panic(err)
}
return fmt.Sprintf("%x", buf)
}
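
// For example, Create above uses randomHex(instanceSecretLength) to
// generate the value stored in each new instance's InstanceSecret tag.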