1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
19 "git.arvados.org/arvados.git/lib/cloud"
20 "git.arvados.org/arvados.git/sdk/go/arvados"
21 "github.com/prometheus/client_golang/prometheus"
22 "github.com/sirupsen/logrus"
23 "golang.org/x/crypto/ssh"
// Instance tag key suffixes. These are always combined with the
// cluster-configured wp.tagKeyPrefix when reading or writing cloud
// instance tags (see Create, updateWorker, getInstancesAndSync).
27 tagKeyInstanceType = "InstanceType"
28 tagKeyIdleBehavior = "IdleBehavior"
29 tagKeyInstanceSecret = "InstanceSecret"
30 tagKeyInstanceSetID = "InstanceSetID"
33 // An InstanceView shows a worker's current state and recent activity.
// InstanceView values are built by (*Pool)Instances() from the pool's
// worker map; fields are JSON-tagged for serialization by callers.
34 type InstanceView struct {
35 Instance cloud.InstanceID `json:"instance"`
36 Address string `json:"address"`
37 Price float64 `json:"price"`
38 ArvadosInstanceType string `json:"arvados_instance_type"`
39 ProviderInstanceType string `json:"provider_instance_type"`
40 LastContainerUUID string `json:"last_container_uuid"`
41 LastBusy time.Time `json:"last_busy"`
42 WorkerState string `json:"worker_state"`
43 IdleBehavior IdleBehavior `json:"idle_behavior"`
46 // An Executor executes shell commands on a remote host.
// Implementations are supplied to NewPool via the newExecutor
// callback and created once per cloud.Instance (see updateWorker).
47 type Executor interface {
48 // Run cmd on the current target.
49 Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
51 // Use the given target for subsequent operations. The new
52 // target is the same host as the previous target, but it
53 // might return a different address and verify a different
// NOTE(review): the remainder of this sentence is elided in this
// listing — presumably "...a different host key"; confirm upstream.
56 // SetTarget is called frequently, and in most cases the new
57 // target will behave exactly the same as the old one. An
58 // implementation should optimize accordingly.
60 // SetTarget must not block on concurrent Execute calls.
61 SetTarget(cloud.ExecutorTarget)
// Fallback tuning values. duration() substitutes each of these when
// the corresponding cluster config field is unset — see NewPool.
67 defaultSyncInterval = time.Minute
68 defaultProbeInterval = time.Second * 10
69 defaultMaxProbesPerSecond = 10
70 defaultTimeoutIdle = time.Minute
71 defaultTimeoutBooting = time.Minute * 10
72 defaultTimeoutProbe = time.Minute * 10
73 defaultTimeoutShutdown = time.Second * 10
74 defaultTimeoutTERM = time.Minute * 2
75 defaultTimeoutSignal = time.Second * 5
77 // Time after a quota error to try again anyway, even if no
78 // instances have been shutdown.
79 quotaErrorTTL = time.Minute
81 // Time between "X failed because rate limiting" messages
82 logRateLimitErrorInterval = time.Second * 10
// duration converts conf (an arvados.Duration) to time.Duration.
// def is the fallback used when conf is unset — NOTE(review): the
// zero-value guard is elided in this listing; confirm it returns def
// when conf == 0.
85 func duration(conf arvados.Duration, def time.Duration) time.Duration {
87 return time.Duration(conf)
93 // NewPool creates a Pool of workers backed by instanceSet.
95 // New instances are configured and set up according to the given
96 // cluster configuration.
// Timeouts and intervals fall back to the package default* constants
// when left unset in the cluster config (via duration()).
97 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
100 arvClient: arvClient,
101 instanceSetID: instanceSetID,
// The raw InstanceSet is wrapped so cloud-API rate-limit errors can
// be throttled centrally (see throttleCreate/throttleInstances use).
102 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
103 newExecutor: newExecutor,
104 bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
105 runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
106 imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
107 instanceTypes: cluster.InstanceTypes,
108 maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
109 probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
110 syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
111 timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
112 timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
113 timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
114 timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
115 timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
116 timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
117 installPublicKey: installPublicKey,
118 tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
119 stop: make(chan bool),
// Register metrics with the caller-supplied registry and run
// one-time initialization before handing the Pool to the caller.
121 wp.registerMetrics(reg)
123 wp.setupOnce.Do(wp.setup)
131 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
132 // zero Pool should not be used. Call NewPool to create a new Pool.
135 logger logrus.FieldLogger
136 arvClient *arvados.Client
137 instanceSetID cloud.InstanceSetID
138 instanceSet *throttledInstanceSet
139 newExecutor func(cloud.Instance) Executor
140 bootProbeCommand string
142 imageID cloud.ImageID
143 instanceTypes map[string]arvados.InstanceType
144 syncInterval time.Duration
145 probeInterval time.Duration
146 maxProbesPerSecond int
147 timeoutIdle time.Duration
148 timeoutBooting time.Duration
149 timeoutProbe time.Duration
150 timeoutShutdown time.Duration
151 timeoutTERM time.Duration
152 timeoutSignal time.Duration
153 installPublicKey ssh.PublicKey
// Mutable state below; guarded by wp.mtx (declaration elided in this
// listing — confirm), see the Lock/RLock calls throughout.
157 subscribers map[<-chan struct{}]chan<- struct{}
158 creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
159 workers map[cloud.InstanceID]*worker
160 loaded bool // loaded list of instances from InstanceSet at least once
161 exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
162 atQuotaUntil time.Time
163 atQuotaErr cloud.QuotaError
// MD5 of the deployed runner binary; embedded in runnerCmd's path
// (see loadRunnerData).
168 runnerMD5 [md5.Size]byte
// Backoff state for rate-limited cloud API calls.
171 throttleCreate throttle
172 throttleInstances throttle
// Prometheus metrics, registered in registerMetrics.
174 mContainersRunning prometheus.Gauge
175 mInstances *prometheus.GaugeVec
176 mInstancesPrice *prometheus.GaugeVec
177 mVCPUs *prometheus.GaugeVec
178 mMemory *prometheus.GaugeVec
179 mDisappearances *prometheus.CounterVec
// createCall records an in-flight InstanceSet.Create request; entries
// live in wp.creating, keyed by the per-instance secret.
182 type createCall struct {
184 instanceType arvados.InstanceType
// CheckHealth reports an error if the crunch-run runner binary cannot
// be loaded; on success the rest of the body (elided in this listing)
// presumably returns nil — confirm.
187 func (wp *Pool) CheckHealth() error {
188 wp.setupOnce.Do(wp.setup)
189 if err := wp.loadRunnerData(); err != nil {
190 return fmt.Errorf("error loading runner binary: %s", err)
195 // Subscribe returns a buffered channel that becomes ready after any
196 // change to the pool's state that could have scheduling implications:
197 // a worker's state changes, a new worker appears, the cloud
198 // provider's API rate limiting period ends, etc.
200 // Additional events that occur while the channel is already ready
201 // will be dropped, so it is OK if the caller services the channel
// (continuation elided — presumably "...slowly"; confirm upstream.)
206 // ch := wp.Subscribe()
207 // defer wp.Unsubscribe(ch)
214 func (wp *Pool) Subscribe() <-chan struct{} {
215 wp.setupOnce.Do(wp.setup)
217 defer wp.mtx.Unlock()
// Buffer of 1 lets notify() perform a non-blocking send: at most one
// pending wakeup per subscriber.
218 ch := make(chan struct{}, 1)
219 wp.subscribers[ch] = ch
223 // Unsubscribe stops sending updates to the given channel.
// Safe to call with a channel that was never (or is no longer)
// subscribed: delete on a missing map key is a no-op.
224 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
225 wp.setupOnce.Do(wp.setup)
227 defer wp.mtx.Unlock()
228 delete(wp.subscribers, ch)
231 // Unallocated returns the number of unallocated (creating + booting +
232 // idle + unknown) workers for each instance type. Workers in
233 // hold/drain mode are not included.
234 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
235 wp.setupOnce.Do(wp.setup)
237 defer wp.mtx.RUnlock()
238 unalloc := map[arvados.InstanceType]int{}
239 creating := map[arvados.InstanceType]int{}
// Track the oldest pending Create call per type, used below to
// decide whether an Unknown-state worker matches a pending Create.
240 oldestCreate := map[arvados.InstanceType]time.Time{}
241 for _, cc := range wp.creating {
242 it := cc.instanceType
244 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
245 oldestCreate[it] = cc.time
248 for _, wkr := range wp.workers {
249 // Skip workers that are not expected to become
250 // available soon. Note len(wkr.running)>0 is not
251 // redundant here: it can be true even in
// (continuation elided — presumably "...StateIdle"; confirm.)
253 if wkr.state == StateShutdown ||
254 wkr.state == StateRunning ||
255 wkr.idleBehavior != IdleBehaviorRun ||
256 len(wkr.running) > 0 {
261 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
262 // If up to N new workers appear in
263 // Instances() while we are waiting for N
264 // Create() calls to complete, we assume we're
265 // just seeing a race between Instances() and
266 // Create() responses.
268 // The other common reason why nodes have
269 // state==Unknown is that they appeared at
270 // startup, before any Create calls. They
271 // don't match the above timing condition, so
272 // we never mistakenly attribute them to
273 // pending Create calls.
// Merge still-pending creates into the result (loop body elided in
// this listing — presumably unalloc[it] += c; confirm).
277 for it, c := range creating {
283 // Create a new instance with the given type, and add it to the worker
284 // pool. The worker is added immediately; instance creation runs in
// (continuation elided — presumably "...the background"; confirm.)
287 // Create returns false if a pre-existing error state prevents it from
288 // even attempting to create a new instance. Those errors are logged
289 // by the Pool, so the caller does not need to log anything in such
291 func (wp *Pool) Create(it arvados.InstanceType) bool {
292 logger := wp.logger.WithField("InstanceType", it.Name)
293 wp.setupOnce.Do(wp.setup)
294 if wp.loadRunnerData() != nil {
295 // Boot probe is certain to fail.
299 defer wp.mtx.Unlock()
// Refuse while inside the quota-error backoff window or while the
// cloud provider is rate-limiting Create calls.
300 if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
// The secret identifies this pending Create in wp.creating and is
// also written as an instance tag so the new VM can be matched to
// this call when it appears in the instance list.
304 secret := randomHex(instanceSecretLength)
305 wp.creating[secret] = createCall{time: now, instanceType: it}
308 tags := cloud.InstanceTags{
309 wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID),
310 wp.tagKeyPrefix + tagKeyInstanceType: it.Name,
311 wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
312 wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
314 initCmd := TagVerifier{nil, secret}.InitCommand()
315 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
317 defer wp.mtx.Unlock()
318 // delete() is deferred so the updateWorker() call
319 // below knows to use StateBooting when adding a new
321 defer delete(wp.creating, secret)
// Quota errors start a backoff window; notify() wakes subscribers
// when the window expires so scheduling can retry.
323 if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
325 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
326 time.AfterFunc(quotaErrorTTL, wp.notify)
328 logger.WithError(err).Error("create failed")
329 wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
332 wp.updateWorker(inst, it)
337 // AtQuota returns true if Create is not expected to work at the
// (continuation elided — presumably "...moment"; confirm.) This is
// the caller-visible view of the quotaErrorTTL backoff set in Create.
339 func (wp *Pool) AtQuota() bool {
341 defer wp.mtx.Unlock()
342 return time.Now().Before(wp.atQuotaUntil)
345 // SetIdleBehavior determines how the indicated instance will behave
346 // when it has no containers running.
// Returns an error if id is not present in wp.workers.
347 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
349 defer wp.mtx.Unlock()
350 wkr, ok := wp.workers[id]
352 return errors.New("requested instance does not exist")
354 wkr.setIdleBehavior(idleBehavior)
358 // Add or update worker attached to the given instance.
360 // The second return value is true if a new worker is created.
362 // A newly added instance has state=StateBooting if its tags match an
363 // entry in wp.creating, otherwise StateUnknown.
365 // Caller must have lock.
366 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
// Wrap the instance so SSH host-key verification is tied to the
// per-instance secret tag written at Create time.
367 secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
368 inst = TagVerifier{inst, secret}
// Existing worker: just repoint its executor at the (possibly
// refreshed) instance handle and bump the updated timestamp.
370 if wkr := wp.workers[id]; wkr != nil {
371 wkr.executor.SetTarget(inst)
373 wkr.updated = time.Now()
378 state := StateUnknown
379 if _, ok := wp.creating[secret]; ok {
// (elided — presumably state = StateBooting, per doc comment above.)
383 // If an instance has a valid IdleBehavior tag when it first
384 // appears, initialize the new worker accordingly (this is how
385 // we restore IdleBehavior that was set by a prior dispatch
386 // process); otherwise, default to "run". After this,
387 // wkr.idleBehavior is the source of truth, and will only be
388 // changed via SetIdleBehavior().
389 idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
390 if !validIdleBehavior[idleBehavior] {
391 idleBehavior = IdleBehaviorRun
394 logger := wp.logger.WithFields(logrus.Fields{
395 "InstanceType": it.Name,
396 "Instance": inst.ID(),
397 "Address": inst.Address(),
399 logger.WithFields(logrus.Fields{
401 "IdleBehavior": idleBehavior,
402 }).Infof("instance appeared in cloud")
// New worker struct literal (several fields elided in this listing).
408 executor: wp.newExecutor(inst),
410 idleBehavior: idleBehavior,
417 running: make(map[string]*remoteRunner),
418 starting: make(map[string]*remoteRunner),
419 probing: make(chan struct{}, 1),
425 // Shutdown shuts down a worker with the given type, or returns false
426 // if all workers with the given type are busy.
427 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
428 wp.setupOnce.Do(wp.setup)
430 defer wp.mtx.Unlock()
431 logger := wp.logger.WithField("InstanceType", it.Name)
432 logger.Info("shutdown requested")
// Prefer shutting down a worker that is still booting over one that
// is already idle (slice order below).
433 for _, tryState := range []State{StateBooting, StateIdle} {
434 // TODO: shutdown the worker with the longest idle
435 // time (Idle) or the earliest create time (Booting)
436 for _, wkr := range wp.workers {
// Workers in hold mode are never chosen for shutdown.
437 if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
438 logger.WithField("Instance", wkr.instance).Info("shutting down")
447 // CountWorkers returns the current number of workers in each state.
449 // CountWorkers blocks, if necessary, until the initial instance list
450 // has been loaded from the cloud provider.
// (The waitUntilLoaded call implied by the doc comment is elided in
// this listing — confirm.)
451 func (wp *Pool) CountWorkers() map[State]int {
452 wp.setupOnce.Do(wp.setup)
455 defer wp.mtx.Unlock()
457 for _, w := range wp.workers {
463 // Running returns the container UUIDs being prepared/run on workers.
465 // In the returned map, the time value indicates when the Pool
466 // observed that the container process had exited. A container that
467 // has not yet exited has a zero time value. The caller should use
468 // ForgetContainer() to garbage-collect the entries for exited
// (continuation elided — presumably "...containers"; confirm.)
470 func (wp *Pool) Running() map[string]time.Time {
471 wp.setupOnce.Do(wp.setup)
473 defer wp.mtx.Unlock()
474 r := map[string]time.Time{}
// Both running and starting containers get a zero time value.
475 for _, wkr := range wp.workers {
476 for uuid := range wkr.running {
477 r[uuid] = time.Time{}
479 for uuid := range wkr.starting {
480 r[uuid] = time.Time{}
// Exited containers override/add entries with their exit timestamps
// (loop body elided — presumably r[uuid] = exited; confirm).
483 for uuid, exited := range wp.exited {
489 // StartContainer starts a container on an idle worker immediately if
490 // possible, otherwise returns false.
491 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
492 wp.setupOnce.Do(wp.setup)
494 defer wp.mtx.Unlock()
// Among idle workers of the requested type, pick the one that was
// busy most recently (w.busy.After comparison below).
496 for _, w := range wp.workers {
497 if w.instType == it && w.state == StateIdle {
498 if wkr == nil || w.busy.After(wkr.busy) {
506 wkr.startContainer(ctr)
510 // KillContainer kills the crunch-run process for the given container
511 // UUID, if it's running on any worker.
513 // KillContainer returns immediately; the act of killing the container
514 // takes some time, and runs in the background.
516 // KillContainer returns false if the container has already ended.
517 func (wp *Pool) KillContainer(uuid string, reason string) bool {
519 defer wp.mtx.Unlock()
520 logger := wp.logger.WithFields(logrus.Fields{
521 "ContainerUUID": uuid,
// Check each worker's running map first, then starting (a container
// may still be in its startup phase).
524 for _, wkr := range wp.workers {
525 rr := wkr.running[uuid]
527 rr = wkr.starting[uuid]
534 logger.Debug("cannot kill: already disappeared")
538 // ForgetContainer clears the placeholder for the given exited
539 // container, so it isn't returned by subsequent calls to Running().
541 // ForgetContainer has no effect if the container has not yet exited.
543 // The "container exited at time T" placeholder (which necessitates
544 // ForgetContainer) exists to make it easier for the caller
545 // (scheduler) to distinguish a container that exited without
546 // finalizing its state from a container that exited too recently for
547 // its final state to have appeared in the scheduler's queue cache.
548 func (wp *Pool) ForgetContainer(uuid string) {
550 defer wp.mtx.Unlock()
551 if _, ok := wp.exited[uuid]; ok {
552 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
553 delete(wp.exited, uuid)
// registerMetrics creates the pool's Prometheus collectors and
// registers them with reg. NOTE(review): the guard before line 559 is
// elided — presumably a nil reg is replaced with a throwaway registry
// so MustRegister below is always safe; confirm.
557 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
559 reg = prometheus.NewRegistry()
561 wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
562 Namespace: "arvados",
563 Subsystem: "dispatchcloud",
564 Name: "containers_running",
565 Help: "Number of containers reported running by cloud VMs.",
567 reg.MustRegister(wp.mContainersRunning)
568 wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
569 Namespace: "arvados",
570 Subsystem: "dispatchcloud",
571 Name: "instances_total",
572 Help: "Number of cloud VMs.",
573 }, []string{"category"})
574 reg.MustRegister(wp.mInstances)
575 wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
576 Namespace: "arvados",
577 Subsystem: "dispatchcloud",
578 Name: "instances_price",
579 Help: "Price of cloud VMs.",
580 }, []string{"category"})
581 reg.MustRegister(wp.mInstancesPrice)
582 wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
583 Namespace: "arvados",
584 Subsystem: "dispatchcloud",
586 Help: "Total VCPUs on all cloud VMs.",
587 }, []string{"category"})
588 reg.MustRegister(wp.mVCPUs)
589 wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
590 Namespace: "arvados",
591 Subsystem: "dispatchcloud",
592 Name: "memory_bytes_total",
593 Help: "Total memory on all cloud VMs.",
594 }, []string{"category"})
595 reg.MustRegister(wp.mMemory)
596 wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
597 Namespace: "arvados",
598 Subsystem: "dispatchcloud",
599 Name: "instances_disappeared",
600 Help: "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
601 }, []string{"state"})
// Pre-populate every state label with 0 so rate() queries behave
// sensibly before the first disappearance is observed.
602 for _, v := range stateString {
603 wp.mDisappearances.WithLabelValues(v).Add(0)
605 reg.MustRegister(wp.mDisappearances)
// runMetrics subscribes to pool events and (body elided in this
// listing) presumably calls updateMetrics on each notification until
// the pool stops — confirm.
608 func (wp *Pool) runMetrics() {
610 defer wp.Unsubscribe(ch)
// updateMetrics recomputes all gauge values from the current worker
// map, classifying each worker into one category: inuse, hold,
// booting, unknown, or idle (switch below; case bodies elided).
617 func (wp *Pool) updateMetrics() {
619 defer wp.mtx.RUnlock()
621 instances := map[string]int64{}
622 price := map[string]float64{}
623 cpu := map[string]int64{}
624 mem := map[string]int64{}
626 for _, wkr := range wp.workers {
629 case len(wkr.running)+len(wkr.starting) > 0:
631 case wkr.idleBehavior == IdleBehaviorHold:
633 case wkr.state == StateBooting:
635 case wkr.state == StateUnknown:
641 price[cat] += wkr.instType.Price
642 cpu[cat] += int64(wkr.instType.VCPUs)
643 mem[cat] += int64(wkr.instType.RAM)
644 running += int64(len(wkr.running) + len(wkr.starting))
// Set every category explicitly (including zeroes) so gauges for
// now-empty categories are reset rather than left stale.
646 for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
647 wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
648 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
649 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
650 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
652 wp.mContainersRunning.Set(float64(running))
// runProbes periodically probes all workers. Two tickers cooperate:
// probeticker sets the probe cadence per worker, limitticker caps the
// aggregate probe rate at maxPPS per second.
655 func (wp *Pool) runProbes() {
656 maxPPS := wp.maxProbesPerSecond
658 maxPPS = defaultMaxProbesPerSecond
660 limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
661 defer limitticker.Stop()
663 probeticker := time.NewTicker(wp.probeInterval)
664 defer probeticker.Stop()
// The workers slice is reused across ticks (workers[:0]) to avoid
// reallocating every interval.
666 workers := []cloud.InstanceID{}
667 for range probeticker.C {
668 workers = workers[:0]
670 for id, wkr := range wp.workers {
// Snapshot IDs under lock; skip workers already shutting down (and
// trigger idle shutdown as a side effect of shutdownIfIdle()).
671 if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
674 workers = append(workers, id)
678 for _, id := range workers {
680 wkr, ok := wp.workers[id]
683 // Deleted while we were probing
687 go wkr.ProbeAndUpdate()
691 case <-limitticker.C:
697 func (wp *Pool) runSync() {
698 // sync once immediately, then wait syncInterval, sync again,
// (continuation elided — presumably "...and so on"; confirm.)
// The 1ns initial timer fires effectively immediately.
700 timer := time.NewTimer(1)
704 err := wp.getInstancesAndSync()
706 wp.logger.WithError(err).Warn("sync failed")
708 timer.Reset(wp.syncInterval)
710 wp.logger.Debug("worker.Pool stopped")
716 // Stop synchronizing with the InstanceSet.
// (Body elided in this listing — presumably closes/signals wp.stop;
// confirm.)
717 func (wp *Pool) Stop() {
718 wp.setupOnce.Do(wp.setup)
722 // Instances returns an InstanceView for each worker in the pool,
723 // summarizing its current state and recent activity.
// The result is sorted by instance ID (sort.Slice below).
724 func (wp *Pool) Instances() []InstanceView {
726 wp.setupOnce.Do(wp.setup)
728 for _, w := range wp.workers {
729 r = append(r, InstanceView{
730 Instance: w.instance.ID(),
731 Address: w.instance.Address(),
732 Price: w.instType.Price,
733 ArvadosInstanceType: w.instType.Name,
734 ProviderInstanceType: w.instType.ProviderType,
735 LastContainerUUID: w.lastUUID,
737 WorkerState: w.state.String(),
738 IdleBehavior: w.idleBehavior,
742 sort.Slice(r, func(i, j int) bool {
743 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
748 // KillInstance destroys a cloud VM instance. It returns an error if
749 // the given instance does not exist.
// NOTE(review): no wp.mtx lock is visible around the workers map read
// here (locking lines may be elided in this listing) — confirm.
750 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
751 wkr, ok := wp.workers[id]
753 return errors.New("instance not found")
755 wkr.logger.WithField("Reason", reason).Info("shutting down")
// setup performs one-time initialization of the pool's mutable maps;
// always invoked through wp.setupOnce.Do so it runs exactly once.
760 func (wp *Pool) setup() {
761 wp.creating = map[string]createCall{}
762 wp.exited = map[string]time.Time{}
763 wp.workers = map[cloud.InstanceID]*worker{}
764 wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
768 // Load the runner program to be deployed on worker nodes into
769 // wp.runnerData, if necessary. Errors are logged.
771 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
773 // Caller must not have lock.
774 func (wp *Pool) loadRunnerData() error {
776 defer wp.mtx.Unlock()
// Already loaded (or deliberately empty): nothing to do. An empty
// runnerSource means "use the crunch-run already on the image".
777 if wp.runnerData != nil {
779 } else if wp.runnerSource == "" {
780 wp.runnerCmd = "crunch-run"
781 wp.runnerData = []byte{}
784 logger := wp.logger.WithField("source", wp.runnerSource)
785 logger.Debug("loading runner")
786 buf, err := ioutil.ReadFile(wp.runnerSource)
788 logger.WithError(err).Error("failed to load runner program")
// The deployed path embeds the binary's MD5 so a changed binary gets
// a distinct path on the worker.
792 wp.runnerMD5 = md5.Sum(buf)
793 wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
// notify wakes all subscribers with a non-blocking send; a subscriber
// whose 1-slot buffer is already full simply keeps its pending wakeup
// (the default case is elided in this listing — confirm).
797 func (wp *Pool) notify() {
799 defer wp.mtx.RUnlock()
800 for _, send := range wp.subscribers {
802 case send <- struct{}{}:
// getInstancesAndSync fetches the current instance list from the
// cloud provider and reconciles wp.workers with it via sync().
808 func (wp *Pool) getInstancesAndSync() error {
809 wp.setupOnce.Do(wp.setup)
810 if err := wp.instanceSet.throttleInstances.Error(); err != nil {
813 wp.logger.Debug("getting instance list")
// Taken before the (slow) Instances call: sync() uses it to avoid
// clobbering worker updates that happened during the fetch.
814 threshold := time.Now()
815 instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
817 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
820 wp.sync(threshold, instances)
821 wp.logger.Debug("sync done")
825 // Add/remove/update workers based on instances, which was obtained
826 // from the instanceSet. However, don't clobber any other updates that
827 // already happened after threshold.
828 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
830 defer wp.mtx.Unlock()
831 wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
834 for _, inst := range instances {
// Instances whose type tag doesn't match a configured InstanceType
// are logged and skipped rather than adopted.
835 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
836 it, ok := wp.instanceTypes[itTag]
838 wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
841 if wkr, isNew := wp.updateWorker(inst, it); isNew {
843 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
844 wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
// Remove workers whose instances no longer appear in the provider's
// list — unless they were updated after threshold (e.g. just
// created), in which case the listing may simply predate them.
849 for id, wkr := range wp.workers {
850 if wkr.updated.After(threshold) {
853 logger := wp.logger.WithFields(logrus.Fields{
854 "Instance": wkr.instance.ID(),
855 "WorkerState": wkr.state,
857 logger.Info("instance disappeared in cloud")
858 if wp.mDisappearances != nil {
859 wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
861 delete(wp.workers, id)
869 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
// waitUntilLoaded blocks until the initial instance list has been
// loaded (body elided in this listing — presumably waits on wp.loaded
// via subscription; confirm).
877 func (wp *Pool) waitUntilLoaded() {
880 defer wp.mtx.RUnlock()
888 // Return a random string of n hexadecimal digits (n*4 random bits). n
// (continuation elided — presumably "must be even"; note buf holds
// n/2 bytes, so an odd n would yield n-1 digits.)
890 func randomHex(n int) string {
891 buf := make([]byte, n/2)
// NOTE(review): rand is presumably crypto/rand (import list elided);
// error handling for rand.Read is also elided — confirm it panics or
// logs rather than silently returning weak output.
892 _, err := rand.Read(buf)
896 return fmt.Sprintf("%x", buf)