14325: Propagate API env vars to crunch-run.
[arvados.git] / lib/dispatchcloud/worker/pool.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "io"
        "sort"
        "strings"
        "sync"
        "time"

        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
)

const (
        tagKeyInstanceType = "InstanceType"
        tagKeyHold         = "Hold"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
        Instance             string
        Price                float64
        ArvadosInstanceType  string
        ProviderInstanceType string
        LastContainerUUID    string
        LastBusy             time.Time
        WorkerState          string
}

// An Executor executes shell commands on a remote host.
type Executor interface {
        // Run cmd on the current target.
        Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

        // Use the given target for subsequent operations. The new
        // target is the same host as the previous target, but it
        // might return a different address and verify a different
        // host key.
        //
        // SetTarget is called frequently, and in most cases the new
        // target will behave exactly the same as the old one. An
        // implementation should optimize accordingly.
        //
        // SetTarget must not block on concurrent Execute calls.
        SetTarget(cloud.ExecutorTarget)

        Close()
}

const (
        defaultSyncInterval       = time.Minute
        defaultProbeInterval      = time.Second * 10
        defaultMaxProbesPerSecond = 10
        defaultTimeoutIdle        = time.Minute
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
        defaultTimeoutShutdown    = time.Second * 10
)

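// duration returns conf as a time.Duration if it is positive,
// otherwise def.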
func duration(conf arvados.Duration, def time.Duration) time.Duration {
        if conf > 0 {
                return time.Duration(conf)
        } else {
                return def
        }
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
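//
// Example (sketch only; logger, arvClient, reg, instanceSet,
// newExecutor, and cluster are assumed to be set up by the caller):
//
//      wp := NewPool(logger, arvClient, reg, instanceSet, newExecutor, cluster)
//      defer wp.Stop()
//      ch := wp.Subscribe()
//      defer wp.Unsubscribe(ch)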
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:             logger,
                arvClient:          arvClient,
                instanceSet:        instanceSet,
                newExecutor:        newExecutor,
                bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
                imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
                instanceTypes:      cluster.InstanceTypes,
                maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
                probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
                syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
                timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
                timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
                timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
                stop:               make(chan bool),
        }
        wp.registerMetrics(reg)
        go func() {
                wp.setupOnce.Do(wp.setup)
                go wp.runMetrics()
                go wp.runProbes()
                go wp.runSync()
        }()
        return wp
}

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
        // configuration
        logger             logrus.FieldLogger
        arvClient          *arvados.Client
        instanceSet        cloud.InstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
        imageID            cloud.ImageID
        instanceTypes      map[string]arvados.InstanceType
        syncInterval       time.Duration
        probeInterval      time.Duration
        maxProbesPerSecond int
        timeoutIdle        time.Duration
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
        timeoutShutdown    time.Duration

        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
        creating     map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
        exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
        atQuotaUntil time.Time
        atQuotaErr   cloud.QuotaError
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once

        mInstances         prometheus.Gauge
        mContainersRunning prometheus.Gauge
        mVCPUs             prometheus.Gauge
        mVCPUsInuse        prometheus.Gauge
        mMemory            prometheus.Gauge
        mMemoryInuse       prometheus.Gauge
}

// Subscribe returns a channel that becomes ready whenever a worker's
// state changes.
//
// Example:
//
//      ch := wp.Subscribe()
//      defer wp.Unsubscribe(ch)
//      for range ch {
//              // ...try scheduling some work...
//              if done {
//                      break
//              }
//      }
func (wp *Pool) Subscribe() <-chan struct{} {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        ch := make(chan struct{}, 1)
        wp.subscribers[ch] = ch
        return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        unalloc := map[arvados.InstanceType]int{}
        creating := map[arvados.InstanceType]int{}
        for it, times := range wp.creating {
                creating[it] = len(times)
        }
        for _, wkr := range wp.workers {
                if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) {
                        continue
                }
                it := wkr.instType
                unalloc[it]++
                if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
                        // If up to N new workers appear in
                        // Instances() while we are waiting for N
                        // Create() calls to complete, we assume we're
                        // just seeing a race between Instances() and
                        // Create() responses.
                        //
                        // The other common reason why nodes have
                        // state==Unknown is that they appeared at
                        // startup, before any Create calls. They
                        // don't match the above timing condition, so
                        // we never mistakenly attribute them to
                        // pending Create calls.
                        creating[it]--
                }
        }
        for it, c := range creating {
                unalloc[it] += c
        }
        return unalloc
}

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
func (wp *Pool) Create(it arvados.InstanceType) error {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if time.Now().Before(wp.atQuotaUntil) {
                return wp.atQuotaErr
        }
        tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
        now := time.Now()
        wp.creating[it] = append(wp.creating[it], now)
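        // The timestamp just appended lets Unallocated count this
        // pending instance while the cloud API call below runs in the
        // background.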
        go func() {
                defer wp.notify()
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
                // Remove our timestamp marker from wp.creating
                for i, t := range wp.creating[it] {
                        if t == now {
                                copy(wp.creating[it][i:], wp.creating[it][i+1:])
                                wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
                                break
                        }
                }
                if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
                        wp.atQuotaErr = err
                        wp.atQuotaUntil = time.Now().Add(time.Minute)
                }
                if err != nil {
                        logger.WithError(err).Error("create failed")
                        return
                }
                wp.updateWorker(inst, it, StateBooting)
        }()
        return nil
}

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        return time.Now().Before(wp.atQuotaUntil)
}

// Add or update worker attached to the given instance. Use
// initialState if a new worker is created.
//
// The second return value is true if a new worker is created.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
                wkr.instance = inst
                wkr.updated = time.Now()
                if initialState == StateBooting && wkr.state == StateUnknown {
                        wkr.state = StateBooting
                }
                return wkr, false
        }
        if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
                initialState = StateHold
        }
        logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
                "Instance":     inst,
        })
        logger.WithField("State", initialState).Infof("instance appeared in cloud")
        now := time.Now()
        wkr := &worker{
                mtx:      &wp.mtx,
                wp:       wp,
                logger:   logger,
                executor: wp.newExecutor(inst),
                state:    initialState,
                instance: inst,
                instType: it,
                appeared: now,
                probed:   now,
                busy:     now,
                updated:  now,
                running:  make(map[string]struct{}),
                starting: make(map[string]struct{}),
                probing:  make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
}

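// notifyExited records the time at which a container's crunch-run
// process exited. The entry is reported by Running until
// KillContainer clears it.
//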
// caller must have lock.
func (wp *Pool) notifyExited(uuid string, t time.Time) {
        wp.exited[uuid] = t
}

// Shutdown shuts down a booting or idle worker with the given type,
// or returns false if no such worker is available.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithField("InstanceType", it.Name)
        logger.Info("shutdown requested")
        for _, tryState := range []State{StateBooting, StateIdle} {
                // TODO: shutdown the worker with the longest idle
                // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
                        if wkr.state == tryState && wkr.instType == it {
                                logger.WithField("Instance", wkr.instance).Info("shutting down")
                                wkr.shutdown()
                                return true
                        }
                }
        }
        return false
}

// CountWorkers returns the current number of workers in each state.
func (wp *Pool) CountWorkers() map[State]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[State]int{}
        for _, w := range wp.workers {
                r[w.state]++
        }
        return r
}

// Running returns the container UUIDs being prepared/run on workers,
// plus those whose crunch-run process has exited but has not yet been
// cleaned up by KillContainer. The value is the zero time for
// containers still starting or running, and the exit time otherwise.
func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[string]time.Time{}
        for _, wkr := range wp.workers {
                for uuid := range wkr.running {
                        r[uuid] = time.Time{}
                }
                for uuid := range wkr.starting {
                        r[uuid] = time.Time{}
                }
        }
        for uuid, exited := range wp.exited {
                r[uuid] = exited
        }
        return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        var wkr *worker
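        // Choose the idle worker of the requested type that was busy
        // most recently; longer-idle workers are left alone so they
        // can reach timeoutIdle and shut down.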
        for _, w := range wp.workers {
                if w.instType == it && w.state == StateIdle {
                        if wkr == nil || w.busy.After(wkr.busy) {
                                wkr = w
                        }
                }
        }
        if wkr == nil {
                return false
        }
        wkr.startContainer(ctr)
        return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
func (wp *Pool) KillContainer(uuid string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if _, ok := wp.exited[uuid]; ok {
                wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
                delete(wp.exited, uuid)
                return
        }
        for _, wkr := range wp.workers {
                if _, ok := wkr.running[uuid]; ok {
                        go wp.kill(wkr, uuid)
                        return
                }
        }
        wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
}

func (wp *Pool) kill(wkr *worker, uuid string) {
        logger := wp.logger.WithFields(logrus.Fields{
                "ContainerUUID": uuid,
                "Instance":      wkr.instance,
        })
        logger.Debug("killing process")
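        // "--kill 15" asks the crunch-run process supervising this
        // container to shut down with SIGTERM.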
        stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
        if err != nil {
                logger.WithFields(logrus.Fields{
                        "stderr": string(stderr),
                        "stdout": string(stdout),
                        "error":  err,
                }).Warn("kill failed")
                return
        }
        logger.Debug("killing process succeeded")
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if _, ok := wkr.running[uuid]; ok {
                delete(wkr.running, uuid)
                if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
                        wkr.state = StateIdle
                }
                wkr.updated = time.Now()
                go wp.notify()
        }
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
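        // reg may be nil; fall back to a private registry so the
        // gauges below can still be registered and updated.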
        if reg == nil {
                reg = prometheus.NewRegistry()
        }
        wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_total",
                Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
        })
        reg.MustRegister(wp.mInstances)
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "containers_running",
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)

        wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_total",
                Help:      "Total VCPUs on all cloud VMs.",
        })
        reg.MustRegister(wp.mVCPUs)
        wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_inuse",
                Help:      "VCPUs on cloud VMs that are running containers.",
        })
        reg.MustRegister(wp.mVCPUsInuse)
        wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_total",
                Help:      "Total memory on all cloud VMs.",
        })
        reg.MustRegister(wp.mMemory)
        wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_inuse",
                Help:      "Memory on cloud VMs that are running containers.",
        })
        reg.MustRegister(wp.mMemoryInuse)
}

func (wp *Pool) runMetrics() {
        ch := wp.Subscribe()
        defer wp.Unsubscribe(ch)
        for range ch {
                wp.updateMetrics()
        }
}

func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()

        var alloc, cpu, cpuInuse, mem, memInuse int64
        for _, wkr := range wp.workers {
                cpu += int64(wkr.instType.VCPUs)
                mem += int64(wkr.instType.RAM)
                if len(wkr.running)+len(wkr.starting) == 0 {
                        continue
                }
                alloc += int64(len(wkr.running) + len(wkr.starting))
                cpuInuse += int64(wkr.instType.VCPUs)
                memInuse += int64(wkr.instType.RAM)
        }
        wp.mInstances.Set(float64(len(wp.workers)))
        wp.mContainersRunning.Set(float64(alloc))
        wp.mVCPUs.Set(float64(cpu))
        wp.mMemory.Set(float64(mem))
        wp.mVCPUsInuse.Set(float64(cpuInuse))
        wp.mMemoryInuse.Set(float64(memInuse))
}

func (wp *Pool) runProbes() {
        maxPPS := wp.maxProbesPerSecond
        if maxPPS < 1 {
                maxPPS = defaultMaxProbesPerSecond
        }
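        // probeticker starts a new probe round every probeInterval;
        // limitticker spaces out the individual probes so no more
        // than maxPPS are started per second.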
        limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
        defer limitticker.Stop()

        probeticker := time.NewTicker(wp.probeInterval)
        defer probeticker.Stop()

        workers := []cloud.InstanceID{}
        for range probeticker.C {
                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
                        if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
                                continue
                        }
                        workers = append(workers, id)
                }
                wp.mtx.Unlock()

                for _, id := range workers {
                        wp.mtx.Lock()
                        wkr, ok := wp.workers[id]
                        wp.mtx.Unlock()
                        if !ok {
                                // Deleted while we were probing
                                // others
                                continue
                        }
                        go wkr.ProbeAndUpdate()
                        select {
                        case <-wp.stop:
                                return
                        case <-limitticker.C:
                        }
                }
        }
}

func (wp *Pool) runSync() {
        // sync once immediately, then wait syncInterval, sync again,
        // etc.
        timer := time.NewTimer(1)
        for {
                select {
                case <-timer.C:
                        err := wp.getInstancesAndSync()
                        if err != nil {
                                wp.logger.WithError(err).Warn("sync failed")
                        }
                        timer.Reset(wp.syncInterval)
                case <-wp.stop:
                        wp.logger.Debug("worker.Pool stopped")
                        return
                }
        }
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
        wp.setupOnce.Do(wp.setup)
        close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
        var r []InstanceView
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        for _, w := range wp.workers {
                r = append(r, InstanceView{
                        Instance:             w.instance.String(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
                        LastContainerUUID:    w.lastUUID,
                        LastBusy:             w.busy,
                        WorkerState:          w.state.String(),
                })
        }
        wp.mtx.Unlock()
        sort.Slice(r, func(i, j int) bool {
                return strings.Compare(r[i].Instance, r[j].Instance) < 0
        })
        return r
}

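// setup initializes the pool's internal maps. It runs exactly once,
// via setupOnce, before any of them are used.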
func (wp *Pool) setup() {
        wp.creating = map[arvados.InstanceType][]time.Time{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}

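// notify wakes up all subscribers with a non-blocking send, so a
// slow subscriber cannot stall the pool.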
func (wp *Pool) notify() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        for _, send := range wp.subscribers {
                select {
                case send <- struct{}{}:
                default:
                }
        }
}

func (wp *Pool) getInstancesAndSync() error {
        wp.setupOnce.Do(wp.setup)
        wp.logger.Debug("getting instance list")
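        // Worker records updated after this point are newer than the
        // instance list we are about to fetch; sync uses threshold to
        // avoid deleting them.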
        threshold := time.Now()
        instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
        if err != nil {
                return err
        }
        wp.sync(threshold, instances)
        wp.logger.Debug("sync done")
        return nil
}

// Add/remove/update workers based on instances, which were obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
        notify := false

        for _, inst := range instances {
                itTag := inst.Tags()[tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                        wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
                if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
                        notify = true
                } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
                        wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
                        wkr.shutdown()
                }
        }

        for id, wkr := range wp.workers {
                if wkr.updated.After(threshold) {
                        continue
                }
                logger := wp.logger.WithFields(logrus.Fields{
                        "Instance":    wkr.instance,
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
                delete(wp.workers, id)
                go wkr.executor.Close()
                notify = true
        }

        if !wp.loaded {
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }

        if notify {
                go wp.notify()
        }
}