14360: Improve comments.
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "io"
10         "sort"
11         "strings"
12         "sync"
13         "time"
14
15         "git.curoverse.com/arvados.git/lib/cloud"
16         "git.curoverse.com/arvados.git/sdk/go/arvados"
17         "github.com/Sirupsen/logrus"
18         "github.com/prometheus/client_golang/prometheus"
19 )
20
// tagKeyInstanceType is the cloud instance tag holding the Arvados
// instance type name. Create sets it on every new instance, and sync
// uses it to map listed instances back to configured instance types.
const tagKeyInstanceType = "InstanceType"

// tagKeyHold is the cloud instance tag that, when non-empty on an
// instance first discovered by sync, makes the new worker start in
// StateHold instead of StateUnknown.
const tagKeyHold = "Hold"
25
// An InstanceView shows a worker's current state and recent activity.
// Pool.Instances returns one InstanceView per worker, sorted by the
// Instance field.
type InstanceView struct {
	// String representation of the cloud instance.
	Instance string
	// Price of the worker's Arvados instance type.
	Price float64
	// Name of the worker's Arvados instance type.
	ArvadosInstanceType string
	// Cloud provider's instance type (InstanceType.ProviderType).
	ProviderInstanceType string
	// UUID of the container most recently started on, or seen
	// running on, this worker.
	LastContainerUUID string
	// When the worker was added to the pool, or, later, when a probe
	// found that all of its previously running containers had ended.
	Unallocated time.Time
	// Worker state name (State.String()).
	WorkerState string
}
36
// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target, optionally feeding it stdin
	// (every caller in this file passes nil), and return the
	// command's stdout and stderr. The pool treats a non-nil err as
	// a failed command or probe.
	Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	// Close is called, in its own goroutine, when the worker's
	// instance no longer appears in the cloud instance list.
	// Presumably it releases connections or other resources held by
	// the Executor; confirm against implementations.
	Close()
}
56
// Fallback settings used when the corresponding cluster configuration
// value is not positive (durations go through duration()), or, for
// the probe rate, less than one.
const (
	defaultSyncInterval       = 1 * time.Minute  // how often to re-list cloud instances
	defaultProbeInterval      = 10 * time.Second // how often to start a round of probes
	defaultMaxProbesPerSecond = 10               // probe start rate limit
	defaultTimeoutIdle        = 1 * time.Minute  // idle worker shutdown delay
	defaultTimeoutBooting     = 10 * time.Minute // unresponsive booting worker shutdown delay
	defaultTimeoutProbe       = 10 * time.Minute // unresponsive booted worker shutdown delay
)
65
66 func duration(conf arvados.Duration, def time.Duration) time.Duration {
67         if conf > 0 {
68                 return time.Duration(conf)
69         } else {
70                 return def
71         }
72 }
73
74 // NewPool creates a Pool of workers backed by instanceSet.
75 //
76 // New instances are configured and set up according to the given
77 // cluster configuration.
78 func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
79         wp := &Pool{
80                 logger:             logger,
81                 instanceSet:        instanceSet,
82                 newExecutor:        newExecutor,
83                 bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
84                 imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
85                 instanceTypes:      cluster.InstanceTypes,
86                 maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
87                 probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
88                 syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
89                 timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
90                 timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
91                 timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
92         }
93         wp.registerMetrics(reg)
94         go func() {
95                 wp.setupOnce.Do(wp.setup)
96                 go wp.runMetrics()
97                 go wp.runProbes()
98                 go wp.runSync()
99         }()
100         return wp
101 }
102
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger      logrus.FieldLogger
	instanceSet cloud.InstanceSet
	// Called by updateWorker to make an Executor for each new worker.
	newExecutor func(cloud.Instance) Executor
	// Command run by probeBooted; "true" is used if empty.
	bootProbeCommand string
	// Image passed to InstanceSet.Create for new instances.
	imageID cloud.ImageID
	// Configured instance types, keyed by the name stored in each
	// instance's tagKeyInstanceType tag.
	instanceTypes map[string]arvados.InstanceType
	// Interval between instance list refreshes (runSync).
	syncInterval time.Duration
	// Interval between rounds of probes (runProbes).
	probeInterval time.Duration
	// Probe start rate limit; runProbes uses defaultMaxProbesPerSecond
	// if this is less than 1.
	maxProbesPerSecond int
	// How long a running worker may stay unallocated before
	// shutdownIfIdle shuts it down.
	timeoutIdle time.Duration
	// How long a booting (timeoutBooting) or other (timeoutProbe)
	// worker may go without a successful probe before
	// shutdownIfBroken shuts it down.
	timeoutBooting time.Duration
	timeoutProbe   time.Duration

	// private state (guarded by mtx unless noted)
	subscribers  map[<-chan struct{}]chan<- struct{} // key and value are the same channel (see Subscribe)
	creating     map[arvados.InstanceType]int        // goroutines waiting for (InstanceSet)Create to return
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called; value is when the exit was noticed
	atQuotaUntil time.Time            // AtQuota is true until this time; set by Create on quota errors, reset by shutdown
	stop         chan bool            // closed by Stop; background loops return when it is closed
	mtx          sync.RWMutex
	setupOnce    sync.Once // ensures setup runs at most once

	// Prometheus gauges, refreshed by updateMetrics.
	mInstances         prometheus.Gauge
	mContainersRunning prometheus.Gauge
	mVCPUs             prometheus.Gauge
	mVCPUsInuse        prometheus.Gauge
	mMemory            prometheus.Gauge
	mMemoryInuse       prometheus.Gauge
}
138
// worker is the pool's record of one cloud instance. All fields are
// guarded by Pool.mtx.
type worker struct {
	state State
	// Latest cloud.Instance for this ID; updateWorker replaces it on
	// every sync.
	instance cloud.Instance
	// Runs commands (probes, crunch-run) on the instance.
	executor Executor
	instType arvados.InstanceType
	// NOTE(review): vcpus and memory are not referenced in this file.
	vcpus  int64
	memory int64
	// Set once the boot probe has succeeded.
	booted bool
	// Creation time or time of the last successful probe;
	// shutdownIfBroken measures unresponsiveness from here.
	probed time.Time
	// Last time the pool changed this record. sync drops workers not
	// updated since a listing began; probeAndUpdate uses it to detect
	// concurrent changes.
	updated time.Time
	// Last time a container was started or seen running here;
	// StartContainer prefers the most recently busy idle worker.
	busy time.Time
	// Creation time, or the time a probe found all previously running
	// containers gone; shutdownIfIdle measures idleness from here.
	unallocated time.Time
	lastUUID    string
	// Containers believed to be running (per "crunch-run --list").
	running map[string]struct{}
	// Containers whose "crunch-run --detach" call is in progress.
	starting map[string]struct{}
	// One-slot channel used by runProbes as a try-lock so that at
	// most one probe per worker is in flight.
	probing chan struct{}
}
156
157 // Subscribe returns a channel that becomes ready whenever a worker's
158 // state changes.
159 //
160 // Example:
161 //
162 //      ch := wp.Subscribe()
163 //      defer wp.Unsubscribe(ch)
164 //      for range ch {
165 //              // ...try scheduling some work...
166 //              if done {
167 //                      break
168 //              }
169 //      }
170 func (wp *Pool) Subscribe() <-chan struct{} {
171         wp.setupOnce.Do(wp.setup)
172         wp.mtx.Lock()
173         defer wp.mtx.Unlock()
174         ch := make(chan struct{}, 1)
175         wp.subscribers[ch] = ch
176         return ch
177 }
178
179 // Unsubscribe stops sending updates to the given channel.
180 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
181         wp.setupOnce.Do(wp.setup)
182         wp.mtx.Lock()
183         defer wp.mtx.Unlock()
184         delete(wp.subscribers, ch)
185 }
186
187 // Unallocated returns the number of unallocated (creating + booting +
188 // idle + unknown) workers for each instance type.
189 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
190         wp.setupOnce.Do(wp.setup)
191         wp.mtx.RLock()
192         defer wp.mtx.RUnlock()
193         u := map[arvados.InstanceType]int{}
194         for it, c := range wp.creating {
195                 u[it] = c
196         }
197         for _, wkr := range wp.workers {
198                 if len(wkr.running)+len(wkr.starting) == 0 && (wkr.state == StateRunning || wkr.state == StateBooting || wkr.state == StateUnknown) {
199                         u[wkr.instType]++
200                 }
201         }
202         return u
203 }
204
205 // Create a new instance with the given type, and add it to the worker
206 // pool. The worker is added immediately; instance creation runs in
207 // the background.
208 func (wp *Pool) Create(it arvados.InstanceType) error {
209         logger := wp.logger.WithField("InstanceType", it.Name)
210         wp.setupOnce.Do(wp.setup)
211         wp.mtx.Lock()
212         defer wp.mtx.Unlock()
213         tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
214         wp.creating[it]++
215         go func() {
216                 defer wp.notify()
217                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
218                 wp.mtx.Lock()
219                 defer wp.mtx.Unlock()
220                 wp.creating[it]--
221                 if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
222                         wp.atQuotaUntil = time.Now().Add(time.Minute)
223                 }
224                 if err != nil {
225                         logger.WithError(err).Error("create failed")
226                         return
227                 }
228                 wp.updateWorker(inst, it, StateBooting)
229         }()
230         return nil
231 }
232
233 // AtQuota returns true if Create is not expected to work at the
234 // moment.
235 func (wp *Pool) AtQuota() bool {
236         wp.mtx.Lock()
237         defer wp.mtx.Unlock()
238         return time.Now().Before(wp.atQuotaUntil)
239 }
240
241 // Add or update worker attached to the given instance. Use
242 // initialState if a new worker is created. Caller must have lock.
243 //
244 // Returns true when a new worker is created.
245 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) bool {
246         id := inst.ID()
247         if wp.workers[id] != nil {
248                 wp.workers[id].executor.SetTarget(inst)
249                 wp.workers[id].instance = inst
250                 wp.workers[id].updated = time.Now()
251                 if initialState == StateBooting && wp.workers[id].state == StateUnknown {
252                         wp.workers[id].state = StateBooting
253                 }
254                 return false
255         }
256         if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
257                 initialState = StateHold
258         }
259         wp.logger.WithFields(logrus.Fields{
260                 "InstanceType": it.Name,
261                 "Instance":     inst,
262                 "State":        initialState,
263         }).Infof("instance appeared in cloud")
264         now := time.Now()
265         wp.workers[id] = &worker{
266                 executor:    wp.newExecutor(inst),
267                 state:       initialState,
268                 instance:    inst,
269                 instType:    it,
270                 probed:      now,
271                 busy:        now,
272                 updated:     now,
273                 unallocated: now,
274                 running:     make(map[string]struct{}),
275                 starting:    make(map[string]struct{}),
276                 probing:     make(chan struct{}, 1),
277         }
278         return true
279 }
280
281 // Shutdown shuts down a worker with the given type, or returns false
282 // if all workers with the given type are busy.
283 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
284         wp.setupOnce.Do(wp.setup)
285         wp.mtx.Lock()
286         defer wp.mtx.Unlock()
287         logger := wp.logger.WithField("InstanceType", it.Name)
288         logger.Info("shutdown requested")
289         for _, tryState := range []State{StateBooting, StateRunning} {
290                 // TODO: shutdown the worker with the longest idle
291                 // time (Running) or the earliest create time
292                 // (Booting)
293                 for _, wkr := range wp.workers {
294                         if wkr.state != tryState || len(wkr.running)+len(wkr.starting) > 0 {
295                                 continue
296                         }
297                         if wkr.instType != it {
298                                 continue
299                         }
300                         logger = logger.WithField("Instance", wkr.instance)
301                         logger.Info("shutting down")
302                         wp.shutdown(wkr, logger)
303                         return true
304                 }
305         }
306         return false
307 }
308
309 // caller must have lock
310 func (wp *Pool) shutdown(wkr *worker, logger logrus.FieldLogger) {
311         wkr.updated = time.Now()
312         wkr.state = StateShutdown
313         go func() {
314                 err := wkr.instance.Destroy()
315                 if err != nil {
316                         logger.WithError(err).WithField("Instance", wkr.instance).Warn("shutdown failed")
317                         return
318                 }
319                 wp.mtx.Lock()
320                 wp.atQuotaUntil = time.Now()
321                 wp.mtx.Unlock()
322                 wp.notify()
323         }()
324 }
325
326 // Workers returns the current number of workers in each state.
327 func (wp *Pool) Workers() map[State]int {
328         wp.setupOnce.Do(wp.setup)
329         wp.mtx.Lock()
330         defer wp.mtx.Unlock()
331         r := map[State]int{}
332         for _, w := range wp.workers {
333                 r[w.state]++
334         }
335         return r
336 }
337
338 // Running returns the container UUIDs being prepared/run on workers.
339 func (wp *Pool) Running() map[string]time.Time {
340         wp.setupOnce.Do(wp.setup)
341         wp.mtx.Lock()
342         defer wp.mtx.Unlock()
343         r := map[string]time.Time{}
344         for _, wkr := range wp.workers {
345                 for uuid := range wkr.running {
346                         r[uuid] = time.Time{}
347                 }
348                 for uuid := range wkr.starting {
349                         r[uuid] = time.Time{}
350                 }
351         }
352         for uuid, exited := range wp.exited {
353                 r[uuid] = exited
354         }
355         return r
356 }
357
358 // StartContainer starts a container on an idle worker immediately if
359 // possible, otherwise returns false.
360 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
361         logger := wp.logger.WithFields(logrus.Fields{
362                 "InstanceType":  it.Name,
363                 "ContainerUUID": ctr.UUID,
364                 "Priority":      ctr.Priority,
365         })
366         wp.setupOnce.Do(wp.setup)
367         wp.mtx.Lock()
368         defer wp.mtx.Unlock()
369         var wkr *worker
370         for _, w := range wp.workers {
371                 if w.instType == it && w.state == StateRunning && len(w.running)+len(w.starting) == 0 {
372                         if wkr == nil || w.busy.After(wkr.busy) {
373                                 wkr = w
374                         }
375                 }
376         }
377         if wkr == nil {
378                 return false
379         }
380         logger = logger.WithField("Instance", wkr.instance)
381         logger.Debug("starting container")
382         wkr.starting[ctr.UUID] = struct{}{}
383         go func() {
384                 stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
385                 wp.mtx.Lock()
386                 defer wp.mtx.Unlock()
387                 now := time.Now()
388                 wkr.updated = now
389                 wkr.busy = now
390                 delete(wkr.starting, ctr.UUID)
391                 wkr.running[ctr.UUID] = struct{}{}
392                 wkr.lastUUID = ctr.UUID
393                 if err != nil {
394                         logger.WithField("stdout", string(stdout)).
395                                 WithField("stderr", string(stderr)).
396                                 WithError(err).
397                                 Error("error starting crunch-run process")
398                         // Leave uuid in wkr.running, though: it's
399                         // possible the error was just a communication
400                         // failure and the process was in fact
401                         // started.  Wait for next probe to find out.
402                         return
403                 }
404                 logger.Info("crunch-run process started")
405                 wkr.lastUUID = ctr.UUID
406         }()
407         return true
408 }
409
410 // KillContainer kills the crunch-run process for the given container
411 // UUID, if it's running on any worker.
412 //
413 // KillContainer returns immediately; the act of killing the container
414 // takes some time, and runs in the background.
415 func (wp *Pool) KillContainer(uuid string) {
416         wp.mtx.Lock()
417         defer wp.mtx.Unlock()
418         if _, ok := wp.exited[uuid]; ok {
419                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
420                 delete(wp.exited, uuid)
421                 return
422         }
423         for _, wkr := range wp.workers {
424                 if _, ok := wkr.running[uuid]; ok {
425                         go wp.kill(wkr, uuid)
426                         return
427                 }
428         }
429         wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
430 }
431
432 func (wp *Pool) kill(wkr *worker, uuid string) {
433         logger := wp.logger.WithFields(logrus.Fields{
434                 "ContainerUUID": uuid,
435                 "Instance":      wkr.instance,
436         })
437         logger.Debug("killing process")
438         stdout, stderr, err := wkr.executor.Execute("crunch-run --kill "+uuid, nil)
439         if err != nil {
440                 logger.WithFields(logrus.Fields{
441                         "stderr": string(stderr),
442                         "stdout": string(stdout),
443                         "error":  err,
444                 }).Warn("kill failed")
445                 return
446         }
447         logger.Debug("killing process succeeded")
448         wp.mtx.Lock()
449         defer wp.mtx.Unlock()
450         if _, ok := wkr.running[uuid]; ok {
451                 delete(wkr.running, uuid)
452                 wkr.updated = time.Now()
453                 go wp.notify()
454         }
455 }
456
457 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
458         if reg == nil {
459                 reg = prometheus.NewRegistry()
460         }
461         wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
462                 Namespace: "arvados",
463                 Subsystem: "dispatchcloud",
464                 Name:      "instances_total",
465                 Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
466         })
467         reg.MustRegister(wp.mInstances)
468         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
469                 Namespace: "arvados",
470                 Subsystem: "dispatchcloud",
471                 Name:      "containers_running",
472                 Help:      "Number of containers reported running by cloud VMs.",
473         })
474         reg.MustRegister(wp.mContainersRunning)
475
476         wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
477                 Namespace: "arvados",
478                 Subsystem: "dispatchcloud",
479                 Name:      "vcpus_total",
480                 Help:      "Total VCPUs on all cloud VMs.",
481         })
482         reg.MustRegister(wp.mVCPUs)
483         wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
484                 Namespace: "arvados",
485                 Subsystem: "dispatchcloud",
486                 Name:      "vcpus_inuse",
487                 Help:      "VCPUs on cloud VMs that are running containers.",
488         })
489         reg.MustRegister(wp.mVCPUsInuse)
490         wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
491                 Namespace: "arvados",
492                 Subsystem: "dispatchcloud",
493                 Name:      "memory_bytes_total",
494                 Help:      "Total memory on all cloud VMs.",
495         })
496         reg.MustRegister(wp.mMemory)
497         wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
498                 Namespace: "arvados",
499                 Subsystem: "dispatchcloud",
500                 Name:      "memory_bytes_inuse",
501                 Help:      "Memory on cloud VMs that are running containers.",
502         })
503         reg.MustRegister(wp.mMemoryInuse)
504 }
505
506 func (wp *Pool) runMetrics() {
507         ch := wp.Subscribe()
508         defer wp.Unsubscribe(ch)
509         for range ch {
510                 wp.updateMetrics()
511         }
512 }
513
514 func (wp *Pool) updateMetrics() {
515         wp.mtx.RLock()
516         defer wp.mtx.RUnlock()
517
518         var alloc, cpu, cpuInuse, mem, memInuse int64
519         for _, wkr := range wp.workers {
520                 cpu += int64(wkr.instType.VCPUs)
521                 mem += int64(wkr.instType.RAM)
522                 if len(wkr.running)+len(wkr.starting) == 0 {
523                         continue
524                 }
525                 alloc += int64(len(wkr.running) + len(wkr.starting))
526                 cpuInuse += int64(wkr.instType.VCPUs)
527                 memInuse += int64(wkr.instType.RAM)
528         }
529         wp.mInstances.Set(float64(len(wp.workers)))
530         wp.mContainersRunning.Set(float64(alloc))
531         wp.mVCPUs.Set(float64(cpu))
532         wp.mMemory.Set(float64(mem))
533         wp.mVCPUsInuse.Set(float64(cpuInuse))
534         wp.mMemoryInuse.Set(float64(memInuse))
535 }
536
537 func (wp *Pool) runProbes() {
538         maxPPS := wp.maxProbesPerSecond
539         if maxPPS < 1 {
540                 maxPPS = defaultMaxProbesPerSecond
541         }
542         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
543         defer limitticker.Stop()
544
545         probeticker := time.NewTicker(wp.probeInterval)
546         defer probeticker.Stop()
547
548         workers := []cloud.InstanceID{}
549         for range probeticker.C {
550                 workers = workers[:0]
551                 wp.mtx.Lock()
552                 for id, wkr := range wp.workers {
553                         if wkr.state == StateShutdown || wp.shutdownIfIdle(wkr) {
554                                 continue
555                         }
556                         workers = append(workers, id)
557                 }
558                 wp.mtx.Unlock()
559
560                 for _, id := range workers {
561                         wp.mtx.Lock()
562                         wkr, ok := wp.workers[id]
563                         wp.mtx.Unlock()
564                         if !ok || wkr.state == StateShutdown {
565                                 // Deleted/shutdown while we
566                                 // were probing others
567                                 continue
568                         }
569                         select {
570                         case wkr.probing <- struct{}{}:
571                                 go func() {
572                                         wp.probeAndUpdate(wkr)
573                                         <-wkr.probing
574                                 }()
575                         default:
576                                 wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
577                         }
578                         select {
579                         case <-wp.stop:
580                                 return
581                         case <-limitticker.C:
582                         }
583                 }
584         }
585 }
586
587 func (wp *Pool) runSync() {
588         // sync once immediately, then wait syncInterval, sync again,
589         // etc.
590         timer := time.NewTimer(1)
591         for {
592                 select {
593                 case <-timer.C:
594                         err := wp.getInstancesAndSync()
595                         if err != nil {
596                                 wp.logger.WithError(err).Warn("sync failed")
597                         }
598                         timer.Reset(wp.syncInterval)
599                 case <-wp.stop:
600                         wp.logger.Debug("worker.Pool stopped")
601                         return
602                 }
603         }
604 }
605
606 // caller must have lock.
607 func (wp *Pool) shutdownIfBroken(wkr *worker, dur time.Duration) {
608         if wkr.state == StateHold {
609                 return
610         }
611         label, threshold := "", wp.timeoutProbe
612         if wkr.state == StateBooting {
613                 label, threshold = "new ", wp.timeoutBooting
614         }
615         if dur < threshold {
616                 return
617         }
618         wp.logger.WithFields(logrus.Fields{
619                 "Instance": wkr.instance,
620                 "Duration": dur,
621                 "Since":    wkr.probed,
622                 "State":    wkr.state,
623         }).Warnf("%sinstance unresponsive, shutting down", label)
624         wp.shutdown(wkr, wp.logger)
625 }
626
627 // caller must have lock.
628 func (wp *Pool) shutdownIfIdle(wkr *worker) bool {
629         if len(wkr.running)+len(wkr.starting) > 0 || wkr.state != StateRunning {
630                 return false
631         }
632         age := time.Since(wkr.unallocated)
633         if age < wp.timeoutIdle {
634                 return false
635         }
636         logger := wp.logger.WithFields(logrus.Fields{
637                 "Age":      age,
638                 "Instance": wkr.instance,
639         })
640         logger.Info("shutdown idle worker")
641         wp.shutdown(wkr, logger)
642         return true
643 }
644
// Stop synchronizing with the InstanceSet.
//
// Stop closes wp.stop. The background loops that select on that
// channel return once it is closed. Stop must be called at most once,
// because closing an already-closed channel panics. wp.stop must also
// be non-nil by then (e.g. created by setup), because closing a nil
// channel panics too.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}
650
651 // Instances returns an InstanceView for each worker in the pool,
652 // summarizing its current state and recent activity.
653 func (wp *Pool) Instances() []InstanceView {
654         var r []InstanceView
655         wp.setupOnce.Do(wp.setup)
656         wp.mtx.Lock()
657         for _, w := range wp.workers {
658                 r = append(r, InstanceView{
659                         Instance:             w.instance.String(),
660                         Price:                w.instType.Price,
661                         ArvadosInstanceType:  w.instType.Name,
662                         ProviderInstanceType: w.instType.ProviderType,
663                         LastContainerUUID:    w.lastUUID,
664                         Unallocated:          w.unallocated,
665                         WorkerState:          w.state.String(),
666                 })
667         }
668         wp.mtx.Unlock()
669         sort.Slice(r, func(i, j int) bool {
670                 return strings.Compare(r[i].Instance, r[j].Instance) < 0
671         })
672         return r
673 }
674
675 func (wp *Pool) setup() {
676         wp.creating = map[arvados.InstanceType]int{}
677         wp.exited = map[string]time.Time{}
678         wp.workers = map[cloud.InstanceID]*worker{}
679         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
680 }
681
682 func (wp *Pool) notify() {
683         wp.mtx.RLock()
684         defer wp.mtx.RUnlock()
685         for _, send := range wp.subscribers {
686                 select {
687                 case send <- struct{}{}:
688                 default:
689                 }
690         }
691 }
692
693 func (wp *Pool) getInstancesAndSync() error {
694         wp.setupOnce.Do(wp.setup)
695         wp.logger.Debug("getting instance list")
696         threshold := time.Now()
697         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
698         if err != nil {
699                 return err
700         }
701         wp.sync(threshold, instances)
702         wp.logger.Debug("sync done")
703         return nil
704 }
705
706 // Add/remove/update workers based on instances, which was obtained
707 // from the instanceSet. However, don't clobber any other updates that
708 // already happened after threshold.
709 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
710         wp.mtx.Lock()
711         defer wp.mtx.Unlock()
712         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
713         notify := false
714
715         for _, inst := range instances {
716                 itTag := inst.Tags()[tagKeyInstanceType]
717                 it, ok := wp.instanceTypes[itTag]
718                 if !ok {
719                         wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
720                         continue
721                 }
722                 if wp.updateWorker(inst, it, StateUnknown) {
723                         notify = true
724                 }
725         }
726
727         for id, wkr := range wp.workers {
728                 if wkr.updated.After(threshold) {
729                         continue
730                 }
731                 logger := wp.logger.WithFields(logrus.Fields{
732                         "Instance":    wkr.instance,
733                         "WorkerState": wkr.state,
734                 })
735                 logger.Info("instance disappeared in cloud")
736                 delete(wp.workers, id)
737                 go wkr.executor.Close()
738                 notify = true
739         }
740
741         if !wp.loaded {
742                 wp.loaded = true
743                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
744         }
745
746         if notify {
747                 go wp.notify()
748         }
749 }
750
751 // should be called in a new goroutine
752 func (wp *Pool) probeAndUpdate(wkr *worker) {
753         logger := wp.logger.WithField("Instance", wkr.instance)
754         wp.mtx.Lock()
755         updated := wkr.updated
756         booted := wkr.booted
757         wp.mtx.Unlock()
758
759         var (
760                 ctrUUIDs []string
761                 ok       bool
762                 stderr   []byte
763         )
764         if !booted {
765                 booted, stderr = wp.probeBooted(wkr)
766                 wp.mtx.Lock()
767                 if booted && !wkr.booted {
768                         wkr.booted = booted
769                         logger.Info("instance booted")
770                 } else {
771                         booted = wkr.booted
772                 }
773                 wp.mtx.Unlock()
774         }
775         if booted {
776                 ctrUUIDs, ok, stderr = wp.probeRunning(wkr)
777         }
778         logger = logger.WithField("stderr", string(stderr))
779         wp.mtx.Lock()
780         defer wp.mtx.Unlock()
781         if !ok {
782                 if wkr.state == StateShutdown {
783                         return
784                 }
785                 dur := time.Since(wkr.probed)
786                 logger := logger.WithFields(logrus.Fields{
787                         "Duration": dur,
788                         "State":    wkr.state,
789                 })
790                 if wkr.state == StateBooting {
791                         logger.Debug("new instance not responding")
792                 } else {
793                         logger.Info("instance not responding")
794                 }
795                 wp.shutdownIfBroken(wkr, dur)
796                 return
797         }
798
799         updateTime := time.Now()
800         wkr.probed = updateTime
801         if wkr.state == StateShutdown || wkr.state == StateHold {
802         } else if booted {
803                 if wkr.state != StateRunning {
804                         wkr.state = StateRunning
805                         go wp.notify()
806                 }
807         } else {
808                 wkr.state = StateBooting
809         }
810
811         if updated != wkr.updated {
812                 // Worker was updated after the probe began, so
813                 // wkr.running might have a container UUID that was
814                 // not yet running when ctrUUIDs was generated. Leave
815                 // wkr.running alone and wait for the next probe to
816                 // catch up on any changes.
817                 return
818         }
819
820         if len(ctrUUIDs) > 0 {
821                 wkr.busy = updateTime
822                 wkr.lastUUID = ctrUUIDs[0]
823         } else if len(wkr.running) > 0 {
824                 wkr.unallocated = updateTime
825         }
826         running := map[string]struct{}{}
827         changed := false
828         for _, uuid := range ctrUUIDs {
829                 running[uuid] = struct{}{}
830                 if _, ok := wkr.running[uuid]; !ok {
831                         changed = true
832                 }
833         }
834         for uuid := range wkr.running {
835                 if _, ok := running[uuid]; !ok {
836                         logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
837                         wp.exited[uuid] = updateTime
838                         changed = true
839                 }
840         }
841         if changed {
842                 wkr.running = running
843                 wkr.updated = updateTime
844                 go wp.notify()
845         }
846 }
847
848 func (wp *Pool) probeRunning(wkr *worker) (running []string, ok bool, stderr []byte) {
849         cmd := "crunch-run --list"
850         stdout, stderr, err := wkr.executor.Execute(cmd, nil)
851         if err != nil {
852                 wp.logger.WithFields(logrus.Fields{
853                         "Instance": wkr.instance,
854                         "Command":  cmd,
855                         "stdout":   string(stdout),
856                         "stderr":   string(stderr),
857                 }).WithError(err).Warn("probe failed")
858                 return nil, false, stderr
859         }
860         stdout = bytes.TrimRight(stdout, "\n")
861         if len(stdout) == 0 {
862                 return nil, true, stderr
863         }
864         return strings.Split(string(stdout), "\n"), true, stderr
865 }
866
867 func (wp *Pool) probeBooted(wkr *worker) (ok bool, stderr []byte) {
868         cmd := wp.bootProbeCommand
869         if cmd == "" {
870                 cmd = "true"
871         }
872         stdout, stderr, err := wkr.executor.Execute(cmd, nil)
873         logger := wp.logger.WithFields(logrus.Fields{
874                 "Instance": wkr.instance,
875                 "Command":  cmd,
876                 "stdout":   string(stdout),
877                 "stderr":   string(stderr),
878         })
879         if err != nil {
880                 logger.WithError(err).Debug("boot probe failed")
881                 return false, stderr
882         }
883         logger.Info("boot probe succeeded")
884         return true, stderr
885 }