// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"crypto/hmac"
	"crypto/md5"
	"crypto/rand"
	"crypto/sha256"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"sort"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)

const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
)

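// Combined with the cluster-configured tag key prefix, these become
// the tag names attached to each instance. For example, with a
// TagKeyPrefix of "Arvados" (an assumed value, for illustration only),
// an instance carries tags named "ArvadosInstanceType",
// "ArvadosIdleBehavior", etc.; see the wp.tagKeyPrefix usages below.
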
// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	Close()
}

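// A minimal sketch of an Executor implementation, for illustration
// only (this stub is not part of this package): it runs every command
// on the local host and ignores SetTarget. A real implementation
// dials the target over SSH, reuses the connection across Execute
// calls, and re-verifies the host key when SetTarget changes it.
//
//	type localExecutor struct{}
//
//	func (localExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
//		c := exec.Command("sh", "-c", cmd) // needs os/exec
//		c.Stdin = stdin
//		for k, v := range env {
//			c.Env = append(c.Env, k+"="+v)
//		}
//		var stdout, stderr bytes.Buffer // needs bytes
//		c.Stdout, c.Stderr = &stdout, &stderr
//		err := c.Run()
//		return stdout.Bytes(), stderr.Bytes(), err
//	}
//
//	func (localExecutor) SetTarget(cloud.ExecutorTarget) {}
//
//	func (localExecutor) Close() {}
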
const (
	defaultSyncInterval        = time.Minute
	defaultProbeInterval       = time.Second * 10
	defaultMaxProbesPerSecond  = 10
	defaultTimeoutIdle         = time.Minute
	defaultTimeoutBooting      = time.Minute * 10
	defaultTimeoutProbe        = time.Minute * 10
	defaultTimeoutShutdown     = time.Second * 10
	defaultTimeoutTERM         = time.Minute * 2
	defaultTimeoutSignal       = time.Second * 5
	defaultTimeoutStaleRunLock = time.Second * 5

	// Time after a quota error to try again anyway, even if no
	// instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
)

// duration returns conf as a time.Duration if it is set, otherwise
// the given default.
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:                         logger,
		arvClient:                      arvClient,
		instanceSetID:                  instanceSetID,
		instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:                    newExecutor,
		cluster:                        cluster,
		bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
		runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes:                  cluster.InstanceTypes,
		maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
		maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
		probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
		systemRootToken:                cluster.SystemRootToken,
		installPublicKey:               installPublicKey,
		tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
		runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
		runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
		stop:                           make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger                         logrus.FieldLogger
	arvClient                      *arvados.Client
	instanceSetID                  cloud.InstanceSetID
	instanceSet                    *throttledInstanceSet
	newExecutor                    func(cloud.Instance) Executor
	cluster                        *arvados.Cluster
	bootProbeCommand               string
	runnerSource                   string
	imageID                        cloud.ImageID
	instanceTypes                  map[string]arvados.InstanceType
	syncInterval                   time.Duration
	probeInterval                  time.Duration
	maxProbesPerSecond             int
	maxConcurrentInstanceCreateOps int
	timeoutIdle                    time.Duration
	timeoutBooting                 time.Duration
	timeoutProbe                   time.Duration
	timeoutShutdown                time.Duration
	timeoutTERM                    time.Duration
	timeoutSignal                  time.Duration
	timeoutStaleRunLock            time.Duration
	systemRootToken                string
	installPublicKey               ssh.PublicKey
	tagKeyPrefix                   string
	runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
	runnerArgs                     []string // extra args passed to crunch-run

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once
	runnerData   []byte
	runnerMD5    [md5.Size]byte
	runnerCmd    string

	mContainersRunning        prometheus.Gauge
	mInstances                *prometheus.GaugeVec
	mInstancesPrice           *prometheus.GaugeVec
	mVCPUs                    *prometheus.GaugeVec
	mMemory                   *prometheus.GaugeVec
	mBootOutcomes             *prometheus.CounterVec
	mDisappearances           *prometheus.CounterVec
	mTimeToSSH                prometheus.Summary
	mTimeToReadyForContainer  prometheus.Summary
	mTimeFromShutdownToGone   prometheus.Summary
	mTimeFromQueueToCrunchRun prometheus.Summary
	mRunProbeDuration         *prometheus.SummaryVec
}

// A createCall tracks an unfinished (cloud.InstanceSet)Create call.
type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}

// CheckHealth reports an error if the runner binary cannot be loaded.
func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
	}
	return nil
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		tryScheduling(wp)
//		if done {
//			break
//		}
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
		return false
	}
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret, nil}.InitCommand()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	return true
}

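// For reference, the knobs used above come from the cluster config
// file; the values shown here are illustrative, not recommendations:
//
//	Containers:
//	  CloudVMs:
//	    MaxConcurrentInstanceCreateOps: 20
//	    TimeoutBooting: 10m
//	    ProbeInterval: 10s
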
// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.setIdleBehavior(idleBehavior)
	return nil
}

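// For example, a management API handler could take an instance out of
// service without interrupting its current container (sketch; instID
// is whatever cloud.InstanceID the caller is holding):
//
//	if err := wp.SetIdleBehavior(instID, IdleBehaviorDrain); err != nil {
//		// instance already disappeared
//	}
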
// reportSSHConnected updates the mTimeToSSH metric after the first
// successful connection to a worker's SSH daemon.
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr := wp.workers[inst.ID()]
	if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
		// the node is not in booting state (can happen if a-d-c is restarted) OR
		// this is not the first SSH connection
		return
	}

	wkr.firstSSHConnection = time.Now()
	if wp.mTimeToSSH != nil {
		wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
	}
}

// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.reportBootOutcome(BootOutcomeAborted)
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.waitUntilLoaded()
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Reason":        reason,
	})
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
		if rr == nil {
			rr = wkr.starting[uuid]
		}
		if rr != nil {
			go rr.Kill(reason)
			return true
		}
	}
	logger.Debug("cannot kill: already disappeared")
	return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	}
}

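// Typical scheduler usage of the placeholder mechanism (sketch;
// queueHasFinalState is hypothetical, standing in for the caller's
// queue cache lookup):
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() && queueHasFinalState(uuid) {
//			wp.ForgetContainer(uuid)
//		}
//	}
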
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price",
		Help:      "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "boot_outcomes",
		Help:      "Boot outcomes by type.",
	}, []string{"outcome"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	}
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_disappeared",
		Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	}
	reg.MustRegister(wp.mDisappearances)
	wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ssh_seconds",
		Help:       "Number of seconds between instance creation and the first successful SSH connection.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToSSH)
	wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ready_for_container_seconds",
		Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToReadyForContainer)
	wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
		Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromShutdownToGone)
	wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "containers_time_from_queue_to_crunch_run_seconds",
		Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
	wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_run_probe_duration_seconds",
		Help:       "Number of seconds per runProbe call.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	}, []string{"outcome"})
	reg.MustRegister(wp.mRunProbeDuration)
}

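// The fully qualified metric names follow the Prometheus
// namespace_subsystem_name convention, e.g.
// arvados_dispatchcloud_instances_total. Sample PromQL (illustrative):
//
//	sum by (category) (arvados_dispatchcloud_instances_total)
//	arvados_dispatchcloud_containers_running
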
func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	type entKey struct {
		cat      string
		instType string
	}
	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var running int64
	for _, wkr := range wp.workers {
		var cat string
		switch {
		case len(wkr.running)+len(wkr.starting) > 0:
			cat = "inuse"
		case wkr.idleBehavior == IdleBehaviorHold:
			cat = "hold"
		case wkr.state == StateBooting:
			cat = "booting"
		case wkr.state == StateUnknown:
			cat = "unknown"
		default:
			cat = "idle"
		}
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
	}
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for non-existing category/nodetype combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
			}
		}
	}
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	}
	wp.mContainersRunning.Set(float64(running))
}

func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing
				// others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

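// With the default maxPPS of 10, limitticker fires every 100ms, so at
// most 10 ProbeAndUpdate goroutines start per second across the whole
// pool, regardless of how many workers a probeticker round collects.
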
func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Address:              w.instance.Address(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return nil
}

func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
	wp.loadRunnerData()
}

// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
		return nil
	} else if wp.runnerSource == "" {
		wp.runnerCmd = wp.runnerCmdDefault
		wp.runnerData = []byte{}
		return nil
	}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
	if err != nil {
		logger.WithError(err).Error("failed to load runner program")
		return err
	}
	wp.runnerData = buf
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
	return nil
}

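// The deployed binary's path embeds its MD5 hash, so a new binary
// never clobbers a running one; e.g. (hash value illustrative only):
//
//	/var/lib/arvados/crunch-run~d41d8cd98f00b204e9800998ecf8427e
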
func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		// wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
		if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
			wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
		}
		delete(wp.workers, id)
		go wkr.Close()
		notify = true
	}

	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}

func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for !wp.loaded {
		wp.mtx.RUnlock()
		<-ch
		wp.mtx.RLock()
	}
}

// gatewayAuthSecret returns the gateway auth secret for the given
// container UUID: an HMAC-SHA256 of the UUID, keyed with the
// cluster's SystemRootToken.
func (wp *Pool) gatewayAuthSecret(uuid string) string {
	h := hmac.New(sha256.New, []byte(wp.systemRootToken))
	fmt.Fprint(h, uuid)
	return fmt.Sprintf("%x", h.Sum(nil))
}

// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}