// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"crypto/hmac"
	"crypto/md5"
	"crypto/rand"
	"crypto/sha256"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	mathrand "math/rand"
	"sort"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)

const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
)
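
// Tag keys are qualified with the cluster's configured TagKeyPrefix
// before being attached to instances (see Create). For example,
// assuming a prefix of "Arvados" and an instance type named
// "t3.small", an instance carries tags like:
//
//	cloud.InstanceTags{
//		"Arvados" + tagKeyInstanceType: "t3.small",
//		"Arvados" + tagKeyIdleBehavior: string(IdleBehaviorRun),
//	}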

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	Close()
}
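
// For illustration, a minimal no-op Executor that satisfies the
// interface (a hypothetical test stub, not part of this package's
// API; production code uses an SSH-based implementation):
//
//	type nullExecutor struct{}
//
//	func (nullExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
//		return nil, nil, errors.New("not implemented")
//	}
//	func (nullExecutor) SetTarget(cloud.ExecutorTarget) {}
//	func (nullExecutor) Close()                         {}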

const (
	defaultSyncInterval        = time.Minute
	defaultProbeInterval       = time.Second * 10
	defaultMaxProbesPerSecond  = 10
	defaultTimeoutIdle         = time.Minute
	defaultTimeoutBooting      = time.Minute * 10
	defaultTimeoutProbe        = time.Minute * 10
	defaultTimeoutShutdown     = time.Second * 10
	defaultTimeoutTERM         = time.Minute * 2
	defaultTimeoutSignal       = time.Second * 5
	defaultTimeoutStaleRunLock = time.Second * 5

	// Time to wait after a quota error before trying again
	// anyway, even if no instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because rate limiting" log messages.
	logRateLimitErrorInterval = time.Second * 10
)

// duration returns conf as a time.Duration if it is set (>0),
// otherwise the given default.
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}
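
// For example, an unset (zero) config value falls back to the
// default, while a set value wins:
//
//	duration(arvados.Duration(0), time.Minute)             // time.Minute
//	duration(arvados.Duration(5*time.Second), time.Minute) // 5 * time.Second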

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:                         logger,
		arvClient:                      arvClient,
		instanceSetID:                  instanceSetID,
		instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:                    newExecutor,
		cluster:                        cluster,
		bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
		runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes:                  cluster.InstanceTypes,
		maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
		maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
		maxInstances:                   cluster.Containers.CloudVMs.MaxInstances,
		probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
		systemRootToken:                cluster.SystemRootToken,
		installPublicKey:               installPublicKey,
		tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
		runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
		runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
		stop:                           make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}
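
// A minimal sketch of constructing and stopping a Pool, assuming the
// caller has already obtained a cloud.InstanceSet from a driver and
// has an executor factory; all names here are illustrative:
//
//	wp := NewPool(logger, arvClient, prometheus.NewRegistry(),
//		cloud.InstanceSetID("example-set"), instanceSet,
//		newExecutor, installPublicKey, cluster)
//	defer wp.Stop()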

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger                         logrus.FieldLogger
	arvClient                      *arvados.Client
	instanceSetID                  cloud.InstanceSetID
	instanceSet                    *throttledInstanceSet
	newExecutor                    func(cloud.Instance) Executor
	cluster                        *arvados.Cluster
	bootProbeCommand               string
	runnerSource                   string
	imageID                        cloud.ImageID
	instanceTypes                  map[string]arvados.InstanceType
	syncInterval                   time.Duration
	probeInterval                  time.Duration
	maxProbesPerSecond             int
	maxConcurrentInstanceCreateOps int
	maxInstances                   int
	timeoutIdle                    time.Duration
	timeoutBooting                 time.Duration
	timeoutProbe                   time.Duration
	timeoutShutdown                time.Duration
	timeoutTERM                    time.Duration
	timeoutSignal                  time.Duration
	timeoutStaleRunLock            time.Duration
	systemRootToken                string
	installPublicKey               ssh.PublicKey
	tagKeyPrefix                   string
	runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
	runnerArgs                     []string // extra args passed to crunch-run

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once
	runnerData   []byte
	runnerMD5    [md5.Size]byte
	runnerCmd    string

	mContainersRunning        prometheus.Gauge
	mInstances                *prometheus.GaugeVec
	mInstancesPrice           *prometheus.GaugeVec
	mVCPUs                    *prometheus.GaugeVec
	mMemory                   *prometheus.GaugeVec
	mBootOutcomes             *prometheus.CounterVec
	mDisappearances           *prometheus.CounterVec
	mTimeToSSH                prometheus.Summary
	mTimeToReadyForContainer  prometheus.Summary
	mTimeFromShutdownToGone   prometheus.Summary
	mTimeFromQueueToCrunchRun prometheus.Summary
	mRunProbeDuration         *prometheus.SummaryVec
	mProbeAgeMax              prometheus.Gauge
	mProbeAgeMedian           prometheus.Gauge
}

// createCall tracks an unfinished (cloud.InstanceSet)Create call.
type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}

// CheckHealth reports whether the Pool is operable (i.e., the runner
// binary to deploy, if any, loaded successfully).
func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
	}
	return nil
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		tryScheduling(wp)
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}
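
// A scheduler might combine Unallocated with Create roughly as
// follows (an illustrative sketch; wanted is a hypothetical map of
// desired worker counts per instance type):
//
//	unalloc := wp.Unallocated()
//	for it, want := range wanted {
//		for n := unalloc[it]; n < want; n++ {
//			if !wp.Create(it) {
//				// quota or rate limit; wait for a
//				// Subscribe() event before retrying
//				break
//			}
//		}
//	}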

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error or a configuration
// setting prevents it from even attempting to create a new
// instance. Those errors are logged by the Pool, so the caller does
// not need to log anything in such cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) ||
		wp.instanceSet.throttleCreate.Error() != nil ||
		(wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) {
		return false
	}
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret, nil}.InitCommand()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	if len(wp.creating)+len(wp.workers) == wp.maxInstances {
		logger.Infof("now at MaxInstances limit of %d instances", wp.maxInstances)
	}
	return true
}

// AtQuota returns true if Create is not expected to work at the
// moment (e.g., cloud provider has reported quota errors, or we are
// already at our own configured quota).
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil) || (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.setIdleBehavior(idleBehavior)
	return nil
}
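
// For example, a management endpoint might drain an instance before
// planned maintenance (sketch; id comes from the request):
//
//	if err := wp.SetIdleBehavior(id, IdleBehaviorDrain); err != nil {
//		// instance is already gone
//	}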

// reportSSHConnected records the first successful connection to an
// instance's SSH daemon, and updates the mTimeToSSH metric.
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[inst.ID()]
	if !ok {
		// race: inst was removed from the pool
		return
	}
	if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
		// the node is not in booting state (can happen if
		// a-d-c is restarted) OR this is not the first SSH
		// connection
		return
	}

	wkr.firstSSHConnection = time.Now()
	if wp.mTimeToSSH != nil {
		wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
	}
}

// Add or update the worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.reportBootOutcome(BootOutcomeAborted)
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.waitUntilLoaded()
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Reason":        reason,
	})
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
		if rr == nil {
			rr = wkr.starting[uuid]
		}
		if rr != nil {
			rr.Kill(reason)
			return true
		}
	}
	logger.Debug("cannot kill: already disappeared")
	return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	}
}
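
// The intended lifecycle, from a caller's point of view (sketch;
// hasFinalState is a hypothetical queue-cache check):
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() && hasFinalState(uuid) {
//			wp.ForgetContainer(uuid)
//		}
//	}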

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mProbeAgeMax = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "probe_age_seconds_max",
		Help:      "Maximum number of seconds since an instance's most recent successful probe.",
	})
	reg.MustRegister(wp.mProbeAgeMax)
	wp.mProbeAgeMedian = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "probe_age_seconds_median",
		Help:      "Median number of seconds since an instance's most recent successful probe.",
	})
	reg.MustRegister(wp.mProbeAgeMedian)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price",
		Help:      "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "boot_outcomes",
		Help:      "Boot outcomes by type.",
	}, []string{"outcome"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	}
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_disappeared",
		Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	}
	reg.MustRegister(wp.mDisappearances)
	wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ssh_seconds",
		Help:       "Number of seconds between instance creation and the first successful SSH connection.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToSSH)
	wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ready_for_container_seconds",
		Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToReadyForContainer)
	wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
		Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromShutdownToGone)
	wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "containers_time_from_queue_to_crunch_run_seconds",
		Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
	wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_run_probe_duration_seconds",
		Help:       "Number of seconds per runProbe call.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	}, []string{"outcome"})
	reg.MustRegister(wp.mRunProbeDuration)
}
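
// The registry passed to registerMetrics is typically exposed over
// HTTP by the caller. A minimal sketch using the standard
// github.com/prometheus/client_golang/prometheus/promhttp package:
//
//	mux := http.NewServeMux()
//	mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))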

func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	type entKey struct {
		cat      string
		instType string
	}
	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var running int64
	now := time.Now()
	var probed []time.Time
	for _, wkr := range wp.workers {
		var cat string
		switch {
		case len(wkr.running)+len(wkr.starting) > 0:
			cat = "inuse"
		case wkr.idleBehavior == IdleBehaviorHold:
			cat = "hold"
		case wkr.state == StateBooting:
			cat = "booting"
		case wkr.state == StateUnknown:
			cat = "unknown"
		default:
			cat = "idle"
		}
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
		probed = append(probed, wkr.probed)
	}
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for non-existing category/nodetype combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
			}
		}
	}
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	}
	wp.mContainersRunning.Set(float64(running))

	if len(probed) == 0 {
		wp.mProbeAgeMax.Set(0)
		wp.mProbeAgeMedian.Set(0)
	} else {
		sort.Slice(probed, func(i, j int) bool { return probed[i].Before(probed[j]) })
		wp.mProbeAgeMax.Set(now.Sub(probed[0]).Seconds())
		wp.mProbeAgeMedian.Set(now.Sub(probed[len(probed)/2]).Seconds())
	}
}

func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		// Add some jitter. Without this, if probeInterval is
		// a multiple of syncInterval and sync is
		// instantaneous (as with the loopback driver), the
		// first few probes race with sync operations and
		// don't update the workers.
		time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))

		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing
				// others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Address:              w.instance.Address(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return nil
}

// setup initializes the Pool's internal maps. It runs exactly once,
// via setupOnce.
func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}

// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
		return nil
	} else if wp.runnerSource == "" {
		wp.runnerCmd = wp.runnerCmdDefault
		wp.runnerData = []byte{}
		return nil
	}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
	if err != nil {
		logger.WithError(err).Error("failed to load runner program")
		return err
	}
	wp.runnerData = buf
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
	return nil
}
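
// A cluster config fragment that enables runner deployment (a
// sketch; the value shown, the dispatcher's own binary, is one
// common choice but not necessarily this cluster's setting):
//
//	Containers:
//	  CloudVMs:
//	    DeployRunnerBinary: "/proc/self/exe"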

func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		// wkr.destroyed.IsZero() can happen if the instance disappeared but we weren't trying to shut it down
		if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
			wp.mTimeFromShutdownToGone.Observe(time.Since(wkr.destroyed).Seconds())
		}
		delete(wp.workers, id)
		go wkr.Close()
		notify = true
	}

	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}

func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for !wp.loaded {
		wp.mtx.RUnlock()
		<-ch
		wp.mtx.RLock()
	}
}

// gatewayAuthSecret returns the gateway auth secret for the given
// container UUID: the hex-encoded HMAC-SHA256 of the UUID, keyed with
// the cluster's SystemRootToken.
func (wp *Pool) gatewayAuthSecret(uuid string) string {
	h := hmac.New(sha256.New, []byte(wp.systemRootToken))
	fmt.Fprint(h, uuid)
	return fmt.Sprintf("%x", h.Sum(nil))
}
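
// In other words, the secret for container UUID u is
// hex(HMAC-SHA256(key = SystemRootToken, message = u)), so any
// component that knows SystemRootToken can recompute the same value
// from the UUID alone, with no per-container state to store.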

// randomHex returns a random string of n hexadecimal digits (n*4
// random bits). n must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}