// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"crypto/md5"
	"crypto/rand"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"sort"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)

const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)
}

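// The sketch below is illustrative only (it is not part of this
// package's API): a toy Executor that runs commands in a local shell.
// It shows the contract above: Execute runs one command against the
// current target, and SetTarget cheaply swaps targets without
// blocking concurrent Execute calls. It assumes the extra imports
// "bytes" and "os/exec".
//
//	type localExecutor struct {
//		mtx    sync.Mutex
//		target cloud.ExecutorTarget
//	}
//
//	func (e *localExecutor) SetTarget(t cloud.ExecutorTarget) {
//		e.mtx.Lock()
//		e.target = t // record only; no connection is made here
//		e.mtx.Unlock()
//	}
//
//	func (e *localExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
//		c := exec.Command("sh", "-c", cmd)
//		for k, v := range env {
//			c.Env = append(c.Env, k+"="+v)
//		}
//		c.Stdin = stdin
//		var stdout, stderr bytes.Buffer
//		c.Stdout, c.Stderr = &stdout, &stderr
//		err := c.Run()
//		return stdout.Bytes(), stderr.Bytes(), err
//	}
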
const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = time.Minute * 10
	defaultTimeoutProbe       = time.Minute * 10
	defaultTimeoutShutdown    = time.Second * 10
	defaultTimeoutTERM        = time.Minute * 2
	defaultTimeoutSignal      = time.Second * 5

	// Time after a quota error to try again anyway, even if no
	// instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because of rate limiting" messages.
	logRateLimitErrorInterval = time.Second * 10
)

// duration returns conf as a time.Duration, or def if conf is zero.
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}

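// For example, with a cluster config that leaves SyncInterval empty,
// duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval)
// returns defaultSyncInterval (one minute); otherwise the configured
// non-zero value wins.
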
// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:             logger,
		arvClient:          arvClient,
		instanceSetID:      instanceSetID,
		instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:        newExecutor,
		bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
		runnerSource:       cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID:            cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes:      cluster.InstanceTypes,
		maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
		probeInterval:      duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval:       duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:        duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:     duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:       duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:    duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM:        duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal:      duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		installPublicKey:   installPublicKey,
		tagKeyPrefix:       cluster.Containers.CloudVMs.TagKeyPrefix,
		stop:               make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger             logrus.FieldLogger
	arvClient          *arvados.Client
	instanceSetID      cloud.InstanceSetID
	instanceSet        *throttledInstanceSet
	newExecutor        func(cloud.Instance) Executor
	bootProbeCommand   string
	runnerSource       string
	imageID            cloud.ImageID
	instanceTypes      map[string]arvados.InstanceType
	syncInterval       time.Duration
	probeInterval      time.Duration
	maxProbesPerSecond int
	timeoutIdle        time.Duration
	timeoutBooting     time.Duration
	timeoutProbe       time.Duration
	timeoutShutdown    time.Duration
	timeoutTERM        time.Duration
	timeoutSignal      time.Duration
	installPublicKey   ssh.PublicKey
	tagKeyPrefix       string

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once
	runnerData   []byte
	runnerMD5    [md5.Size]byte
	runnerCmd    string

	throttleCreate    throttle
	throttleInstances throttle

	mContainersRunning prometheus.Gauge
	mInstances         *prometheus.GaugeVec
	mInstancesPrice    *prometheus.GaugeVec
	mVCPUs             *prometheus.GaugeVec
	mMemory            *prometheus.GaugeVec
	mDisappearances    *prometheus.CounterVec
}

type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}

// CheckHealth reports an error if the Pool is not operating
// correctly, e.g., the runner binary cannot be loaded.
func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
	}
	return nil
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		tryScheduling(wp)
//		if done {
//			break
//		}
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret}.InitCommand()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	return true
}

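// A sketch of a hypothetical caller, creating one instance per needed
// type while respecting the pool's quota state (neededTypes is
// illustrative, not part of this package):
//
//	for _, it := range neededTypes {
//		if wp.AtQuota() {
//			break // Create would refuse anyway
//		}
//		wp.Create(it)
//	}
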
// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.setIdleBehavior(idleBehavior)
	return nil
}

// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{inst, secret}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.waitUntilLoaded()
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}

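// A sketch of how a caller might garbage-collect exited containers
// (hypothetical; the real scheduler applies more checks before
// forgetting an entry, and gracePeriod is an invented name):
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() && time.Since(exited) > gracePeriod {
//			wp.ForgetContainer(uuid)
//		}
//	}
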
// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Reason":        reason,
	})
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
		if rr == nil {
			rr = wkr.starting[uuid]
		}
		if rr != nil {
			rr.Kill(reason)
			return true
		}
	}
	logger.Debug("cannot kill: already disappeared")
	return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	}
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price",
		Help:      "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_disappeared",
		Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	}
	reg.MustRegister(wp.mDisappearances)
}

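// A minimal sketch of exposing these metrics over HTTP, assuming the
// caller keeps a reference to the *prometheus.Registry it passed to
// NewPool (promhttp is not imported by this file):
//
//	import "github.com/prometheus/client_golang/prometheus/promhttp"
//
//	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
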
func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	type entKey struct {
		cat      string
		instType string
	}
	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var running int64
	for _, wkr := range wp.workers {
		var cat string
		switch {
		case len(wkr.running)+len(wkr.starting) > 0:
			cat = "inuse"
		case wkr.idleBehavior == IdleBehaviorHold:
			cat = "hold"
		case wkr.state == StateBooting:
			cat = "booting"
		case wkr.state == StateUnknown:
			cat = "unknown"
		default:
			cat = "idle"
		}
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
	}
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for non-existing category/nodetype combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
			}
		}
	}
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	}
	wp.mContainersRunning.Set(float64(running))
}

func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing
				// others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Address:              w.instance.Address(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}

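// The json tags on InstanceView make this easy to serve from a
// hypothetical admin endpoint, e.g.:
//
//	http.HandleFunc("/instances", func(w http.ResponseWriter, r *http.Request) {
//		json.NewEncoder(w).Encode(wp.Instances())
//	})
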
// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.shutdown()
	return nil
}

func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}

// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
		return nil
	} else if wp.runnerSource == "" {
		wp.runnerCmd = "crunch-run"
		wp.runnerData = []byte{}
		return nil
	}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
	if err != nil {
		logger.WithError(err).Error("failed to load runner program")
		return err
	}
	wp.runnerData = buf
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
	return nil
}

func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		// Non-blocking send: if the subscriber's buffer is
		// already full, this event is redundant and can be
		// dropped.
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		delete(wp.workers, id)
		go wkr.Close()
		notify = true
	}

	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}

// waitUntilLoaded blocks until the initial instance list has been
// loaded from the InstanceSet.
func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for !wp.loaded {
		wp.mtx.RUnlock()
		<-ch
		wp.mtx.RLock()
	}
}

// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}