1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
19 "git.arvados.org/arvados.git/lib/cloud"
20 "git.arvados.org/arvados.git/sdk/go/arvados"
21 "github.com/prometheus/client_golang/prometheus"
22 "github.com/sirupsen/logrus"
23 "golang.org/x/crypto/ssh"
// Keys of the tags that Create sets on new cloud instances. Each key
// is used with wp.tagKeyPrefix prepended.
27 tagKeyInstanceType = "InstanceType"
28 tagKeyIdleBehavior = "IdleBehavior"
29 tagKeyInstanceSecret = "InstanceSecret"
30 tagKeyInstanceSetID = "InstanceSetID"
33 // An InstanceView shows a worker's current state and recent activity.
// Pool.Instances returns one InstanceView per worker, sorted by
// Instance.
34 type InstanceView struct {
35 Instance cloud.InstanceID `json:"instance"`
36 Address string `json:"address"`
37 Price float64 `json:"price"`
38 ArvadosInstanceType string `json:"arvados_instance_type"`
39 ProviderInstanceType string `json:"provider_instance_type"`
40 LastContainerUUID string `json:"last_container_uuid"`
41 LastBusy time.Time `json:"last_busy"`
42 WorkerState string `json:"worker_state"`
// IdleBehavior is the worker's current idle behavior (see
// Pool.SetIdleBehavior).
43 IdleBehavior IdleBehavior `json:"idle_behavior"`
46 // An Executor executes shell commands on a remote host.
//
// The Pool creates one Executor per worker using the newExecutor
// function given to NewPool, and calls SetTarget whenever an existing
// worker's instance is refreshed (see updateWorker).
47 type Executor interface {
48 // Run cmd on the current target.
// Execute returns the command's captured stdout and stderr along
// with any error.
49 Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
51 // Use the given target for subsequent operations. The new
52 // target is the same host as the previous target, but it
53 // might return a different address and verify a different
56 // SetTarget is called frequently, and in most cases the new
57 // target will behave exactly the same as the old one. An
58 // implementation should optimize accordingly.
60 // SetTarget must not block on concurrent Execute calls.
61 SetTarget(cloud.ExecutorTarget)
// Fallback values. NewPool uses the duration defaults (via duration)
// when the corresponding Containers.CloudVMs setting is not
// configured. runProbes uses defaultMaxProbesPerSecond when
// MaxProbesPerSecond is not usable.
67 defaultSyncInterval = time.Minute
68 defaultProbeInterval = time.Second * 10
69 defaultMaxProbesPerSecond = 10
70 defaultTimeoutIdle = time.Minute
71 defaultTimeoutBooting = time.Minute * 10
72 defaultTimeoutProbe = time.Minute * 10
73 defaultTimeoutShutdown = time.Second * 10
74 defaultTimeoutTERM = time.Minute * 2
75 defaultTimeoutSignal = time.Second * 5
76 defaultTimeoutStaleRunLock = time.Second * 5
78 // Time after a quota error to try again anyway, even if no
79 // instances have been shutdown.
80 quotaErrorTTL = time.Minute
82 // Time between "X failed because rate limiting" messages
83 logRateLimitErrorInterval = time.Second * 10
86 func duration(conf arvados.Duration, def time.Duration) time.Duration {
88 return time.Duration(conf)
94 // NewPool creates a Pool of workers backed by instanceSet.
96 // New instances are configured and set up according to the given
97 // cluster configuration.
//
// Each duration setting falls back to the matching default* constant
// (via duration) when it is not configured. instanceSet is wrapped in
// a throttledInstanceSet, whose throttles Create and
// getInstancesAndSync consult and update after rate-limit errors.
98 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
101 arvClient: arvClient,
102 instanceSetID: instanceSetID,
103 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
104 newExecutor: newExecutor,
105 bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
106 runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
107 imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
108 instanceTypes: cluster.InstanceTypes,
109 maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
110 maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
111 probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
112 syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
113 timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
114 timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
115 timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
116 timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
117 timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
118 timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
119 timeoutStaleRunLock: duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
120 installPublicKey: installPublicKey,
121 tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
122 stop: make(chan bool),
// registerMetrics substitutes a private registry when it is given no
// usable reg (the guarding condition is elided from this excerpt;
// presumably reg == nil).
124 wp.registerMetrics(reg)
126 wp.setupOnce.Do(wp.setup)
134 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
135 // zero Pool should not be used. Call NewPool to create a new Pool.
// Configuration, set by NewPool.
138 logger logrus.FieldLogger
139 arvClient *arvados.Client
140 instanceSetID cloud.InstanceSetID
141 instanceSet *throttledInstanceSet
142 newExecutor func(cloud.Instance) Executor
143 bootProbeCommand string
145 imageID cloud.ImageID
146 instanceTypes map[string]arvados.InstanceType
147 syncInterval time.Duration
148 probeInterval time.Duration
149 maxProbesPerSecond int
150 maxConcurrentInstanceCreateOps int
151 timeoutIdle time.Duration
152 timeoutBooting time.Duration
153 timeoutProbe time.Duration
154 timeoutShutdown time.Duration
155 timeoutTERM time.Duration
156 timeoutSignal time.Duration
157 timeoutStaleRunLock time.Duration
158 installPublicKey ssh.PublicKey
// Mutable state, guarded by wp.mtx. The maps are allocated by setup().
// subscribers maps each receive-only channel returned by Subscribe to
// its send side, which notify() writes to.
162 subscribers map[<-chan struct{}]chan<- struct{}
163 creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
164 workers map[cloud.InstanceID]*worker
165 loaded bool // loaded list of instances from InstanceSet at least once
166 exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
// Create refuses to run until atQuotaUntil after a quota error (see
// AtQuota).
167 atQuotaUntil time.Time
168 atQuotaErr cloud.QuotaError
// runnerMD5 is the MD5 sum of the runner binary read by
// loadRunnerData; it is embedded in wp.runnerCmd.
173 runnerMD5 [md5.Size]byte
// Prometheus metrics, created by registerMetrics.
176 mContainersRunning prometheus.Gauge
177 mInstances *prometheus.GaugeVec
178 mInstancesPrice *prometheus.GaugeVec
179 mVCPUs *prometheus.GaugeVec
180 mMemory *prometheus.GaugeVec
181 mBootOutcomes *prometheus.CounterVec
182 mDisappearances *prometheus.CounterVec
183 mTimeToSSH prometheus.Summary
184 mTimeToReadyForContainer prometheus.Summary
// createCall records an unfinished (cloud.InstanceSet)Create call: the
// time it started and the instance type requested. Create stores these
// in wp.creating, keyed by the new instance's secret.
187 type createCall struct {
189 instanceType arvados.InstanceType
192 func (wp *Pool) CheckHealth() error {
193 wp.setupOnce.Do(wp.setup)
194 if err := wp.loadRunnerData(); err != nil {
195 return fmt.Errorf("error loading runner binary: %s", err)
200 // Subscribe returns a buffered channel that becomes ready after any
201 // change to the pool's state that could have scheduling implications:
202 // a worker's state changes, a new worker appears, the cloud
203 // provider's API rate limiting period ends, etc.
205 // Additional events that occur while the channel is already ready
206 // will be dropped, so it is OK if the caller services the channel
211 // ch := wp.Subscribe()
212 // defer wp.Unsubscribe(ch)
219 func (wp *Pool) Subscribe() <-chan struct{} {
220 wp.setupOnce.Do(wp.setup)
222 defer wp.mtx.Unlock()
223 ch := make(chan struct{}, 1)
224 wp.subscribers[ch] = ch
228 // Unsubscribe stops sending updates to the given channel.
229 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
230 wp.setupOnce.Do(wp.setup)
232 defer wp.mtx.Unlock()
233 delete(wp.subscribers, ch)
236 // Unallocated returns the number of unallocated (creating + booting +
237 // idle + unknown) workers for each instance type. Workers in
238 // hold/drain mode are not included.
239 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
240 wp.setupOnce.Do(wp.setup)
242 defer wp.mtx.RUnlock()
243 unalloc := map[arvados.InstanceType]int{}
// creating counts unfinished Create calls per type; oldestCreate
// records the start time of the oldest such call per type.
244 creating := map[arvados.InstanceType]int{}
245 oldestCreate := map[arvados.InstanceType]time.Time{}
246 for _, cc := range wp.creating {
247 it := cc.instanceType
249 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
250 oldestCreate[it] = cc.time
253 for _, wkr := range wp.workers {
254 // Skip workers that are not expected to become
255 // available soon. Note len(wkr.running)>0 is not
256 // redundant here: it can be true even in
258 if wkr.state == StateShutdown ||
259 wkr.state == StateRunning ||
260 wkr.idleBehavior != IdleBehaviorRun ||
261 len(wkr.running) > 0 {
266 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
267 // If up to N new workers appear in
268 // Instances() while we are waiting for N
269 // Create() calls to complete, we assume we're
270 // just seeing a race between Instances() and
271 // Create() responses.
273 // The other common reason why nodes have
274 // state==Unknown is that they appeared at
275 // startup, before any Create calls. They
276 // don't match the above timing condition, so
277 // we never mistakenly attribute them to
278 // pending Create calls.
// Remaining unfinished Create calls per type are presumably added to
// unalloc here (the accumulation lines are elided from this excerpt).
282 for it, c := range creating {
288 // Create a new instance with the given type, and add it to the worker
289 // pool. The worker is added immediately; instance creation runs in
292 // Create returns false if a pre-existing error state prevents it from
293 // even attempting to create a new instance. Those errors are logged
294 // by the Pool, so the caller does not need to log anything in such
296 func (wp *Pool) Create(it arvados.InstanceType) bool {
297 logger := wp.logger.WithField("InstanceType", it.Name)
298 wp.setupOnce.Do(wp.setup)
299 if wp.loadRunnerData() != nil {
300 // Boot probe is certain to fail.
304 defer wp.mtx.Unlock()
// Refuse while a recent quota error (atQuotaUntil) or a create
// throttle is in effect.
305 if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
308 // The maxConcurrentInstanceCreateOps knob throttles the number of node create
309 // requests in flight. It was added to work around a limitation in Azure's
310 // managed disks, which support no more than 20 concurrent node creation
311 // requests from a single disk image (cf.
312 // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
313 // The code assumes that node creation, from Azure's perspective, means the
314 // period until the instance appears in the "get all instances" list.
315 if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
316 logger.Info("reached MaxConcurrentInstanceCreateOps")
// Throttle further Create calls for 5 seconds; wp.notify is handed to
// the throttle so subscribers can be woken when it expires.
317 wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
// The random secret keys this call in wp.creating and is stored in the
// instance's tags, which lets updateWorker recognize the new instance
// as one of ours (and start it in StateBooting).
321 secret := randomHex(instanceSecretLength)
322 wp.creating[secret] = createCall{time: now, instanceType: it}
325 tags := cloud.InstanceTags{
326 wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID),
327 wp.tagKeyPrefix + tagKeyInstanceType: it.Name,
328 wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
329 wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
// NOTE(review): InitCommand presumably installs the secret on the VM
// so TagVerifier can check it later -- confirm in TagVerifier.
331 initCmd := TagVerifier{nil, secret, nil}.InitCommand()
332 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
334 defer wp.mtx.Unlock()
335 // delete() is deferred so the updateWorker() call
336 // below knows to use StateBooting when adding a new
338 defer delete(wp.creating, secret)
// On a quota error, block Create (and report AtQuota) for
// quotaErrorTTL, and wake subscribers when that period ends.
340 if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
342 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
343 time.AfterFunc(quotaErrorTTL, wp.notify)
345 logger.WithError(err).Error("create failed")
346 wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
349 wp.updateWorker(inst, it)
354 // AtQuota returns true if Create is not expected to work at the
356 func (wp *Pool) AtQuota() bool {
358 defer wp.mtx.Unlock()
359 return time.Now().Before(wp.atQuotaUntil)
362 // SetIdleBehavior determines how the indicated instance will behave
363 // when it has no containers running.
364 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
366 defer wp.mtx.Unlock()
367 wkr, ok := wp.workers[id]
369 return errors.New("requested instance does not exist")
371 wkr.setIdleBehavior(idleBehavior)
375 // Successful connection to the SSH daemon, update the mTimeToSSH metric
376 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
378 defer wp.mtx.Unlock()
379 wkr := wp.workers[inst.ID()]
380 if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
381 // the node is not in booting state (can happen if a-d-c is restarted) OR
382 // this is not the first SSH connection
386 wkr.firstSSHConnection = time.Now()
387 if wp.mTimeToSSH != nil {
388 wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
392 // Add or update worker attached to the given instance.
394 // The second return value is true if a new worker is created.
396 // A newly added instance has state=StateBooting if its tags match an
397 // entry in wp.creating, otherwise StateUnknown.
399 // Caller must have lock.
400 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
401 secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
// Wrap inst in a TagVerifier carrying the instance secret; its
// ReportVerified hook (reportSSHConnected) records the first
// successful SSH connection.
402 inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
// Existing worker: retarget its executor and stamp wkr.updated, which
// sync() compares against its threshold to detect disappearances.
404 if wkr := wp.workers[id]; wkr != nil {
405 wkr.executor.SetTarget(inst)
407 wkr.updated = time.Now()
// New worker: it is StateBooting if its secret matches one of our
// unfinished Create calls, otherwise StateUnknown.
412 state := StateUnknown
413 if _, ok := wp.creating[secret]; ok {
417 // If an instance has a valid IdleBehavior tag when it first
418 // appears, initialize the new worker accordingly (this is how
419 // we restore IdleBehavior that was set by a prior dispatch
420 // process); otherwise, default to "run". After this,
421 // wkr.idleBehavior is the source of truth, and will only be
422 // changed via SetIdleBehavior().
423 idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
424 if !validIdleBehavior[idleBehavior] {
425 idleBehavior = IdleBehaviorRun
428 logger := wp.logger.WithFields(logrus.Fields{
429 "InstanceType": it.Name,
430 "Instance": inst.ID(),
431 "Address": inst.Address(),
433 logger.WithFields(logrus.Fields{
435 "IdleBehavior": idleBehavior,
436 }).Infof("instance appeared in cloud")
442 executor: wp.newExecutor(inst),
444 idleBehavior: idleBehavior,
451 running: make(map[string]*remoteRunner),
452 starting: make(map[string]*remoteRunner),
// Capacity-1 channel used as a "probe in progress" marker (presumably
// acquired by ProbeAndUpdate -- confirm in worker.go).
453 probing: make(chan struct{}, 1),
459 // Shutdown shuts down a worker with the given type, or returns false
460 // if all workers with the given type are busy.
461 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
462 wp.setupOnce.Do(wp.setup)
464 defer wp.mtx.Unlock()
465 logger := wp.logger.WithField("InstanceType", it.Name)
466 logger.Info("shutdown requested")
// Prefer a booting worker, then an idle one. Workers in hold mode are
// never chosen. Within a state, map iteration order makes the choice
// arbitrary (see TODO).
467 for _, tryState := range []State{StateBooting, StateIdle} {
468 // TODO: shutdown the worker with the longest idle
469 // time (Idle) or the earliest create time (Booting)
470 for _, wkr := range wp.workers {
471 if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
472 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
473 wkr.reportBootOutcome(BootOutcomeAborted)
482 // CountWorkers returns the current number of workers in each state.
484 // CountWorkers blocks, if necessary, until the initial instance list
485 // has been loaded from the cloud provider.
486 func (wp *Pool) CountWorkers() map[State]int {
487 wp.setupOnce.Do(wp.setup)
490 defer wp.mtx.Unlock()
// One count per worker, keyed by the worker's current state
// (accumulation line elided from this excerpt).
492 for _, w := range wp.workers {
498 // Running returns the container UUIDs being prepared/run on workers.
500 // In the returned map, the time value indicates when the Pool
501 // observed that the container process had exited. A container that
502 // has not yet exited has a zero time value. The caller should use
503 // ForgetContainer() to garbage-collect the entries for exited
505 func (wp *Pool) Running() map[string]time.Time {
506 wp.setupOnce.Do(wp.setup)
508 defer wp.mtx.Unlock()
509 r := map[string]time.Time{}
// Containers running or starting on any worker get a zero time.
510 for _, wkr := range wp.workers {
511 for uuid := range wkr.running {
512 r[uuid] = time.Time{}
514 for uuid := range wkr.starting {
515 r[uuid] = time.Time{}
// Exited containers that have not been forgotten yet.
518 for uuid, exited := range wp.exited {
524 // StartContainer starts a container on an idle worker immediately if
525 // possible, otherwise returns false.
526 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
527 wp.setupOnce.Do(wp.setup)
529 defer wp.mtx.Unlock()
// Among idle workers of the requested type whose idle behavior is
// "run", pick the one that was busy most recently (latest w.busy).
531 for _, w := range wp.workers {
532 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
533 if wkr == nil || w.busy.After(wkr.busy) {
541 wkr.startContainer(ctr)
545 // KillContainer kills the crunch-run process for the given container
546 // UUID, if it's running on any worker.
548 // KillContainer returns immediately; the act of killing the container
549 // takes some time, and runs in the background.
551 // KillContainer returns false if the container has already ended.
552 func (wp *Pool) KillContainer(uuid string, reason string) bool {
554 defer wp.mtx.Unlock()
555 logger := wp.logger.WithFields(logrus.Fields{
556 "ContainerUUID": uuid,
// Look the container up in each worker's running runners, falling
// back to its starting runners.
559 for _, wkr := range wp.workers {
560 rr := wkr.running[uuid]
562 rr = wkr.starting[uuid]
569 logger.Debug("cannot kill: already disappeared")
573 // ForgetContainer clears the placeholder for the given exited
574 // container, so it isn't returned by subsequent calls to Running().
576 // ForgetContainer has no effect if the container has not yet exited.
578 // The "container exited at time T" placeholder (which necessitates
579 // ForgetContainer) exists to make it easier for the caller
580 // (scheduler) to distinguish a container that exited without
581 // finalizing its state from a container that exited too recently for
582 // its final state to have appeared in the scheduler's queue cache.
583 func (wp *Pool) ForgetContainer(uuid string) {
585 defer wp.mtx.Unlock()
586 if _, ok := wp.exited[uuid]; ok {
587 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
588 delete(wp.exited, uuid)
// registerMetrics creates the pool's Prometheus metrics and registers
// them with reg. A fresh private registry is substituted in some case
// (the guarding condition is elided from this excerpt; presumably
// reg == nil).
592 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
594 reg = prometheus.NewRegistry()
596 wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
597 Namespace: "arvados",
598 Subsystem: "dispatchcloud",
599 Name: "containers_running",
600 Help: "Number of containers reported running by cloud VMs.",
602 reg.MustRegister(wp.mContainersRunning)
603 wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
604 Namespace: "arvados",
605 Subsystem: "dispatchcloud",
606 Name: "instances_total",
607 Help: "Number of cloud VMs.",
608 }, []string{"category", "instance_type"})
609 reg.MustRegister(wp.mInstances)
610 wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
611 Namespace: "arvados",
612 Subsystem: "dispatchcloud",
613 Name: "instances_price",
614 Help: "Price of cloud VMs.",
615 }, []string{"category"})
616 reg.MustRegister(wp.mInstancesPrice)
617 wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
618 Namespace: "arvados",
619 Subsystem: "dispatchcloud",
621 Help: "Total VCPUs on all cloud VMs.",
622 }, []string{"category"})
623 reg.MustRegister(wp.mVCPUs)
624 wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
625 Namespace: "arvados",
626 Subsystem: "dispatchcloud",
627 Name: "memory_bytes_total",
628 Help: "Total memory on all cloud VMs.",
629 }, []string{"category"})
630 reg.MustRegister(wp.mMemory)
631 wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
632 Namespace: "arvados",
633 Subsystem: "dispatchcloud",
634 Name: "boot_outcomes",
635 Help: "Boot outcomes by type.",
636 }, []string{"outcome"})
// Touch every label value with Add(0) so each series is exported
// (at zero) before the first event occurs.
637 for k := range validBootOutcomes {
638 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
640 reg.MustRegister(wp.mBootOutcomes)
641 wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
642 Namespace: "arvados",
643 Subsystem: "dispatchcloud",
644 Name: "instances_disappeared",
645 Help: "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
646 }, []string{"state"})
// Same zero-initialization, one series per worker state name.
647 for _, v := range stateString {
648 wp.mDisappearances.WithLabelValues(v).Add(0)
650 reg.MustRegister(wp.mDisappearances)
// NOTE(review): reportSSHConnected observes this as time since
// wkr.appeared (when the pool first saw the instance), which may
// differ from the "instance creation" time stated in Help.
651 wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
652 Namespace: "arvados",
653 Subsystem: "dispatchcloud",
654 Name: "instances_time_to_ssh_seconds",
655 Help: "Number of seconds between instance creation and the first successful SSH connection.",
656 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
658 reg.MustRegister(wp.mTimeToSSH)
659 wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
660 Namespace: "arvados",
661 Subsystem: "dispatchcloud",
662 Name: "instances_time_to_ready_for_container_seconds",
663 Help: "Number of seconds between the first successful SSH connection and ready to run a container.",
664 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
666 reg.MustRegister(wp.mTimeToReadyForContainer)
// runMetrics holds a pool subscription for its lifetime and releases
// it on return. NOTE(review): the loop body is elided from this
// excerpt; presumably it calls updateMetrics on each notification.
669 func (wp *Pool) runMetrics() {
671 defer wp.Unsubscribe(ch)
// updateMetrics recomputes every gauge from the current set of
// workers while holding the read lock.
678 func (wp *Pool) updateMetrics() {
680 defer wp.mtx.RUnlock()
686 instances := map[entKey]int64{}
687 price := map[string]float64{}
688 cpu := map[string]int64{}
689 mem := map[string]int64{}
// Each worker is put in exactly one category; the first matching case
// wins (in use > hold > booting > unknown). Workers matching no case
// presumably count as "idle" (default case elided).
691 for _, wkr := range wp.workers {
694 case len(wkr.running)+len(wkr.starting) > 0:
696 case wkr.idleBehavior == IdleBehaviorHold:
698 case wkr.state == StateBooting:
700 case wkr.state == StateUnknown:
705 instances[entKey{cat, wkr.instType.Name}]++
706 price[cat] += wkr.instType.Price
707 cpu[cat] += int64(wkr.instType.VCPUs)
708 mem[cat] += int64(wkr.instType.RAM)
709 running += int64(len(wkr.running) + len(wkr.starting))
// Set every category explicitly, so categories that are now empty
// drop to zero instead of keeping stale values.
711 for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
712 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
713 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
714 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
715 // make sure to reset gauges for non-existing category/nodetype combinations
716 for _, it := range wp.instanceTypes {
717 if _, ok := instances[entKey{cat, it.Name}]; !ok {
718 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
722 for k, v := range instances {
723 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
725 wp.mContainersRunning.Set(float64(running))
// runProbes periodically probes workers, launching at most maxPPS
// probes per second.
728 func (wp *Pool) runProbes() {
729 maxPPS := wp.maxProbesPerSecond
731 maxPPS = defaultMaxProbesPerSecond
// limitticker paces individual probe launches; probeticker starts
// each probing round.
733 limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
734 defer limitticker.Stop()
736 probeticker := time.NewTicker(wp.probeInterval)
737 defer probeticker.Stop()
739 workers := []cloud.InstanceID{}
740 for range probeticker.C {
// Reuse the slice's backing array across rounds.
741 workers = workers[:0]
// Snapshot the IDs of workers to probe, skipping workers that are
// already shut down or that shutdownIfIdle() reports true for.
743 for id, wkr := range wp.workers {
744 if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
747 workers = append(workers, id)
751 for _, id := range workers {
753 wkr, ok := wp.workers[id]
756 // Deleted while we were probing
// Each probe runs in its own goroutine.
760 go wkr.ProbeAndUpdate()
764 case <-limitticker.C:
770 func (wp *Pool) runSync() {
771 // sync once immediately, then wait syncInterval, sync again,
// A 1ns timer fires effectively immediately for the first sync.
773 timer := time.NewTimer(1)
777 err := wp.getInstancesAndSync()
// Sync errors are only logged; syncing continues on schedule.
779 wp.logger.WithError(err).Warn("sync failed")
781 timer.Reset(wp.syncInterval)
783 wp.logger.Debug("worker.Pool stopped")
789 // Stop synchronizing with the InstanceSet.
// NOTE(review): the rest of the body is elided from this excerpt;
// presumably it signals wp.stop (the channel created by NewPool).
790 func (wp *Pool) Stop() {
791 wp.setupOnce.Do(wp.setup)
795 // Instances returns an InstanceView for each worker in the pool,
796 // summarizing its current state and recent activity.
797 func (wp *Pool) Instances() []InstanceView {
799 wp.setupOnce.Do(wp.setup)
801 for _, w := range wp.workers {
802 r = append(r, InstanceView{
803 Instance: w.instance.ID(),
804 Address: w.instance.Address(),
805 Price: w.instType.Price,
806 ArvadosInstanceType: w.instType.Name,
807 ProviderInstanceType: w.instType.ProviderType,
808 LastContainerUUID: w.lastUUID,
810 WorkerState: w.state.String(),
811 IdleBehavior: w.idleBehavior,
// Sort by instance ID so the output is stable despite random map
// iteration order.
815 sort.Slice(r, func(i, j int) bool {
816 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
821 // KillInstance destroys a cloud VM instance. It returns an error if
822 // the given instance does not exist.
823 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
824 wkr, ok := wp.workers[id]
826 return errors.New("instance not found")
828 wkr.logger.WithField("Reason", reason).Info("shutting down")
829 wkr.reportBootOutcome(BootOutcomeAborted)
// setup allocates the pool's maps. It runs exactly once, through
// wp.setupOnce, from NewPool and from whichever public method is
// called first.
834 func (wp *Pool) setup() {
835 wp.creating = map[string]createCall{}
836 wp.exited = map[string]time.Time{}
837 wp.workers = map[cloud.InstanceID]*worker{}
838 wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
842 // Load the runner program to be deployed on worker nodes into
843 // wp.runnerData, if necessary. Errors are logged.
845 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
847 // Caller must not have lock.
848 func (wp *Pool) loadRunnerData() error {
850 defer wp.mtx.Unlock()
// Already loaded (possibly as the empty non-nil slice below).
851 if wp.runnerData != nil {
853 } else if wp.runnerSource == "" {
854 wp.runnerCmd = "crunch-run"
// Non-nil empty slice means "loaded; nothing to deploy".
855 wp.runnerData = []byte{}
858 logger := wp.logger.WithField("source", wp.runnerSource)
859 logger.Debug("loading runner")
860 buf, err := ioutil.ReadFile(wp.runnerSource)
862 logger.WithError(err).Error("failed to load runner program")
866 wp.runnerMD5 = md5.Sum(buf)
// The deployed path embeds the binary's MD5, presumably so different
// builds never overwrite each other on a worker.
867 wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
871 func (wp *Pool) notify() {
873 defer wp.mtx.RUnlock()
874 for _, send := range wp.subscribers {
876 case send <- struct{}{}:
// getInstancesAndSync lists this pool's instances from the cloud
// provider and reconciles the worker set with them via sync().
882 func (wp *Pool) getInstancesAndSync() error {
883 wp.setupOnce.Do(wp.setup)
884 if err := wp.instanceSet.throttleInstances.Error(); err != nil {
887 wp.logger.Debug("getting instance list")
// threshold is taken before listing, so sync() does not clobber
// workers updated after the listing started.
888 threshold := time.Now()
// List only instances tagged with this pool's instance set ID.
889 instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
891 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
894 wp.sync(threshold, instances)
895 wp.logger.Debug("sync done")
899 // Add/remove/update workers based on instances, which was obtained
900 // from the instanceSet. However, don't clobber any other updates that
901 // already happened after threshold.
902 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
904 defer wp.mtx.Unlock()
905 wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
// Add or refresh a worker for every listed instance whose InstanceType
// tag names a configured type; any other instance is logged and
// ignored.
908 for _, inst := range instances {
909 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
910 it, ok := wp.instanceTypes[itTag]
912 wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
915 if wkr, isNew := wp.updateWorker(inst, it); isNew {
// A shut-down worker still listed more than timeoutShutdown after it
// was destroyed gets its shutdown retried.
917 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
918 wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
// updateWorker stamps wkr.updated, so a worker not updated since
// threshold was missing from this listing. It is treated as having
// disappeared from the cloud and is removed from the pool.
923 for id, wkr := range wp.workers {
924 if wkr.updated.After(threshold) {
927 logger := wp.logger.WithFields(logrus.Fields{
928 "Instance": wkr.instance.ID(),
929 "WorkerState": wkr.state,
931 logger.Info("instance disappeared in cloud")
932 wkr.reportBootOutcome(BootOutcomeDisappeared)
933 if wp.mDisappearances != nil {
934 wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
936 delete(wp.workers, id)
// Presumably logged only on the first sync (guarded by !wp.loaded,
// elided from this excerpt).
944 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
952 func (wp *Pool) waitUntilLoaded() {
955 defer wp.mtx.RUnlock()
// randomHex returns a random string of exactly n hexadecimal digits
// (n*4 random bits). Odd n is supported: enough random bytes are read
// to cover n digits and the result is truncated to n. For n <= 0 it
// returns "". It panics if the random source fails.
func randomHex(n int) string {
	if n <= 0 {
		return ""
	}
	buf := make([]byte, (n+1)/2)
	if _, err := rand.Read(buf); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)[:n]
}