1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.arvados.org/arvados.git/lib/cloud"
18 "git.arvados.org/arvados.git/sdk/go/arvados"
19 "github.com/prometheus/client_golang/prometheus"
20 "github.com/sirupsen/logrus"
21 "golang.org/x/crypto/ssh"
// Tag key suffixes. Each one is appended to wp.tagKeyPrefix to form the
// name of a cloud instance tag (see Create, updateWorker, sync and
// getInstancesAndSync).
25 tagKeyInstanceType = "InstanceType"
26 tagKeyIdleBehavior = "IdleBehavior"
27 tagKeyInstanceSecret = "InstanceSecret"
28 tagKeyInstanceSetID = "InstanceSetID"
31 // An InstanceView shows a worker's current state and recent activity.
// Values are built by (*Pool).Instances, one per entry in wp.workers,
// and every field carries a JSON tag.
32 type InstanceView struct {
// Instance and Address come from the worker's cloud instance (ID() and
// Address()).
33 Instance cloud.InstanceID `json:"instance"`
34 Address string `json:"address"`
// Price, ArvadosInstanceType and ProviderInstanceType are copied from
// the worker's instance type (Price, Name and ProviderType).
35 Price float64 `json:"price"`
36 ArvadosInstanceType string `json:"arvados_instance_type"`
37 ProviderInstanceType string `json:"provider_instance_type"`
// LastContainerUUID is the worker's lastUUID field.
38 LastContainerUUID string `json:"last_container_uuid"`
39 LastBusy time.Time `json:"last_busy"`
// WorkerState is the worker state rendered with State.String().
40 WorkerState string `json:"worker_state"`
41 IdleBehavior IdleBehavior `json:"idle_behavior"`
44 // An Executor executes shell commands on a remote host.
// The Pool obtains one Executor per new worker from its newExecutor
// callback, and calls SetTarget each time updateWorker sees an instance
// that already has a worker.
45 type Executor interface {
46 // Run cmd on the current target.
47 Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
49 // Use the given target for subsequent operations. The new
50 // target is the same host as the previous target, but it
51 // might return a different address and verify a different
54 // SetTarget is called frequently, and in most cases the new
55 // target will behave exactly the same as the old one. An
56 // implementation should optimize accordingly.
58 // SetTarget must not block on concurrent Execute calls.
59 SetTarget(cloud.ExecutorTarget)
// Defaults for the Pool's intervals and timeouts. NewPool passes each
// interval/timeout default to duration() together with the matching
// cluster.Containers.CloudVMs setting; runProbes falls back to
// defaultMaxProbesPerSecond.
65 defaultSyncInterval = time.Minute
66 defaultProbeInterval = time.Second * 10
67 defaultMaxProbesPerSecond = 10
68 defaultTimeoutIdle = time.Minute
69 defaultTimeoutBooting = time.Minute * 10
70 defaultTimeoutProbe = time.Minute * 10
71 defaultTimeoutShutdown = time.Second * 10
72 defaultTimeoutTERM = time.Minute * 2
73 defaultTimeoutSignal = time.Second * 5
75 // Time after a quota error to try again anyway, even if no
76 // instances have been shutdown.
// (Create uses it both for wp.atQuotaUntil and for the delayed notify.)
77 quotaErrorTTL = time.Minute
79 // Time between "X failed because rate limiting" messages
80 logRateLimitErrorInterval = time.Second * 10
// duration converts a configured arvados.Duration into a time.Duration.
// Callers (see NewPool) supply the matching default* constant as def.
// NOTE(review): the lines that choose between conf and def are not
// visible in this listing -- confirm the exact condition before relying
// on it.
83 func duration(conf arvados.Duration, def time.Duration) time.Duration {
85 return time.Duration(conf)
91 // NewPool creates a Pool of workers backed by instanceSet.
93 // New instances are configured and set up according to the given
94 // cluster configuration.
// Every interval/timeout setting from cluster.Containers.CloudVMs is
// passed through duration() together with its default* constant.
95 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
99 instanceSetID: instanceSetID,
// The provider's InstanceSet is wrapped in a throttledInstanceSet; Create
// and getInstancesAndSync use its throttleCreate/throttleInstances
// fields for rate-limit handling.
100 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
101 newExecutor: newExecutor,
102 bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
103 imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
104 instanceTypes: cluster.InstanceTypes,
// Stored as configured; runProbes is where defaultMaxProbesPerSecond
// gets substituted.
105 maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
106 probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
107 syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
108 timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
109 timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
110 timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
111 timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
112 timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
113 timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
114 installPublicKey: installPublicKey,
115 tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
116 stop: make(chan bool),
// Create the Prometheus collectors and register them.
118 wp.registerMetrics(reg)
// Initialize the pool's maps (see setup) exactly once.
// NOTE(review): the surrounding lines (presumably where the background
// loops are started) are not visible in this listing.
120 wp.setupOnce.Do(wp.setup)
128 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
129 // zero Pool should not be used. Call NewPool to create a new Pool.
// Configuration, filled in by NewPool.
132 logger logrus.FieldLogger
133 arvClient *arvados.Client
134 instanceSetID cloud.InstanceSetID
135 instanceSet *throttledInstanceSet
136 newExecutor func(cloud.Instance) Executor
137 bootProbeCommand string
138 imageID cloud.ImageID
139 instanceTypes map[string]arvados.InstanceType
140 syncInterval time.Duration
141 probeInterval time.Duration
142 maxProbesPerSecond int
143 timeoutIdle time.Duration
144 timeoutBooting time.Duration
145 timeoutProbe time.Duration
146 timeoutShutdown time.Duration
147 timeoutTERM time.Duration
148 timeoutSignal time.Duration
149 installPublicKey ssh.PublicKey
// Mutable state. The methods in this file touch these fields between a
// wp.mtx lock and the deferred Unlock/RUnlock; the maps are allocated
// in setup.
// subscribers: added by Subscribe, removed by Unsubscribe, signalled by
// notify.
153 subscribers map[<-chan struct{}]chan<- struct{}
154 creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
155 workers map[cloud.InstanceID]*worker
156 loaded bool // loaded list of instances from InstanceSet at least once
157 exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
// atQuotaUntil is set by Create after a quota error and read by AtQuota.
158 atQuotaUntil time.Time
159 atQuotaErr cloud.QuotaError
// NOTE(review): Create checks wp.throttleCreate, but rate-limit errors
// are reported to wp.instanceSet.throttleCreate / throttleInstances;
// confirm these two Pool-level fields are ever updated.
164 throttleCreate throttle
165 throttleInstances throttle
// Prometheus collectors: created in registerMetrics, refreshed by
// updateMetrics; mDisappearances is incremented in sync.
167 mContainersRunning prometheus.Gauge
168 mInstances *prometheus.GaugeVec
169 mInstancesPrice *prometheus.GaugeVec
170 mVCPUs *prometheus.GaugeVec
171 mMemory *prometheus.GaugeVec
172 mDisappearances *prometheus.CounterVec
// createCall describes one unfinished instance-creation request. Create
// stores it in wp.creating (keyed by instance secret) with the start
// time and requested instance type; Unallocated reads both values.
175 type createCall struct {
177 instanceType arvados.InstanceType
180 // Subscribe returns a buffered channel that becomes ready after any
181 // change to the pool's state that could have scheduling implications:
182 // a worker's state changes, a new worker appears, the cloud
183 // provider's API rate limiting period ends, etc.
185 // Additional events that occur while the channel is already ready
186 // will be dropped, so it is OK if the caller services the channel
191 // ch := wp.Subscribe()
192 // defer wp.Unsubscribe(ch)
199 func (wp *Pool) Subscribe() <-chan struct{} {
200 wp.setupOnce.Do(wp.setup)
202 defer wp.mtx.Unlock()
// Capacity 1: a single pending notification can be buffered per
// subscriber.
203 ch := make(chan struct{}, 1)
// The same channel is both key (receive-only view, used by Unsubscribe)
// and value (send-only view, used by notify).
204 wp.subscribers[ch] = ch
208 // Unsubscribe stops sending updates to the given channel.
// ch must be a channel previously returned by Subscribe; deleting a key
// that is not in the map is a no-op.
209 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
210 wp.setupOnce.Do(wp.setup)
212 defer wp.mtx.Unlock()
// After removal, notify no longer sends on this channel.
213 delete(wp.subscribers, ch)
216 // Unallocated returns the number of unallocated (creating + booting +
217 // idle + unknown) workers for each instance type. Workers in
218 // hold/drain mode are not included.
219 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
220 wp.setupOnce.Do(wp.setup)
222 defer wp.mtx.RUnlock()
// unalloc is the result being built. creating and oldestCreate summarize
// the pending Create calls per instance type.
223 unalloc := map[arvados.InstanceType]int{}
224 creating := map[arvados.InstanceType]int{}
225 oldestCreate := map[arvados.InstanceType]time.Time{}
226 for _, cc := range wp.creating {
227 it := cc.instanceType
// Remember the earliest pending Create time seen for this type.
229 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
230 oldestCreate[it] = cc.time
233 for _, wkr := range wp.workers {
234 // Skip workers that are not expected to become
235 // available soon. Note len(wkr.running)>0 is not
236 // redundant here: it can be true even in
238 if wkr.state == StateShutdown ||
239 wkr.state == StateRunning ||
240 wkr.idleBehavior != IdleBehaviorRun ||
241 len(wkr.running) > 0 {
// An Unknown worker that appeared after the oldest pending Create of its
// type is matched against that pending Create, as explained below.
246 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
247 // If up to N new workers appear in
248 // Instances() while we are waiting for N
249 // Create() calls to complete, we assume we're
250 // just seeing a race between Instances() and
251 // Create() responses.
253 // The other common reason why nodes have
254 // state==Unknown is that they appeared at
255 // startup, before any Create calls. They
256 // don't match the above timing condition, so
257 // we never mistakenly attribute them to
258 // pending Create calls.
// Finally fold the per-type pending Create counts into the result.
262 for it, c := range creating {
268 // Create a new instance with the given type, and add it to the worker
269 // pool. The worker is added immediately; instance creation runs in
272 // Create returns false if a pre-existing error state prevents it from
273 // even attempting to create a new instance. Those errors are logged
274 // by the Pool, so the caller does not need to log anything in such
276 func (wp *Pool) Create(it arvados.InstanceType) bool {
277 logger := wp.logger.WithField("InstanceType", it.Name)
278 wp.setupOnce.Do(wp.setup)
280 defer wp.mtx.Unlock()
// Pre-existing error state: a quota error window is still open, or the
// create throttle reports an error.
// NOTE(review): this checks wp.throttleCreate, while rate-limit errors
// are recorded on wp.instanceSet.throttleCreate further down -- confirm
// which throttle is intended.
281 if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
// The random secret keys the wp.creating entry, is attached to the
// instance as a tag, and is embedded in the TagVerifier init command.
285 secret := randomHex(instanceSecretLength)
286 wp.creating[secret] = createCall{time: now, instanceType: it}
289 tags := cloud.InstanceTags{
290 wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID),
291 wp.tagKeyPrefix + tagKeyInstanceType: it.Name,
292 wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
293 wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
295 initCmd := TagVerifier{nil, secret}.InitCommand()
296 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
298 defer wp.mtx.Unlock()
299 // delete() is deferred so the updateWorker() call
300 // below knows to use StateBooting when adding a new
302 defer delete(wp.creating, secret)
// A quota error opens a quotaErrorTTL window (see AtQuota) and schedules
// a notify() for when that window ends.
304 if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
306 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
307 time.AfterFunc(quotaErrorTTL, wp.notify)
309 logger.WithError(err).Error("create failed")
// Let the instance set's create throttle inspect err for rate limiting.
310 wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
// Add (or refresh) the worker for the newly created instance.
313 wp.updateWorker(inst, it)
318 // AtQuota returns true if Create is not expected to work at the
320 func (wp *Pool) AtQuota() bool {
322 defer wp.mtx.Unlock()
// True while the window opened by Create after a quota error
// (wp.atQuotaUntil) has not yet elapsed.
323 return time.Now().Before(wp.atQuotaUntil)
326 // SetIdleBehavior determines how the indicated instance will behave
327 // when it has no containers running.
// It returns an error when id has no entry in wp.workers.
328 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
330 defer wp.mtx.Unlock()
331 wkr, ok := wp.workers[id]
333 return errors.New("requested instance does not exist")
// The change itself is delegated to the worker.
335 wkr.setIdleBehavior(idleBehavior)
339 // Add or update worker attached to the given instance.
341 // The second return value is true if a new worker is created.
343 // A newly added instance has state=StateBooting if its tags match an
344 // entry in wp.creating, otherwise StateUnknown.
346 // Caller must have lock.
347 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
// Read the instance secret tag, then wrap the instance in a TagVerifier
// built from that secret.
348 secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
349 inst = TagVerifier{inst, secret}
// Known instance: point the executor at the (possibly changed) target
// and refresh the updated timestamp that sync compares to its threshold.
351 if wkr := wp.workers[id]; wkr != nil {
352 wkr.executor.SetTarget(inst)
354 wkr.updated = time.Now()
// New instance: StateUnknown unless the secret matches a pending Create.
359 state := StateUnknown
360 if _, ok := wp.creating[secret]; ok {
364 // If an instance has a valid IdleBehavior tag when it first
365 // appears, initialize the new worker accordingly (this is how
366 // we restore IdleBehavior that was set by a prior dispatch
367 // process); otherwise, default to "run". After this,
368 // wkr.idleBehavior is the source of truth, and will only be
369 // changed via SetIdleBehavior().
370 idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
371 if !validIdleBehavior[idleBehavior] {
372 idleBehavior = IdleBehaviorRun
375 logger := wp.logger.WithFields(logrus.Fields{
376 "InstanceType": it.Name,
377 "Instance": inst.ID(),
378 "Address": inst.Address(),
380 logger.WithFields(logrus.Fields{
382 "IdleBehavior": idleBehavior,
383 }).Infof("instance appeared in cloud")
// The new worker gets its own executor, empty running/starting maps and
// a 1-slot probing channel.
389 executor: wp.newExecutor(inst),
391 idleBehavior: idleBehavior,
398 running: make(map[string]*remoteRunner),
399 starting: make(map[string]*remoteRunner),
400 probing: make(chan struct{}, 1),
406 // Shutdown shuts down a worker with the given type, or returns false
407 // if all workers with the given type are busy.
408 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
409 wp.setupOnce.Do(wp.setup)
411 defer wp.mtx.Unlock()
412 logger := wp.logger.WithField("InstanceType", it.Name)
413 logger.Info("shutdown requested")
// Booting workers are considered before Idle ones.
414 for _, tryState := range []State{StateBooting, StateIdle} {
415 // TODO: shutdown the worker with the longest idle
416 // time (Idle) or the earliest create time (Booting)
417 for _, wkr := range wp.workers {
// Workers in hold mode, in another state, or of another type are not
// candidates.
418 if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
419 logger.WithField("Instance", wkr.instance).Info("shutting down")
428 // CountWorkers returns the current number of workers in each state.
430 // CountWorkers blocks, if necessary, until the initial instance list
431 // has been loaded from the cloud provider.
432 func (wp *Pool) CountWorkers() map[State]int {
433 wp.setupOnce.Do(wp.setup)
436 defer wp.mtx.Unlock()
// Walk every worker to build the per-state tally.
// NOTE(review): the tally statement itself is not visible in this
// listing.
438 for _, w := range wp.workers {
444 // Running returns the container UUIDs being prepared/run on workers.
446 // In the returned map, the time value indicates when the Pool
447 // observed that the container process had exited. A container that
448 // has not yet exited has a zero time value. The caller should use
449 // ForgetContainer() to garbage-collect the entries for exited
451 func (wp *Pool) Running() map[string]time.Time {
452 wp.setupOnce.Do(wp.setup)
454 defer wp.mtx.Unlock()
455 r := map[string]time.Time{}
// Containers in a worker's running or starting set are reported with the
// zero time (not yet exited).
456 for _, wkr := range wp.workers {
457 for uuid := range wkr.running {
458 r[uuid] = time.Time{}
460 for uuid := range wkr.starting {
461 r[uuid] = time.Time{}
// Then visit the exited-container placeholders kept in wp.exited.
464 for uuid, exited := range wp.exited {
470 // StartContainer starts a container on an idle worker immediately if
471 // possible, otherwise returns false.
472 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
473 wp.setupOnce.Do(wp.setup)
475 defer wp.mtx.Unlock()
// Only idle workers of the requested type are candidates. A candidate is
// preferred when nothing has been chosen yet or its busy time is later
// than the current choice's.
477 for _, w := range wp.workers {
478 if w.instType == it && w.state == StateIdle {
479 if wkr == nil || w.busy.After(wkr.busy) {
// Hand the container to the chosen worker.
487 wkr.startContainer(ctr)
491 // KillContainer kills the crunch-run process for the given container
492 // UUID, if it's running on any worker.
494 // KillContainer returns immediately; the act of killing the container
495 // takes some time, and runs in the background.
497 // KillContainer returns false if the container has already ended.
498 func (wp *Pool) KillContainer(uuid string, reason string) bool {
500 defer wp.mtx.Unlock()
501 logger := wp.logger.WithFields(logrus.Fields{
502 "ContainerUUID": uuid,
// Look the container up in each worker's running set, then in its
// starting set.
505 for _, wkr := range wp.workers {
506 rr := wkr.running[uuid]
508 rr = wkr.starting[uuid]
// No worker knows this container.
515 logger.Debug("cannot kill: already disappeared")
519 // ForgetContainer clears the placeholder for the given exited
520 // container, so it isn't returned by subsequent calls to Running().
522 // ForgetContainer has no effect if the container has not yet exited.
524 // The "container exited at time T" placeholder (which necessitates
525 // ForgetContainer) exists to make it easier for the caller
526 // (scheduler) to distinguish a container that exited without
527 // finalizing its state from a container that exited too recently for
528 // its final state to have appeared in the scheduler's queue cache.
529 func (wp *Pool) ForgetContainer(uuid string) {
531 defer wp.mtx.Unlock()
// Only log and delete when a placeholder actually exists for uuid.
532 if _, ok := wp.exited[uuid]; ok {
533 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
534 delete(wp.exited, uuid)
// registerMetrics creates the pool's Prometheus collectors (all under
// namespace "arvados", subsystem "dispatchcloud") and registers each
// one on reg with MustRegister.
538 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
// A fresh registry is substituted here.
// NOTE(review): the guarding condition is not visible in this listing
// (presumably reg == nil) -- confirm.
540 reg = prometheus.NewRegistry()
542 wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
543 Namespace: "arvados",
544 Subsystem: "dispatchcloud",
545 Name: "containers_running",
546 Help: "Number of containers reported running by cloud VMs.",
548 reg.MustRegister(wp.mContainersRunning)
// The four gauge vectors below are labelled by "category"; updateMetrics
// fills in the values per category.
549 wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
550 Namespace: "arvados",
551 Subsystem: "dispatchcloud",
552 Name: "instances_total",
553 Help: "Number of cloud VMs.",
554 }, []string{"category"})
555 reg.MustRegister(wp.mInstances)
556 wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
557 Namespace: "arvados",
558 Subsystem: "dispatchcloud",
559 Name: "instances_price",
560 Help: "Price of cloud VMs.",
561 }, []string{"category"})
562 reg.MustRegister(wp.mInstancesPrice)
563 wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
564 Namespace: "arvados",
565 Subsystem: "dispatchcloud",
567 Help: "Total VCPUs on all cloud VMs.",
568 }, []string{"category"})
569 reg.MustRegister(wp.mVCPUs)
570 wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
571 Namespace: "arvados",
572 Subsystem: "dispatchcloud",
573 Name: "memory_bytes_total",
574 Help: "Total memory on all cloud VMs.",
575 }, []string{"category"})
576 reg.MustRegister(wp.mMemory)
// The disappearance counter is labelled by worker "state"; sync
// increments it.
577 wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
578 Namespace: "arvados",
579 Subsystem: "dispatchcloud",
580 Name: "instances_disappeared",
581 Help: "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
582 }, []string{"state"})
// Add(0) for every state name, so a child counter is created for each
// label value before any disappearance has been counted.
583 for _, v := range stateString {
584 wp.mDisappearances.WithLabelValues(v).Add(0)
586 reg.MustRegister(wp.mDisappearances)
// runMetrics works with a pool subscription channel ch, which it
// unsubscribes on return.
// NOTE(review): the Subscribe call and the loop body are not visible in
// this listing; presumably it calls updateMetrics on each notification.
589 func (wp *Pool) runMetrics() {
591 defer wp.Unsubscribe(ch)
// updateMetrics recomputes per-category totals (instance count, price,
// VCPUs, memory) from wp.workers and publishes them to the gauge
// vectors, along with the total number of running/starting containers.
598 func (wp *Pool) updateMetrics() {
600 defer wp.mtx.RUnlock()
// Per-category accumulators, keyed by category name.
602 instances := map[string]int64{}
603 price := map[string]float64{}
604 cpu := map[string]int64{}
605 mem := map[string]int64{}
607 for _, wkr := range wp.workers {
// Cases are tried in order, so a worker with running/starting containers
// is classified before hold, booting or unknown are considered.
// NOTE(review): the category assignments are not visible in this
// listing.
610 case len(wkr.running)+len(wkr.starting) > 0:
612 case wkr.idleBehavior == IdleBehaviorHold:
614 case wkr.state == StateBooting:
616 case wkr.state == StateUnknown:
622 price[cat] += wkr.instType.Price
623 cpu[cat] += int64(wkr.instType.VCPUs)
624 mem[cat] += int64(wkr.instType.RAM)
625 running += int64(len(wkr.running) + len(wkr.starting))
// Every category is written on each update, so a category with no
// workers is set to 0 (missing map keys read as zero).
627 for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
628 wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
629 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
630 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
631 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
633 wp.mContainersRunning.Set(float64(running))
// runProbes collects worker IDs on every probeInterval tick and starts
// ProbeAndUpdate for each in its own goroutine. A second ticker, firing
// maxPPS times per second, is used in the loop to limit the probe rate.
636 func (wp *Pool) runProbes() {
637 maxPPS := wp.maxProbesPerSecond
// Fall back to the default probe rate.
// NOTE(review): the guarding condition is not visible in this listing.
639 maxPPS = defaultMaxProbesPerSecond
641 limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
642 defer limitticker.Stop()
644 probeticker := time.NewTicker(wp.probeInterval)
645 defer probeticker.Stop()
// The ID slice is reused across ticks (truncated to length 0 each time).
647 workers := []cloud.InstanceID{}
648 for range probeticker.C {
649 workers = workers[:0]
651 for id, wkr := range wp.workers {
// Workers already shutting down get special handling, as do those for
// which shutdownIfIdle() returns true.
// NOTE(review): the statement in this branch is not visible;
// presumably it skips the worker.
652 if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
655 workers = append(workers, id)
659 for _, id := range workers {
// Look the worker up again by ID: it may have been removed from
// wp.workers since the IDs were collected.
661 wkr, ok := wp.workers[id]
664 // Deleted while we were probing
668 go wkr.ProbeAndUpdate()
672 case <-limitticker.C:
// runSync repeatedly calls getInstancesAndSync, logging a warning on
// failure and re-arming its timer with syncInterval after each attempt.
678 func (wp *Pool) runSync() {
679 // sync once immediately, then wait syncInterval, sync again,
// A 1ns timer fires almost at once, which gives the immediate first
// sync described above.
681 timer := time.NewTimer(1)
685 err := wp.getInstancesAndSync()
687 wp.logger.WithError(err).Warn("sync failed")
689 timer.Reset(wp.syncInterval)
691 wp.logger.Debug("worker.Pool stopped")
697 // Stop synchronizing with the InstanceSet.
698 func (wp *Pool) Stop() {
// Make sure the pool's maps exist before anything else.
// NOTE(review): the stop signal itself is not visible in this listing
// (presumably it involves the wp.stop channel).
699 wp.setupOnce.Do(wp.setup)
703 // Instances returns an InstanceView for each worker in the pool,
704 // summarizing its current state and recent activity.
705 func (wp *Pool) Instances() []InstanceView {
707 wp.setupOnce.Do(wp.setup)
// Copy the relevant fields of every worker into an InstanceView.
709 for _, w := range wp.workers {
710 r = append(r, InstanceView{
711 Instance: w.instance.ID(),
712 Address: w.instance.Address(),
713 Price: w.instType.Price,
714 ArvadosInstanceType: w.instType.Name,
715 ProviderInstanceType: w.instType.ProviderType,
716 LastContainerUUID: w.lastUUID,
718 WorkerState: w.state.String(),
719 IdleBehavior: w.idleBehavior,
// Sort by instance ID so the result order is deterministic (Go map
// iteration order is not).
723 sort.Slice(r, func(i, j int) bool {
724 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
729 // KillInstance destroys a cloud VM instance. It returns an error if
730 // the given instance does not exist.
731 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
732 wkr, ok := wp.workers[id]
734 return errors.New("instance not found")
736 wkr.logger.WithField("Reason", reason).Info("shutting down")
// setup allocates the pool's maps. It is only invoked through
// wp.setupOnce.Do, so the maps are created once, before first use.
741 func (wp *Pool) setup() {
742 wp.creating = map[string]createCall{}
743 wp.exited = map[string]time.Time{}
744 wp.workers = map[cloud.InstanceID]*worker{}
745 wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
// notify signals every subscriber by sending an empty struct on its
// channel, holding a read lock on wp.mtx while iterating.
748 func (wp *Pool) notify() {
750 defer wp.mtx.RUnlock()
751 for _, send := range wp.subscribers {
// The send is one case of a select.
// NOTE(review): the other case is not visible in this listing;
// presumably a default that drops the event when the 1-slot buffer is
// already full, as Subscribe documents.
753 case send <- struct{}{}:
// getInstancesAndSync lists the instances tagged with this pool's
// InstanceSetID and passes them to sync. It returns an error when the
// instance-list throttle reports one.
759 func (wp *Pool) getInstancesAndSync() error {
760 wp.setupOnce.Do(wp.setup)
// Check the list throttle before calling the provider.
761 if err := wp.instanceSet.throttleInstances.Error(); err != nil {
764 wp.logger.Debug("getting instance list")
// Captured before the listing starts: sync leaves alone any worker
// updated after this instant.
765 threshold := time.Now()
766 instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
// Let the throttle inspect err for rate limiting.
768 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
771 wp.sync(threshold, instances)
772 wp.logger.Debug("sync done")
776 // Add/remove/update workers based on instances, which was obtained
777 // from the instanceSet. However, don't clobber any other updates that
778 // already happened after threshold.
779 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
781 defer wp.mtx.Unlock()
782 wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
785 for _, inst := range instances {
// Map the InstanceType tag to a configured instance type. Instances with
// an unrecognized tag are logged and ignored.
786 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
787 it, ok := wp.instanceTypes[itTag]
789 wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
// Add or refresh the worker. An existing worker in StateShutdown that is
// still listed more than timeoutShutdown after wkr.destroyed is logged
// for a shutdown retry.
792 if wkr, isNew := wp.updateWorker(inst, it); isNew {
794 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
795 wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
// Second pass: workers updated after threshold get the special handling
// described in the doc comment above; the rest are logged as
// disappeared, counted, and removed from the pool.
800 for id, wkr := range wp.workers {
801 if wkr.updated.After(threshold) {
804 logger := wp.logger.WithFields(logrus.Fields{
805 "Instance": wkr.instance.ID(),
806 "WorkerState": wkr.state,
808 logger.Info("instance disappeared in cloud")
// The counter is only touched when it is non-nil.
809 if wp.mDisappearances != nil {
810 wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
812 delete(wp.workers, id)
820 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
// waitUntilLoaded releases a read lock on wp.mtx when it returns.
// NOTE(review): the waiting logic is not visible in this listing. Going
// by CountWorkers' doc comment, it presumably blocks until the initial
// instance list has been loaded (wp.loaded) -- confirm.
828 func (wp *Pool) waitUntilLoaded() {
831 defer wp.mtx.RUnlock()
// randomHex returns a random string of exactly n lowercase hexadecimal
// digits (n*4 random bits).
//
// Odd values of n are supported: enough random bytes are read to cover
// n digits and the encoded string is cut to length n. (With the plain
// n/2 buffer size, an odd n silently produced n-1 digits.) A
// non-positive n yields the empty string.
//
// randomHex panics if the random source fails: the function has no
// error return, and continuing with a predictable value would be
// unsafe for its use as an instance secret.
func randomHex(n int) string {
	if n <= 0 {
		return ""
	}
	// Round up so an odd n still gets enough bytes.
	buf := make([]byte, (n+1)/2)
	if _, err := rand.Read(buf); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)[:n]
}