14360: Avoid overreporting instances during Create/List race.
lib/dispatchcloud/worker/pool.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"io"
	"sort"
	"strings"
	"sync"
	"time"

	"git.curoverse.com/arvados.git/lib/cloud"
	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"github.com/Sirupsen/logrus"
	"github.com/prometheus/client_golang/prometheus"
)

const (
	tagKeyInstanceType = "InstanceType"
	tagKeyHold         = "Hold"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             string
	Price                float64
	ArvadosInstanceType  string
	ProviderInstanceType string
	LastContainerUUID    string
	LastBusy             time.Time
	WorkerState          string
}

// An Executor executes shell commands on a remote host.
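//
// Illustrative use (a sketch only; the concrete Executor
// implementation and its newExecutor constructor are supplied by the
// caller of NewPool, not by this package):
//
//	exr := newExecutor(inst)
//	defer exr.Close()
//	exr.SetTarget(inst)
//	stdout, stderr, err := exr.Execute("uname -a", nil)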
type Executor interface {
	// Run cmd on the current target.
	Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	Close()
}

const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = time.Minute * 10
	defaultTimeoutProbe       = time.Minute * 10
	defaultTimeoutShutdown    = time.Second * 10
)

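// duration returns conf as a time.Duration if conf is set (> 0),
// otherwise the given default.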
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
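//
// Illustrative call (a sketch; it assumes the caller has already
// built an instanceSet, an Executor constructor, and a cluster
// config):
//
//	wp := NewPool(logger, prometheus.NewRegistry(), instanceSet, newExecutor, cluster)
//	defer wp.Stop()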
func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:             logger,
		instanceSet:        instanceSet,
		newExecutor:        newExecutor,
		bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
		imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
		instanceTypes:      cluster.InstanceTypes,
		maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
		probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
		syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger             logrus.FieldLogger
	instanceSet        cloud.InstanceSet
	newExecutor        func(cloud.Instance) Executor
	bootProbeCommand   string
	imageID            cloud.ImageID
	instanceTypes      map[string]arvados.InstanceType
	syncInterval       time.Duration
	probeInterval      time.Duration
	maxProbesPerSecond int
	timeoutIdle        time.Duration
	timeoutBooting     time.Duration
	timeoutProbe       time.Duration
	timeoutShutdown    time.Duration

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once

	mInstances         prometheus.Gauge
	mContainersRunning prometheus.Gauge
	mVCPUs             prometheus.Gauge
	mVCPUsInuse        prometheus.Gauge
	mMemory            prometheus.Gauge
	mMemoryInuse       prometheus.Gauge
}

// Subscribe returns a channel that becomes ready whenever a worker's
// state changes.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		// ...try scheduling some work...
//		if done {
//			break
//		}
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.
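//
// For example (illustrative numbers): if two Create calls for type T
// are still in flight, and one Unknown-state worker of type T has
// appeared since the earlier of those calls started, Unallocated
// reports 2 for T -- the unknown worker is assumed to be one of the
// pending creates rather than being counted twice.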
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	for it, times := range wp.creating {
		creating[it] = len(times)
	}
	for _, wkr := range wp.workers {
		if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
func (wp *Pool) Create(it arvados.InstanceType) error {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) {
		return wp.atQuotaErr
	}
	tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
	now := time.Now()
	wp.creating[it] = append(wp.creating[it], now)
	go func() {
		defer wp.notify()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// Remove our timestamp marker from wp.creating
		for i, t := range wp.creating[it] {
			if t == now {
				copy(wp.creating[it][i:], wp.creating[it][i+1:])
				wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
				break
			}
		}
		if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
			wp.atQuotaErr = err
			wp.atQuotaUntil = time.Now().Add(time.Minute)
		}
		if err != nil {
			logger.WithError(err).Error("create failed")
			return
		}
		wp.updateWorker(inst, it, StateBooting)
	}()
	return nil
}

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
}

// Add or update worker attached to the given instance. Use
// initialState if a new worker is created.
//
// The second return value is true if a new worker is created.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		if initialState == StateBooting && wkr.state == StateUnknown {
			wkr.state = StateBooting
		}
		return wkr, false
	}
	if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
		initialState = StateHold
	}
	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst,
	})
	logger.WithField("State", initialState).Info("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:      &wp.mtx,
		wp:       wp,
		logger:   logger,
		executor: wp.newExecutor(inst),
		state:    initialState,
		instance: inst,
		instType: it,
		appeared: now,
		probed:   now,
		busy:     now,
		updated:  now,
		running:  make(map[string]struct{}),
		starting: make(map[string]struct{}),
		probing:  make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}

// Caller must have lock.
func (wp *Pool) notifyExited(uuid string, t time.Time) {
	wp.exited[uuid] = t
}

// Shutdown shuts down an idle or booting worker with the given type,
// or returns false if no such worker is available.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance).Info("shutting down")
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}

// CountWorkers returns the current number of workers in each state.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
func (wp *Pool) KillContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
		return
	}
	for _, wkr := range wp.workers {
		if _, ok := wkr.running[uuid]; ok {
			go wp.kill(wkr, uuid)
			return
		}
	}
	wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
}

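// kill runs "crunch-run --kill 15 <uuid>" on wkr's instance and, if
// that succeeds, removes the container from wkr's running set. It
// acquires wp.mtx itself, so the caller must not hold the lock.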
func (wp *Pool) kill(wkr *worker, uuid string) {
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Instance":      wkr.instance,
	})
	logger.Debug("killing process")
	stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil)
	if err != nil {
		logger.WithFields(logrus.Fields{
			"stderr": string(stderr),
			"stdout": string(stdout),
			"error":  err,
		}).Warn("kill failed")
		return
	}
	logger.Debug("killing process succeeded")
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wkr.running[uuid]; ok {
		delete(wkr.running, uuid)
		if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
			wkr.state = StateIdle
		}
		wkr.updated = time.Now()
		go wp.notify()
	}
}

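// registerMetrics creates the pool's Prometheus gauges and registers
// them with reg (or with a throwaway registry, if reg is nil).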
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
	})
	reg.MustRegister(wp.mInstances)
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)

	wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	})
	reg.MustRegister(wp.mVCPUs)
	wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_inuse",
		Help:      "VCPUs on cloud VMs that are running containers.",
	})
	reg.MustRegister(wp.mVCPUsInuse)
	wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	})
	reg.MustRegister(wp.mMemory)
	wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_inuse",
		Help:      "Memory on cloud VMs that are running containers.",
	})
	reg.MustRegister(wp.mMemoryInuse)
}

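// runMetrics updates the Prometheus gauges each time the pool's state
// changes (as reported via Subscribe).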
func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	for range ch {
		wp.updateMetrics()
	}
}

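// updateMetrics recomputes all gauge values from the current set of
// workers.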
func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	var alloc, cpu, cpuInuse, mem, memInuse int64
	for _, wkr := range wp.workers {
		cpu += int64(wkr.instType.VCPUs)
		mem += int64(wkr.instType.RAM)
		if len(wkr.running)+len(wkr.starting) == 0 {
			continue
		}
		alloc += int64(len(wkr.running) + len(wkr.starting))
		cpuInuse += int64(wkr.instType.VCPUs)
		memInuse += int64(wkr.instType.RAM)
	}
	wp.mInstances.Set(float64(len(wp.workers)))
	wp.mContainersRunning.Set(float64(alloc))
	wp.mVCPUs.Set(float64(cpu))
	wp.mMemory.Set(float64(mem))
	wp.mVCPUsInuse.Set(float64(cpuInuse))
	wp.mMemoryInuse.Set(float64(memInuse))
}

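// runProbes triggers a probe on each eligible worker on every
// probe-interval tick, rate-limited to maxProbesPerSecond overall,
// until the pool is stopped.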
func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing
				// others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

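// runSync reconciles the worker pool with the InstanceSet's instance
// list, once immediately and then every syncInterval, until the pool
// is stopped.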
func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// Stop stops synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.String(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(r[i].Instance, r[j].Instance) < 0
	})
	return r
}

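// setup initializes the pool's maps and stop channel. It runs exactly
// once, via setupOnce.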
func (wp *Pool) setup() {
	wp.creating = map[arvados.InstanceType][]time.Time{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
	// Initialize the stop channel; otherwise Stop would close a
	// nil channel (panic) and the run loops would never stop.
	wp.stop = make(chan bool)
}

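// notify sends a non-blocking wake-up to every subscriber channel.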
func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

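// getInstancesAndSync fetches the current instance list from the
// InstanceSet and passes it to sync, using the time just before the
// request as the "don't clobber newer updates" threshold.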
func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
	if err != nil {
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// sync adds/removes/updates workers based on the instance list
// obtained from the instanceSet, without clobbering any updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance,
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		delete(wp.workers, id)
		go wkr.executor.Close()
		notify = true
	}

	if !wp.loaded {
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}