[arvados.git] lib/dispatchcloud/worker/pool.go @ ff5f762c1d225575f0ad4eeb1b69bb4de463d281
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "io"
        "sort"
        "strings"
        "sync"
        "time"

        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/Sirupsen/logrus"
        "github.com/prometheus/client_golang/prometheus"
)

const (
        tagKeyInstanceType = "InstanceType"
        tagKeyHold         = "Hold"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
        Instance             string
        Price                float64
        ArvadosInstanceType  string
        ProviderInstanceType string
        LastContainerUUID    string
        LastBusy             time.Time
        WorkerState          string
}

// An Executor executes shell commands on a remote host.
type Executor interface {
        // Run cmd on the current target.
        Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

        // Use the given target for subsequent operations. The new
        // target is the same host as the previous target, but it
        // might return a different address and verify a different
        // host key.
        //
        // SetTarget is called frequently, and in most cases the new
        // target will behave exactly the same as the old one. An
        // implementation should optimize accordingly.
        //
        // SetTarget must not block on concurrent Execute calls.
        SetTarget(cloud.ExecutorTarget)

        Close()
}
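
// A minimal usage sketch for the Executor interface above, from the pool's
// point of view. The names below (inst, logger, newSSHExecutor) are
// placeholders; the concrete implementation is whatever the dispatcher wires
// in via NewPool's newExecutor argument, not something defined in this file.
//
//      exec := newSSHExecutor(inst)     // hypothetical Executor implementation
//      defer exec.Close()
//      exec.SetTarget(inst)             // cheap; may be called on every sync
//      stdout, stderr, err := exec.Execute("crunch-run --list", nil)
//      if err != nil {
//              logger.WithError(err).Warnf("probe failed: %q", stderr)
//      }
//      _ = stdout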

const (
        defaultSyncInterval       = time.Minute
        defaultProbeInterval      = time.Second * 10
        defaultMaxProbesPerSecond = 10
        defaultTimeoutIdle        = time.Minute
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
        defaultTimeoutShutdown    = time.Second * 10
)

func duration(conf arvados.Duration, def time.Duration) time.Duration {
        if conf > 0 {
                return time.Duration(conf)
        } else {
                return def
        }
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:             logger,
                instanceSet:        instanceSet,
                newExecutor:        newExecutor,
                bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
                imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
                instanceTypes:      cluster.InstanceTypes,
                maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
                probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
                syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
                timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
                timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
                timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
                stop:               make(chan bool),
        }
        wp.registerMetrics(reg)
        go func() {
                wp.setupOnce.Do(wp.setup)
                go wp.runMetrics()
                go wp.runProbes()
                go wp.runSync()
        }()
        return wp
}
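
// Hypothetical wiring example: a dispatcher might construct a Pool from its
// cluster config, a cloud driver's InstanceSet, and an executor factory. The
// names logger, cluster, instSet and newSSHExecutor are placeholders, not
// part of this package.
//
//      wp := NewPool(logger, prometheus.NewRegistry(), instSet, newSSHExecutor, cluster)
//      ch := wp.Subscribe()
//      defer wp.Unsubscribe(ch)
//      for range ch {
//              // react to worker state changes, e.g. run a scheduling pass
//      }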

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
        // configuration
        logger             logrus.FieldLogger
        instanceSet        cloud.InstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
        imageID            cloud.ImageID
        instanceTypes      map[string]arvados.InstanceType
        syncInterval       time.Duration
        probeInterval      time.Duration
        maxProbesPerSecond int
        timeoutIdle        time.Duration
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
        timeoutShutdown    time.Duration

        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
        creating     map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
        exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
        atQuotaUntil time.Time
        atQuotaErr   cloud.QuotaError
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once

        mInstances         prometheus.Gauge
        mContainersRunning prometheus.Gauge
        mVCPUs             prometheus.Gauge
        mVCPUsInuse        prometheus.Gauge
        mMemory            prometheus.Gauge
        mMemoryInuse       prometheus.Gauge
}

// Subscribe returns a channel that becomes ready whenever a worker's
// state changes.
//
// Example:
//
//      ch := wp.Subscribe()
//      defer wp.Unsubscribe(ch)
//      for range ch {
//              // ...try scheduling some work...
//              if done {
//                      break
//              }
//      }
func (wp *Pool) Subscribe() <-chan struct{} {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        ch := make(chan struct{}, 1)
        wp.subscribers[ch] = ch
        return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        unalloc := map[arvados.InstanceType]int{}
        creating := map[arvados.InstanceType]int{}
        for it, times := range wp.creating {
                creating[it] = len(times)
        }
        for _, wkr := range wp.workers {
                if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) {
                        continue
                }
                it := wkr.instType
                unalloc[it]++
                if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
                        // If up to N new workers appear in
                        // Instances() while we are waiting for N
                        // Create() calls to complete, we assume we're
                        // just seeing a race between Instances() and
                        // Create() responses.
                        //
                        // The other common reason why nodes have
                        // state==Unknown is that they appeared at
                        // startup, before any Create calls. They
                        // don't match the above timing condition, so
                        // we never mistakenly attribute them to
                        // pending Create calls.
                        creating[it]--
                }
        }
        for it, c := range creating {
                unalloc[it] += c
        }
        return unalloc
}
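
// Worked example of the adjustment above: suppose two Create() calls for
// type "t" are still pending (len(wp.creating[t]) == 2), and sync has
// already listed two new instances of that type in StateUnknown that
// appeared after the first pending Create() started. Each of those workers
// is counted in unalloc[t], and creating[t] is decremented once for each,
// so the result is unalloc[t] == 2 rather than 4.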

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
func (wp *Pool) Create(it arvados.InstanceType) error {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if time.Now().Before(wp.atQuotaUntil) {
                return wp.atQuotaErr
        }
        tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
        now := time.Now()
        wp.creating[it] = append(wp.creating[it], now)
        go func() {
                defer wp.notify()
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
                // Remove our timestamp marker from wp.creating
                for i, t := range wp.creating[it] {
                        if t == now {
                                copy(wp.creating[it][i:], wp.creating[it][i+1:])
                                wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
                                break
                        }
                }
                if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
                        wp.atQuotaErr = err
                        wp.atQuotaUntil = time.Now().Add(time.Minute)
                }
                if err != nil {
                        logger.WithError(err).Error("create failed")
                        return
                }
                wp.updateWorker(inst, it, StateBooting)
        }()
        return nil
}
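
// Sketch of the caller-side pattern for Create (assumed dispatcher logic,
// not part of this package; "need" is a placeholder count): check AtQuota
// before asking for more instances, and treat a non-nil error as "try again
// later".
//
//      if need > wp.Unallocated()[it] && !wp.AtQuota() {
//              if err := wp.Create(it); err != nil {
//                      logger.WithError(err).Warn("cannot create instance yet")
//              }
//      }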

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        return time.Now().Before(wp.atQuotaUntil)
}

// Add or update worker attached to the given instance. Use
// initialState if a new worker is created.
//
// The second return value is true if a new worker is created.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
                wkr.instance = inst
                wkr.updated = time.Now()
                if initialState == StateBooting && wkr.state == StateUnknown {
                        wkr.state = StateBooting
                }
                return wkr, false
        }
        if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
                initialState = StateHold
        }
        logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
                "Instance":     inst,
        })
        logger.WithField("State", initialState).Info("instance appeared in cloud")
        now := time.Now()
        wkr := &worker{
                mtx:      &wp.mtx,
                wp:       wp,
                logger:   logger,
                executor: wp.newExecutor(inst),
                state:    initialState,
                instance: inst,
                instType: it,
                appeared: now,
                probed:   now,
                busy:     now,
                updated:  now,
                running:  make(map[string]struct{}),
                starting: make(map[string]struct{}),
                probing:  make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
}

// Caller must have lock.
func (wp *Pool) notifyExited(uuid string, t time.Time) {
        wp.exited[uuid] = t
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithField("InstanceType", it.Name)
        logger.Info("shutdown requested")
        for _, tryState := range []State{StateBooting, StateIdle} {
                // TODO: shut down the worker with the longest idle
                // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
                        if wkr.state == tryState && wkr.instType == it {
                                logger.WithField("Instance", wkr.instance).Info("shutting down")
                                wkr.shutdown()
                                return true
                        }
                }
        }
        return false
}

// CountWorkers returns the current number of workers in each state.
func (wp *Pool) CountWorkers() map[State]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[State]int{}
        for _, w := range wp.workers {
                r[w.state]++
        }
        return r
}

// Running returns the container UUIDs being prepared/run on workers.
func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[string]time.Time{}
        for _, wkr := range wp.workers {
                for uuid := range wkr.running {
                        r[uuid] = time.Time{}
                }
                for uuid := range wkr.starting {
                        r[uuid] = time.Time{}
                }
        }
        for uuid, exited := range wp.exited {
                r[uuid] = exited
        }
        return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        var wkr *worker
        for _, w := range wp.workers {
                if w.instType == it && w.state == StateIdle {
                        if wkr == nil || w.busy.After(wkr.busy) {
                                wkr = w
                        }
                }
        }
        if wkr == nil {
                return false
        }
        wkr.startContainer(ctr)
        return true
}
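
// Sketch of how a scheduler might combine StartContainer with Create
// (assumed caller-side logic, not part of this package): try to start the
// container on an idle worker, and order a new instance of the required
// type when none is available.
//
//      if !wp.StartContainer(it, ctr) {
//              wp.Create(it) // retry the container on a later Subscribe notification
//      }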

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
func (wp *Pool) KillContainer(uuid string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if _, ok := wp.exited[uuid]; ok {
                wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
                delete(wp.exited, uuid)
                return
        }
        for _, wkr := range wp.workers {
                if _, ok := wkr.running[uuid]; ok {
                        go wp.kill(wkr, uuid)
                        return
                }
        }
        wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
}

func (wp *Pool) kill(wkr *worker, uuid string) {
        logger := wp.logger.WithFields(logrus.Fields{
                "ContainerUUID": uuid,
                "Instance":      wkr.instance,
        })
        logger.Debug("killing process")
        stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil)
        if err != nil {
                logger.WithFields(logrus.Fields{
                        "stderr": string(stderr),
                        "stdout": string(stdout),
                        "error":  err,
                }).Warn("kill failed")
                return
        }
        logger.Debug("killing process succeeded")
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if _, ok := wkr.running[uuid]; ok {
                delete(wkr.running, uuid)
                if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
                        wkr.state = StateIdle
                }
                wkr.updated = time.Now()
                go wp.notify()
        }
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
        if reg == nil {
                reg = prometheus.NewRegistry()
        }
        wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_total",
                Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
        })
        reg.MustRegister(wp.mInstances)
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "containers_running",
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)

        wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_total",
                Help:      "Total VCPUs on all cloud VMs.",
        })
        reg.MustRegister(wp.mVCPUs)
        wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_inuse",
                Help:      "VCPUs on cloud VMs that are running containers.",
        })
        reg.MustRegister(wp.mVCPUsInuse)
        wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_total",
                Help:      "Total memory on all cloud VMs.",
        })
        reg.MustRegister(wp.mMemory)
        wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_inuse",
                Help:      "Memory on cloud VMs that are running containers.",
        })
        reg.MustRegister(wp.mMemoryInuse)
}
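
// The registry passed to registerMetrics is normally provided by the
// dispatcher, which is also responsible for serving it. A typical way to
// expose it over HTTP, shown here only as a sketch ("mux" is a placeholder
// http.ServeMux), uses promhttp from prometheus/client_golang:
//
//      mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))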

func (wp *Pool) runMetrics() {
        ch := wp.Subscribe()
        defer wp.Unsubscribe(ch)
        for range ch {
                wp.updateMetrics()
        }
}

func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()

        var alloc, cpu, cpuInuse, mem, memInuse int64
        for _, wkr := range wp.workers {
                cpu += int64(wkr.instType.VCPUs)
                mem += int64(wkr.instType.RAM)
                if len(wkr.running)+len(wkr.starting) == 0 {
                        continue
                }
                alloc += int64(len(wkr.running) + len(wkr.starting))
                cpuInuse += int64(wkr.instType.VCPUs)
                memInuse += int64(wkr.instType.RAM)
        }
        wp.mInstances.Set(float64(len(wp.workers)))
        wp.mContainersRunning.Set(float64(alloc))
        wp.mVCPUs.Set(float64(cpu))
        wp.mMemory.Set(float64(mem))
        wp.mVCPUsInuse.Set(float64(cpuInuse))
        wp.mMemoryInuse.Set(float64(memInuse))
}

func (wp *Pool) runProbes() {
        maxPPS := wp.maxProbesPerSecond
        if maxPPS < 1 {
                maxPPS = defaultMaxProbesPerSecond
        }
        limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
        defer limitticker.Stop()

        probeticker := time.NewTicker(wp.probeInterval)
        defer probeticker.Stop()

        workers := []cloud.InstanceID{}
        for range probeticker.C {
                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
                        if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
                                continue
                        }
                        workers = append(workers, id)
                }
                wp.mtx.Unlock()

                for _, id := range workers {
                        wp.mtx.Lock()
                        wkr, ok := wp.workers[id]
                        wp.mtx.Unlock()
                        if !ok {
                                // Deleted while we were probing
                                // others
                                continue
                        }
                        go wkr.ProbeAndUpdate()
                        select {
                        case <-wp.stop:
                                return
                        case <-limitticker.C:
                        }
                }
        }
}

func (wp *Pool) runSync() {
        // sync once immediately, then wait syncInterval, sync again,
        // etc.
        timer := time.NewTimer(1)
        for {
                select {
                case <-timer.C:
                        err := wp.getInstancesAndSync()
                        if err != nil {
                                wp.logger.WithError(err).Warn("sync failed")
                        }
                        timer.Reset(wp.syncInterval)
                case <-wp.stop:
                        wp.logger.Debug("worker.Pool stopped")
                        return
                }
        }
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
        wp.setupOnce.Do(wp.setup)
        close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
        var r []InstanceView
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        for _, w := range wp.workers {
                r = append(r, InstanceView{
                        Instance:             w.instance.String(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
                        LastContainerUUID:    w.lastUUID,
                        LastBusy:             w.busy,
                        WorkerState:          w.state.String(),
                })
        }
        wp.mtx.Unlock()
        sort.Slice(r, func(i, j int) bool {
                return strings.Compare(r[i].Instance, r[j].Instance) < 0
        })
        return r
}

func (wp *Pool) setup() {
        wp.creating = map[arvados.InstanceType][]time.Time{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}

func (wp *Pool) notify() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        for _, send := range wp.subscribers {
                select {
                case send <- struct{}{}:
                default:
                }
        }
}

func (wp *Pool) getInstancesAndSync() error {
        wp.setupOnce.Do(wp.setup)
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
        instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
        if err != nil {
                return err
        }
        wp.sync(threshold, instances)
        wp.logger.Debug("sync done")
        return nil
}

// Add/remove/update workers based on the given list of instances, which
// was obtained from the instanceSet. However, don't clobber any other
// updates that already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
        notify := false

        for _, inst := range instances {
                itTag := inst.Tags()[tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                        wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
                if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
                        notify = true
                } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
                        wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
                        wkr.shutdown()
                }
        }

        for id, wkr := range wp.workers {
                if wkr.updated.After(threshold) {
                        continue
                }
                logger := wp.logger.WithFields(logrus.Fields{
                        "Instance":    wkr.instance,
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
                delete(wp.workers, id)
                go wkr.executor.Close()
                notify = true
        }

        if !wp.loaded {
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }

        if notify {
                go wp.notify()
        }
}