1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
19 "git.arvados.org/arvados.git/lib/cloud"
20 "git.arvados.org/arvados.git/sdk/go/arvados"
21 "github.com/prometheus/client_golang/prometheus"
22 "github.com/sirupsen/logrus"
23 "golang.org/x/crypto/ssh"
// Names of the cloud instance tags maintained by the worker pool.
// Each name is appended to the pool's configured tag key prefix
// before it is used as a tag key.
const (
	// tagKeyInstanceSetID is the tag holding the ID of the instance
	// set the instance belongs to.
	tagKeyInstanceSetID = "InstanceSetID"

	// tagKeyInstanceSecret is the tag holding the random secret that
	// was generated when the instance was requested.
	tagKeyInstanceSecret = "InstanceSecret"

	// tagKeyInstanceType is the tag holding the name of the Arvados
	// instance type.
	tagKeyInstanceType = "InstanceType"

	// tagKeyIdleBehavior is the tag holding the worker's idle
	// behavior.
	tagKeyIdleBehavior = "IdleBehavior"
)
33 // An InstanceView shows a worker's current state and recent activity.
// Pool.Instances returns one InstanceView per worker; the struct tags
// give the field names used when a view is encoded as JSON.
34 type InstanceView struct {
// ID of the worker's cloud instance.
35 Instance cloud.InstanceID `json:"instance"`
// Address reported by the worker's cloud instance.
36 Address string `json:"address"`
// Price of the worker's Arvados instance type.
37 Price float64 `json:"price"`
// Name of the worker's Arvados instance type.
38 ArvadosInstanceType string `json:"arvados_instance_type"`
// Cloud provider's own name for that instance type.
39 ProviderInstanceType string `json:"provider_instance_type"`
// Copied from the worker's lastUUID field.
// NOTE(review): presumably the most recent container seen on the
// worker -- confirm in the worker implementation.
40 LastContainerUUID string `json:"last_container_uuid"`
// NOTE(review): presumably the last time the worker was busy; the
// assignment is not visible here -- confirm.
41 LastBusy time.Time `json:"last_busy"`
// String form of the worker's State.
42 WorkerState string `json:"worker_state"`
// How the worker behaves when it has no containers running.
43 IdleBehavior IdleBehavior `json:"idle_behavior"`
46 // An Executor executes shell commands on a remote host.
//
// The pool obtains one Executor for each new worker by calling the
// newExecutor function that was passed to NewPool.
47 type Executor interface {
48 // Run cmd on the current target.
// The results are the command's stdout and stderr, and an error.
// NOTE(review): presumably env supplies environment variables for
// cmd and stdin is fed to its standard input -- confirm against an
// implementation.
49 Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
51 // Use the given target for subsequent operations. The new
52 // target is the same host as the previous target, but it
53 // might return a different address and verify a different
// host key.
//
56 // SetTarget is called frequently, and in most cases the new
57 // target will behave exactly the same as the old one. An
58 // implementation should optimize accordingly.
//
60 // SetTarget must not block on concurrent Execute calls.
61 SetTarget(cloud.ExecutorTarget)
// Built-in settings. NewPool passes each default* duration to
// duration() together with the matching cluster configuration entry,
// and runProbes falls back to defaultMaxProbesPerSecond.
const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = 10 * time.Second
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = 10 * time.Minute
	defaultTimeoutProbe       = 10 * time.Minute
	defaultTimeoutShutdown    = 10 * time.Second
	defaultTimeoutTERM        = 2 * time.Minute
	defaultTimeoutSignal      = 5 * time.Second

	// quotaErrorTTL is how long to wait after a quota error before
	// trying to create an instance again anyway, even if no
	// instances have been shut down in the meantime.
	quotaErrorTTL = time.Minute

	// logRateLimitErrorInterval is the time between "X failed
	// because rate limiting" log messages.
	logRateLimitErrorInterval = 10 * time.Second
)
85 func duration(conf arvados.Duration, def time.Duration) time.Duration {
87 return time.Duration(conf)
93 // NewPool creates a Pool of workers backed by instanceSet.
//
95 // New instances are configured and set up according to the given
96 // cluster configuration.
//
// Settings are read from cluster.Containers.CloudVMs and
// cluster.InstanceTypes. Each interval/timeout setting is passed
// through duration() together with its default* constant.
// instanceSet is wrapped in a throttledInstanceSet, newExecutor is
// kept for creating one Executor per worker, and the pool's metrics
// are registered with reg.
97 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
100 arvClient: arvClient,
101 instanceSetID: instanceSetID,
// Wrap the instance set so provider calls can be throttled.
102 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
103 newExecutor: newExecutor,
104 bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
105 runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
106 imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
107 instanceTypes: cluster.InstanceTypes,
// Copied as-is; runProbes substitutes defaultMaxProbesPerSecond.
108 maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
109 maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
110 probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
111 syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
112 timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
113 timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
114 timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
115 timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
116 timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
117 timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
118 installPublicKey: installPublicKey,
119 tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
120 stop: make(chan bool),
122 wp.registerMetrics(reg)
// Initialize the pool's maps; setupOnce guarantees setup runs only once
// even though most exported methods also trigger it.
124 wp.setupOnce.Do(wp.setup)
132 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
133 // zero Pool should not be used. Call NewPool to create a new Pool.
// Configuration, filled in by NewPool.
136 logger logrus.FieldLogger
137 arvClient *arvados.Client
// Value of the InstanceSetID tag used to select this pool's instances.
138 instanceSetID cloud.InstanceSetID
139 instanceSet *throttledInstanceSet
// Creates the Executor for a newly added worker.
140 newExecutor func(cloud.Instance) Executor
141 bootProbeCommand string
143 imageID cloud.ImageID
// Known instance types, keyed by the value of the InstanceType tag.
144 instanceTypes map[string]arvados.InstanceType
145 syncInterval time.Duration
146 probeInterval time.Duration
147 maxProbesPerSecond int
// Upper bound on len(creating); zero or less disables the limit.
148 maxConcurrentInstanceCreateOps int
149 timeoutIdle time.Duration
150 timeoutBooting time.Duration
151 timeoutProbe time.Duration
152 timeoutShutdown time.Duration
153 timeoutTERM time.Duration
154 timeoutSignal time.Duration
155 installPublicKey ssh.PublicKey
// Mutable state; the maps are initialized by setup and accessed with
// the pool's mutex held.
// Channels handed out by Subscribe, keyed by their receive side.
159 subscribers map[<-chan struct{}]chan<- struct{}
160 creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
161 workers map[cloud.InstanceID]*worker
162 loaded bool // loaded list of instances from InstanceSet at least once
163 exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
// Create refuses to start instances, and AtQuota reports true, until
// this time.
164 atQuotaUntil time.Time
// NOTE(review): presumably the quota error that set atQuotaUntil; the
// assignment is not visible here -- confirm.
165 atQuotaErr cloud.QuotaError
// MD5 checksum of the runner program read by loadRunnerData.
170 runnerMD5 [md5.Size]byte
// Prometheus metrics, created and registered by registerMetrics.
173 mContainersRunning prometheus.Gauge
174 mInstances *prometheus.GaugeVec
175 mInstancesPrice *prometheus.GaugeVec
176 mVCPUs *prometheus.GaugeVec
177 mMemory *prometheus.GaugeVec
178 mBootOutcomes *prometheus.CounterVec
179 mDisappearances *prometheus.CounterVec
180 mTimeToSSH prometheus.Summary
181 mTimeToReadyForContainer prometheus.Summary
// createCall describes one unfinished instance-creation request. Pool
// keeps these in its creating map, keyed by instance secret.
184 type createCall struct {
// Arvados instance type that was requested.
186 instanceType arvados.InstanceType
189 func (wp *Pool) CheckHealth() error {
190 wp.setupOnce.Do(wp.setup)
191 if err := wp.loadRunnerData(); err != nil {
192 return fmt.Errorf("error loading runner binary: %s", err)
197 // Subscribe returns a buffered channel that becomes ready after any
198 // change to the pool's state that could have scheduling implications:
199 // a worker's state changes, a new worker appears, the cloud
200 // provider's API rate limiting period ends, etc.
202 // Additional events that occur while the channel is already ready
203 // will be dropped, so it is OK if the caller services the channel
208 // ch := wp.Subscribe()
209 // defer wp.Unsubscribe(ch)
216 func (wp *Pool) Subscribe() <-chan struct{} {
217 wp.setupOnce.Do(wp.setup)
219 defer wp.mtx.Unlock()
220 ch := make(chan struct{}, 1)
221 wp.subscribers[ch] = ch
225 // Unsubscribe stops sending updates to the given channel.
226 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
227 wp.setupOnce.Do(wp.setup)
229 defer wp.mtx.Unlock()
230 delete(wp.subscribers, ch)
233 // Unallocated returns the number of unallocated (creating + booting +
234 // idle + unknown) workers for each instance type. Workers in
235 // hold/drain mode are not included.
236 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
237 wp.setupOnce.Do(wp.setup)
239 defer wp.mtx.RUnlock()
// unalloc is the result being built. creating and oldestCreate
// summarize wp.creating per instance type: oldestCreate holds the
// start time of the earliest pending Create call.
// NOTE(review): creating presumably holds the number of pending
// Create calls per type; the increment is not visible here -- confirm.
240 unalloc := map[arvados.InstanceType]int{}
241 creating := map[arvados.InstanceType]int{}
242 oldestCreate := map[arvados.InstanceType]time.Time{}
243 for _, cc := range wp.creating {
244 it := cc.instanceType
// Keep the earliest start time seen so far for this type.
246 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
247 oldestCreate[it] = cc.time
250 for _, wkr := range wp.workers {
251 // Skip workers that are not expected to become
252 // available soon. Note len(wkr.running)>0 is not
253 // redundant here: it can be true even in
255 if wkr.state == StateShutdown ||
256 wkr.state == StateRunning ||
257 wkr.idleBehavior != IdleBehaviorRun ||
258 len(wkr.running) > 0 {
// An unknown-state worker that appeared after the oldest pending
// Create call of the same type is attributed to a pending call.
263 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
264 // If up to N new workers appear in
265 // Instances() while we are waiting for N
266 // Create() calls to complete, we assume we're
267 // just seeing a race between Instances() and
268 // Create() responses.
//
270 // The other common reason why nodes have
271 // state==Unknown is that they appeared at
272 // startup, before any Create calls. They
273 // don't match the above timing condition, so
274 // we never mistakenly attribute them to
275 // pending Create calls.
// Fold the per-type creating counts into the result.
279 for it, c := range creating {
285 // Create a new instance with the given type, and add it to the worker
286 // pool. The worker is added immediately; instance creation runs in
289 // Create returns false if a pre-existing error state prevents it from
290 // even attempting to create a new instance. Those errors are logged
291 // by the Pool, so the caller does not need to log anything in such
293 func (wp *Pool) Create(it arvados.InstanceType) bool {
294 logger := wp.logger.WithField("InstanceType", it.Name)
295 wp.setupOnce.Do(wp.setup)
296 if wp.loadRunnerData() != nil {
297 // Boot probe is certain to fail.
301 defer wp.mtx.Unlock()
// Refuse while a quota error is still in effect or while the create
// throttle reports an error.
302 if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
305 // The maxConcurrentInstanceCreateOps knob throttles the number of node create
306 // requests in flight. It was added to work around a limitation in Azure's
307 // managed disks, which support no more than 20 concurrent node creation
308 // requests from a single disk image (cf.
309 // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
310 // The code assumes that node creation, from Azure's perspective, means the
311 // period until the instance appears in the "get all instances" list.
312 if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
313 logger.Info("reached MaxConcurrentInstanceCreateOps")
// Put the create throttle into an error state for 5 seconds; wp.notify
// is handed to the throttle as a callback.
314 wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
// The random secret identifies this call in wp.creating and is attached
// to the new instance as a tag, so updateWorker can match the instance
// to this call.
318 secret := randomHex(instanceSecretLength)
319 wp.creating[secret] = createCall{time: now, instanceType: it}
322 tags := cloud.InstanceTags{
323 wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID),
324 wp.tagKeyPrefix + tagKeyInstanceType: it.Name,
325 wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
326 wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
328 initCmd := TagVerifier{nil, secret, nil}.InitCommand()
329 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
331 defer wp.mtx.Unlock()
332 // delete() is deferred so the updateWorker() call
333 // below knows to use StateBooting when adding a new
335 defer delete(wp.creating, secret)
// On a quota error, stop creating instances for quotaErrorTTL and
// schedule a notify for when that period ends.
337 if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
339 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
340 time.AfterFunc(quotaErrorTTL, wp.notify)
342 logger.WithError(err).Error("create failed")
343 wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
// Add the newly created instance to the pool.
346 wp.updateWorker(inst, it)
351 // AtQuota returns true if Create is not expected to work at the
353 func (wp *Pool) AtQuota() bool {
355 defer wp.mtx.Unlock()
356 return time.Now().Before(wp.atQuotaUntil)
359 // SetIdleBehavior determines how the indicated instance will behave
360 // when it has no containers running.
361 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
363 defer wp.mtx.Unlock()
364 wkr, ok := wp.workers[id]
366 return errors.New("requested instance does not exist")
368 wkr.setIdleBehavior(idleBehavior)
372 // Successful connection to the SSH daemon, update the mTimeToSSH metric
373 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
375 defer wp.mtx.Unlock()
376 wkr := wp.workers[inst.ID()]
377 if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
378 // the node is not in booting state (can happen if a-d-c is restarted) OR
379 // this is not the first SSH connection
383 wkr.firstSSHConnection = time.Now()
384 if wp.mTimeToSSH != nil {
385 wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
389 // Add or update worker attached to the given instance.
//
391 // The second return value is true if a new worker is created.
//
393 // A newly added instance has state=StateBooting if its tags match an
394 // entry in wp.creating, otherwise StateUnknown.
//
396 // Caller must have lock.
397 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
398 secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
// Wrap the instance in a TagVerifier carrying the instance secret;
// wp.reportSSHConnected is supplied as its ReportVerified callback.
399 inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
// Existing worker: point its executor at the refreshed instance and
// record the update time, which sync() compares with its threshold.
401 if wkr := wp.workers[id]; wkr != nil {
402 wkr.executor.SetTarget(inst)
404 wkr.updated = time.Now()
// New worker: choose the initial state as described above.
409 state := StateUnknown
410 if _, ok := wp.creating[secret]; ok {
414 // If an instance has a valid IdleBehavior tag when it first
415 // appears, initialize the new worker accordingly (this is how
416 // we restore IdleBehavior that was set by a prior dispatch
417 // process); otherwise, default to "run". After this,
418 // wkr.idleBehavior is the source of truth, and will only be
419 // changed via SetIdleBehavior().
420 idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
421 if !validIdleBehavior[idleBehavior] {
422 idleBehavior = IdleBehaviorRun
425 logger := wp.logger.WithFields(logrus.Fields{
426 "InstanceType": it.Name,
427 "Instance": inst.ID(),
428 "Address": inst.Address(),
430 logger.WithFields(logrus.Fields{
432 "IdleBehavior": idleBehavior,
433 }).Infof("instance appeared in cloud")
// Each worker gets its own Executor from the factory given to NewPool.
439 executor: wp.newExecutor(inst),
441 idleBehavior: idleBehavior,
448 running: make(map[string]*remoteRunner),
449 starting: make(map[string]*remoteRunner),
450 probing: make(chan struct{}, 1),
456 // Shutdown shuts down a worker with the given type, or returns false
457 // if all workers with the given type are busy.
//
// Booting workers are considered before idle ones, and workers whose
// idle behavior is "hold" are never chosen.
458 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
459 wp.setupOnce.Do(wp.setup)
461 defer wp.mtx.Unlock()
462 logger := wp.logger.WithField("InstanceType", it.Name)
463 logger.Info("shutdown requested")
// One pass over the workers per candidate state, in order of preference.
464 for _, tryState := range []State{StateBooting, StateIdle} {
465 // TODO: shutdown the worker with the longest idle
466 // time (Idle) or the earliest create time (Booting)
467 for _, wkr := range wp.workers {
468 if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
469 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
// Report the boot outcome as aborted for the chosen worker.
470 wkr.reportBootOutcome(BootOutcomeAborted)
479 // CountWorkers returns the current number of workers in each state.
//
481 // CountWorkers blocks, if necessary, until the initial instance list
482 // has been loaded from the cloud provider.
483 func (wp *Pool) CountWorkers() map[State]int {
484 wp.setupOnce.Do(wp.setup)
487 defer wp.mtx.Unlock()
// Visit every worker while holding the lock.
// NOTE(review): presumably each iteration increments the result entry
// for the worker's state; the loop body is not visible here -- confirm.
489 for _, w := range wp.workers {
495 // Running returns the container UUIDs being prepared/run on workers.
497 // In the returned map, the time value indicates when the Pool
498 // observed that the container process had exited. A container that
499 // has not yet exited has a zero time value. The caller should use
500 // ForgetContainer() to garbage-collect the entries for exited
502 func (wp *Pool) Running() map[string]time.Time {
503 wp.setupOnce.Do(wp.setup)
505 defer wp.mtx.Unlock()
506 r := map[string]time.Time{}
507 for _, wkr := range wp.workers {
508 for uuid := range wkr.running {
509 r[uuid] = time.Time{}
511 for uuid := range wkr.starting {
512 r[uuid] = time.Time{}
515 for uuid, exited := range wp.exited {
521 // StartContainer starts a container on an idle worker immediately if
522 // possible, otherwise returns false.
523 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
524 wp.setupOnce.Do(wp.setup)
526 defer wp.mtx.Unlock()
528 for _, w := range wp.workers {
529 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
530 if wkr == nil || w.busy.After(wkr.busy) {
538 wkr.startContainer(ctr)
542 // KillContainer kills the crunch-run process for the given container
543 // UUID, if it's running on any worker.
//
545 // KillContainer returns immediately; the act of killing the container
546 // takes some time, and runs in the background.
//
548 // KillContainer returns false if the container has already ended.
549 func (wp *Pool) KillContainer(uuid string, reason string) bool {
551 defer wp.mtx.Unlock()
552 logger := wp.logger.WithFields(logrus.Fields{
553 "ContainerUUID": uuid,
// Look for the container on every worker: first among the runners
// already running, then among those still starting.
556 for _, wkr := range wp.workers {
557 rr := wkr.running[uuid]
559 rr = wkr.starting[uuid]
// Reached when no worker has a runner for this container.
566 logger.Debug("cannot kill: already disappeared")
570 // ForgetContainer clears the placeholder for the given exited
571 // container, so it isn't returned by subsequent calls to Running().
573 // ForgetContainer has no effect if the container has not yet exited.
575 // The "container exited at time T" placeholder (which necessitates
576 // ForgetContainer) exists to make it easier for the caller
577 // (scheduler) to distinguish a container that exited without
578 // finalizing its state from a container that exited too recently for
579 // its final state to have appeared in the scheduler's queue cache.
580 func (wp *Pool) ForgetContainer(uuid string) {
582 defer wp.mtx.Unlock()
583 if _, ok := wp.exited[uuid]; ok {
584 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
585 delete(wp.exited, uuid)
// registerMetrics creates the pool's Prometheus metrics (all in
// namespace "arvados", subsystem "dispatchcloud") and registers each
// of them with reg using MustRegister.
589 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
// NOTE(review): presumably a private registry is substituted only when
// the caller passed a nil reg; the guard is not visible here -- confirm.
591 reg = prometheus.NewRegistry()
593 wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
594 Namespace: "arvados",
595 Subsystem: "dispatchcloud",
596 Name: "containers_running",
597 Help: "Number of containers reported running by cloud VMs.",
599 reg.MustRegister(wp.mContainersRunning)
// Instance counts are labeled by category and instance type; the
// price, VCPU and memory gauges below are labeled by category only.
600 wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
601 Namespace: "arvados",
602 Subsystem: "dispatchcloud",
603 Name: "instances_total",
604 Help: "Number of cloud VMs.",
605 }, []string{"category", "instance_type"})
606 reg.MustRegister(wp.mInstances)
607 wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
608 Namespace: "arvados",
609 Subsystem: "dispatchcloud",
610 Name: "instances_price",
611 Help: "Price of cloud VMs.",
612 }, []string{"category"})
613 reg.MustRegister(wp.mInstancesPrice)
614 wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
615 Namespace: "arvados",
616 Subsystem: "dispatchcloud",
618 Help: "Total VCPUs on all cloud VMs.",
619 }, []string{"category"})
620 reg.MustRegister(wp.mVCPUs)
621 wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
622 Namespace: "arvados",
623 Subsystem: "dispatchcloud",
624 Name: "memory_bytes_total",
625 Help: "Total memory on all cloud VMs.",
626 }, []string{"category"})
627 reg.MustRegister(wp.mMemory)
628 wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
629 Namespace: "arvados",
630 Subsystem: "dispatchcloud",
631 Name: "boot_outcomes",
632 Help: "Boot outcomes by type.",
633 }, []string{"outcome"})
// Touch every known outcome label with Add(0) so its counter exists
// with value zero before the first event.
634 for k := range validBootOutcomes {
635 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
637 reg.MustRegister(wp.mBootOutcomes)
638 wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
639 Namespace: "arvados",
640 Subsystem: "dispatchcloud",
641 Name: "instances_disappeared",
642 Help: "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
643 }, []string{"state"})
// Likewise, create a zero-valued counter for every worker state name.
644 for _, v := range stateString {
645 wp.mDisappearances.WithLabelValues(v).Add(0)
647 reg.MustRegister(wp.mDisappearances)
648 wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
649 Namespace: "arvados",
650 Subsystem: "dispatchcloud",
651 Name: "instances_time_to_ssh_seconds",
652 Help: "Number of seconds between instance creation and the first successful SSH connection.",
// Objectives maps each tracked quantile to its allowed error.
653 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
655 reg.MustRegister(wp.mTimeToSSH)
656 wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
657 Namespace: "arvados",
658 Subsystem: "dispatchcloud",
659 Name: "instances_time_to_ready_for_container_seconds",
660 Help: "Number of seconds between the first successful SSH connection and ready to run a container.",
661 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
663 reg.MustRegister(wp.mTimeToReadyForContainer)
// runMetrics subscribes to pool change notifications for as long as it
// runs, and unsubscribes when it returns.
// NOTE(review): presumably it calls updateMetrics for each
// notification; the loop is not visible here -- confirm.
666 func (wp *Pool) runMetrics() {
668 defer wp.Unsubscribe(ch)
// updateMetrics recomputes the pool's gauges from the current set of
// workers: instance counts per category and instance type, total
// price, VCPUs and memory per category, and the number of containers
// running or starting.
675 func (wp *Pool) updateMetrics() {
677 defer wp.mtx.RUnlock()
// Per-category accumulators; instances is additionally keyed by
// instance type name.
683 instances := map[entKey]int64{}
684 price := map[string]float64{}
685 cpu := map[string]int64{}
686 mem := map[string]int64{}
688 for _, wkr := range wp.workers {
// Pick the worker's category; the first matching case wins, so a
// worker with containers counts as in use regardless of its state.
691 case len(wkr.running)+len(wkr.starting) > 0:
693 case wkr.idleBehavior == IdleBehaviorHold:
695 case wkr.state == StateBooting:
697 case wkr.state == StateUnknown:
702 instances[entKey{cat, wkr.instType.Name}]++
703 price[cat] += wkr.instType.Price
704 cpu[cat] += int64(wkr.instType.VCPUs)
705 mem[cat] += int64(wkr.instType.RAM)
706 running += int64(len(wkr.running) + len(wkr.starting))
// Set every category's gauges, including categories with no workers
// (a missing map entry reads as zero).
708 for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
709 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
710 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
711 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
712 // make sure to reset gauges for non-existing category/nodetype combinations
713 for _, it := range wp.instanceTypes {
714 if _, ok := instances[entKey{cat, it.Name}]; !ok {
715 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
719 for k, v := range instances {
720 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
722 wp.mContainersRunning.Set(float64(running))
// runProbes periodically probes the pool's workers. Every
// wp.probeInterval it collects the IDs of the workers to probe and
// starts a ProbeAndUpdate goroutine for each one, pacing the starts
// with a rate-limiting ticker.
725 func (wp *Pool) runProbes() {
726 maxPPS := wp.maxProbesPerSecond
// Fall back to the built-in rate.
// NOTE(review): presumably only when the configured rate is unset or
// invalid; the guard is not visible here -- confirm.
728 maxPPS = defaultMaxProbesPerSecond
// limitticker ticks maxPPS times per second.
730 limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
731 defer limitticker.Stop()
733 probeticker := time.NewTicker(wp.probeInterval)
734 defer probeticker.Stop()
// The slice is truncated and refilled on every round, so its backing
// array is reused.
736 workers := []cloud.InstanceID{}
737 for range probeticker.C {
738 workers = workers[:0]
740 for id, wkr := range wp.workers {
// Workers already shutting down are tested first, so shutdownIfIdle is
// only called for the others.
741 if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
744 workers = append(workers, id)
748 for _, id := range workers {
// Look the worker up again by ID: the map may have changed since the
// ID list was collected.
750 wkr, ok := wp.workers[id]
753 // Deleted while we were probing
757 go wkr.ProbeAndUpdate()
761 case <-limitticker.C:
// runSync keeps the pool's worker list in step with the cloud
// provider's instance list by calling getInstancesAndSync repeatedly.
// A failed sync is logged as a warning, and the timer is re-armed with
// wp.syncInterval.
767 func (wp *Pool) runSync() {
768 // sync once immediately, then wait syncInterval, sync again,
// A 1ns timer fires almost at once, which produces the immediate
// first sync.
770 timer := time.NewTimer(1)
774 err := wp.getInstancesAndSync()
776 wp.logger.WithError(err).Warn("sync failed")
778 timer.Reset(wp.syncInterval)
780 wp.logger.Debug("worker.Pool stopped")
786 // Stop synchronizing with the InstanceSet.
// NOTE(review): the statement that signals the sync loop is not
// visible here; presumably it uses the pool's stop channel -- confirm.
787 func (wp *Pool) Stop() {
788 wp.setupOnce.Do(wp.setup)
792 // Instances returns an InstanceView for each worker in the pool,
793 // summarizing its current state and recent activity.
//
// The views are sorted by instance ID.
794 func (wp *Pool) Instances() []InstanceView {
796 wp.setupOnce.Do(wp.setup)
798 for _, w := range wp.workers {
799 r = append(r, InstanceView{
800 Instance: w.instance.ID(),
801 Address: w.instance.Address(),
802 Price: w.instType.Price,
803 ArvadosInstanceType: w.instType.Name,
804 ProviderInstanceType: w.instType.ProviderType,
805 LastContainerUUID: w.lastUUID,
807 WorkerState: w.state.String(),
808 IdleBehavior: w.idleBehavior,
// Map iteration order is random; sorting by instance ID gives callers
// a stable order.
812 sort.Slice(r, func(i, j int) bool {
813 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
818 // KillInstance destroys a cloud VM instance. It returns an error if
819 // the given instance does not exist.
820 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
821 wkr, ok := wp.workers[id]
823 return errors.New("instance not found")
825 wkr.logger.WithField("Reason", reason).Info("shutting down")
826 wkr.reportBootOutcome(BootOutcomeAborted)
// setup initializes the pool's maps. It is invoked through
// wp.setupOnce, so it runs at most once no matter which method is
// called first.
831 func (wp *Pool) setup() {
832 wp.creating = map[string]createCall{}
833 wp.exited = map[string]time.Time{}
834 wp.workers = map[cloud.InstanceID]*worker{}
835 wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
839 // Load the runner program to be deployed on worker nodes into
840 // wp.runnerData, if necessary. Errors are logged.
//
842 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
844 // Caller must not have lock.
845 func (wp *Pool) loadRunnerData() error {
847 defer wp.mtx.Unlock()
// A non-nil runnerData means a previous call already did the work.
848 if wp.runnerData != nil {
// No runner source configured: use the plain "crunch-run" command and
// an empty, but non-nil, runnerData.
850 } else if wp.runnerSource == "" {
851 wp.runnerCmd = "crunch-run"
852 wp.runnerData = []byte{}
855 logger := wp.logger.WithField("source", wp.runnerSource)
856 logger.Debug("loading runner")
857 buf, err := ioutil.ReadFile(wp.runnerSource)
859 logger.WithError(err).Error("failed to load runner program")
// The runner command path embeds the MD5 checksum of the program, in
// hex, so each version of the program gets its own path.
863 wp.runnerMD5 = md5.Sum(buf)
864 wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
868 func (wp *Pool) notify() {
870 defer wp.mtx.RUnlock()
871 for _, send := range wp.subscribers {
873 case send <- struct{}{}:
879 func (wp *Pool) getInstancesAndSync() error {
880 wp.setupOnce.Do(wp.setup)
881 if err := wp.instanceSet.throttleInstances.Error(); err != nil {
884 wp.logger.Debug("getting instance list")
885 threshold := time.Now()
886 instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
888 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
891 wp.sync(threshold, instances)
892 wp.logger.Debug("sync done")
896 // Add/remove/update workers based on instances, which was obtained
897 // from the instanceSet. However, don't clobber any other updates that
898 // already happened after threshold.
899 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
901 defer wp.mtx.Unlock()
902 wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
// First pass: add or refresh a worker for every listed instance.
905 for _, inst := range instances {
// The instance's type is taken from its InstanceType tag; instances
// whose tag names no configured type are reported as errors.
906 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
907 it, ok := wp.instanceTypes[itTag]
909 wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
912 if wkr, isNew := wp.updateWorker(inst, it); isNew {
// A worker in StateShutdown that is still listed more than
// timeoutShutdown after it was destroyed is retried.
914 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
915 wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
// Second pass: look for workers that have disappeared from the list.
920 for id, wkr := range wp.workers {
// Workers refreshed after threshold are distinguished from those
// that were not: updateWorker sets wkr.updated to the current time.
921 if wkr.updated.After(threshold) {
924 logger := wp.logger.WithFields(logrus.Fields{
925 "Instance": wkr.instance.ID(),
926 "WorkerState": wkr.state,
928 logger.Info("instance disappeared in cloud")
929 wkr.reportBootOutcome(BootOutcomeDisappeared)
930 if wp.mDisappearances != nil {
931 wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
933 delete(wp.workers, id)
941 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
// waitUntilLoaded is named for waiting until the initial instance
// list has been loaded. It releases the pool's read lock on return.
// NOTE(review): the waiting logic is not visible here; presumably it
// blocks until wp.loaded is true -- confirm.
949 func (wp *Pool) waitUntilLoaded() {
952 defer wp.mtx.RUnlock()
// randomHex returns a string of n random lowercase hexadecimal digits
// (n*4 random bits), built from bytes obtained with rand.Read.
//
// Any n is accepted. For odd n one extra random byte is read and the
// surplus digit is dropped, so the result always has exactly n
// digits. n <= 0 yields the empty string.
//
// randomHex panics if rand.Read fails.
func randomHex(n int) string {
	if n <= 0 {
		return ""
	}
	// Two hex digits per byte, rounded up so odd n is covered.
	buf := make([]byte, (n+1)/2)
	if _, err := rand.Read(buf); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)[:n]
}