14360: Note race in docs, fix flaky test.
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "io"
10         "sort"
11         "strings"
12         "sync"
13         "time"
14
15         "git.curoverse.com/arvados.git/lib/cloud"
16         "git.curoverse.com/arvados.git/sdk/go/arvados"
17         "github.com/Sirupsen/logrus"
18         "github.com/prometheus/client_golang/prometheus"
19 )
20
// Keys of the cloud instance tags that the pool writes and reads.
const (
	// tagKeyInstanceType holds the name of the Arvados instance
	// type an instance was created for (set by Create, read by sync).
	tagKeyInstanceType = "InstanceType"

	// tagKeyHold, when non-empty on an instance discovered by sync,
	// makes its new worker start in StateHold.
	tagKeyHold = "Hold"
)
25
// InstanceView is a snapshot of one worker, as reported by
// (*Pool).Instances: which cloud instance it is, what it costs, what
// type it is, and what it was last seen doing.
type InstanceView struct {
	Instance             string    // the instance's String() form
	Price                float64   // Price of the Arvados instance type
	ArvadosInstanceType  string    // Arvados instance type name
	ProviderInstanceType string    // provider's name for the instance type
	LastContainerUUID    string    // most recent container UUID seen on the worker
	LastBusy             time.Time // last time the worker was known to be busy
	WorkerState          string    // the worker's State, as a string
}
36
37 // An Executor executes shell commands on a remote host.
38 type Executor interface {
39         // Run cmd on the current target.
40         Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
41
42         // Use the given target for subsequent operations. The new
43         // target is the same host as the previous target, but it
44         // might return a different address and verify a different
45         // host key.
46         //
47         // SetTarget is called frequently, and in most cases the new
48         // target will behave exactly the same as the old one. An
49         // implementation should optimize accordingly.
50         //
51         // SetTarget must not block on concurrent Execute calls.
52         SetTarget(cloud.ExecutorTarget)
53
54         Close()
55 }
56
// Fallbacks for cluster settings that are not configured to a positive
// value (applied by duration and, for probes per second, runProbes).
const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = 10 * time.Second
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = 10 * time.Minute
	defaultTimeoutProbe       = 10 * time.Minute
)
65
66 func duration(conf arvados.Duration, def time.Duration) time.Duration {
67         if conf > 0 {
68                 return time.Duration(conf)
69         } else {
70                 return def
71         }
72 }
73
74 // NewPool creates a Pool of workers backed by instanceSet.
75 //
76 // New instances are configured and set up according to the given
77 // cluster configuration.
78 func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
79         wp := &Pool{
80                 logger:             logger,
81                 instanceSet:        instanceSet,
82                 newExecutor:        newExecutor,
83                 bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
84                 imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
85                 instanceTypes:      cluster.InstanceTypes,
86                 maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
87                 probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
88                 syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
89                 timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
90                 timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
91                 timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
92         }
93         wp.registerMetrics(reg)
94         go func() {
95                 wp.setupOnce.Do(wp.setup)
96                 go wp.runMetrics()
97                 go wp.runProbes()
98                 go wp.runSync()
99         }()
100         return wp
101 }
102
103 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
104 // zero Pool should not be used. Call NewPool to create a new Pool.
105 type Pool struct {
106         // configuration
107         logger             logrus.FieldLogger
108         instanceSet        cloud.InstanceSet
109         newExecutor        func(cloud.Instance) Executor
110         bootProbeCommand   string
111         imageID            cloud.ImageID
112         instanceTypes      map[string]arvados.InstanceType
113         syncInterval       time.Duration
114         probeInterval      time.Duration
115         maxProbesPerSecond int
116         timeoutIdle        time.Duration
117         timeoutBooting     time.Duration
118         timeoutProbe       time.Duration
119
120         // private state
121         subscribers  map[<-chan struct{}]chan<- struct{}
122         creating     map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return
123         workers      map[cloud.InstanceID]*worker
124         loaded       bool                 // loaded list of instances from InstanceSet at least once
125         exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
126         atQuotaUntil time.Time
127         atQuotaErr   cloud.QuotaError
128         stop         chan bool
129         mtx          sync.RWMutex
130         setupOnce    sync.Once
131
132         mInstances         prometheus.Gauge
133         mContainersRunning prometheus.Gauge
134         mVCPUs             prometheus.Gauge
135         mVCPUsInuse        prometheus.Gauge
136         mMemory            prometheus.Gauge
137         mMemoryInuse       prometheus.Gauge
138 }
139
140 type worker struct {
141         state    State
142         instance cloud.Instance
143         executor Executor
144         instType arvados.InstanceType
145         vcpus    int64
146         memory   int64
147         probed   time.Time
148         updated  time.Time
149         busy     time.Time
150         lastUUID string
151         running  map[string]struct{} // remember to update state idle<->running when this changes
152         starting map[string]struct{} // remember to update state idle<->running when this changes
153         probing  chan struct{}
154 }
155
156 // Subscribe returns a channel that becomes ready whenever a worker's
157 // state changes.
158 //
159 // Example:
160 //
161 //      ch := wp.Subscribe()
162 //      defer wp.Unsubscribe(ch)
163 //      for range ch {
164 //              // ...try scheduling some work...
165 //              if done {
166 //                      break
167 //              }
168 //      }
169 func (wp *Pool) Subscribe() <-chan struct{} {
170         wp.setupOnce.Do(wp.setup)
171         wp.mtx.Lock()
172         defer wp.mtx.Unlock()
173         ch := make(chan struct{}, 1)
174         wp.subscribers[ch] = ch
175         return ch
176 }
177
178 // Unsubscribe stops sending updates to the given channel.
179 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
180         wp.setupOnce.Do(wp.setup)
181         wp.mtx.Lock()
182         defer wp.mtx.Unlock()
183         delete(wp.subscribers, ch)
184 }
185
186 // Unallocated returns the number of unallocated (creating + booting +
187 // idle + unknown) workers for each instance type.
188 //
189 // The returned counts should be interpreted as upper bounds, rather
190 // than exact counts: they are sometimes artificially high when a
191 // newly created instance appears in the driver's Instances() list
192 // before the Create() call returns.
193 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
194         wp.setupOnce.Do(wp.setup)
195         wp.mtx.RLock()
196         defer wp.mtx.RUnlock()
197         u := map[arvados.InstanceType]int{}
198         for it, c := range wp.creating {
199                 u[it] = c
200         }
201         for _, wkr := range wp.workers {
202                 if wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown {
203                         u[wkr.instType]++
204                 }
205         }
206         return u
207 }
208
209 // Create a new instance with the given type, and add it to the worker
210 // pool. The worker is added immediately; instance creation runs in
211 // the background.
212 func (wp *Pool) Create(it arvados.InstanceType) error {
213         logger := wp.logger.WithField("InstanceType", it.Name)
214         wp.setupOnce.Do(wp.setup)
215         wp.mtx.Lock()
216         defer wp.mtx.Unlock()
217         if time.Now().Before(wp.atQuotaUntil) {
218                 return wp.atQuotaErr
219         }
220         tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
221         wp.creating[it]++
222         go func() {
223                 defer wp.notify()
224                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
225                 wp.mtx.Lock()
226                 defer wp.mtx.Unlock()
227                 wp.creating[it]--
228                 if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
229                         wp.atQuotaErr = err
230                         wp.atQuotaUntil = time.Now().Add(time.Minute)
231                 }
232                 if err != nil {
233                         logger.WithError(err).Error("create failed")
234                         return
235                 }
236                 wp.updateWorker(inst, it, StateBooting)
237         }()
238         return nil
239 }
240
241 // AtQuota returns true if Create is not expected to work at the
242 // moment.
243 func (wp *Pool) AtQuota() bool {
244         wp.mtx.Lock()
245         defer wp.mtx.Unlock()
246         return time.Now().Before(wp.atQuotaUntil)
247 }
248
249 // Add or update worker attached to the given instance. Use
250 // initialState if a new worker is created. Caller must have lock.
251 //
252 // Returns true when a new worker is created.
253 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) bool {
254         id := inst.ID()
255         if wp.workers[id] != nil {
256                 wp.workers[id].executor.SetTarget(inst)
257                 wp.workers[id].instance = inst
258                 wp.workers[id].updated = time.Now()
259                 if initialState == StateBooting && wp.workers[id].state == StateUnknown {
260                         wp.workers[id].state = StateBooting
261                 }
262                 return false
263         }
264         if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
265                 initialState = StateHold
266         }
267         wp.logger.WithFields(logrus.Fields{
268                 "InstanceType": it.Name,
269                 "Instance":     inst,
270                 "State":        initialState,
271         }).Infof("instance appeared in cloud")
272         now := time.Now()
273         wp.workers[id] = &worker{
274                 executor: wp.newExecutor(inst),
275                 state:    initialState,
276                 instance: inst,
277                 instType: it,
278                 probed:   now,
279                 busy:     now,
280                 updated:  now,
281                 running:  make(map[string]struct{}),
282                 starting: make(map[string]struct{}),
283                 probing:  make(chan struct{}, 1),
284         }
285         return true
286 }
287
288 // Shutdown shuts down a worker with the given type, or returns false
289 // if all workers with the given type are busy.
290 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
291         wp.setupOnce.Do(wp.setup)
292         wp.mtx.Lock()
293         defer wp.mtx.Unlock()
294         logger := wp.logger.WithField("InstanceType", it.Name)
295         logger.Info("shutdown requested")
296         for _, tryState := range []State{StateBooting, StateIdle} {
297                 // TODO: shutdown the worker with the longest idle
298                 // time (Idle) or the earliest create time (Booting)
299                 for _, wkr := range wp.workers {
300                         if wkr.state == tryState && wkr.instType == it {
301                                 logger = logger.WithField("Instance", wkr.instance)
302                                 logger.Info("shutting down")
303                                 wp.shutdown(wkr, logger)
304                                 return true
305                         }
306                 }
307         }
308         return false
309 }
310
311 // caller must have lock
312 func (wp *Pool) shutdown(wkr *worker, logger logrus.FieldLogger) {
313         wkr.updated = time.Now()
314         wkr.state = StateShutdown
315         go func() {
316                 err := wkr.instance.Destroy()
317                 if err != nil {
318                         logger.WithError(err).WithField("Instance", wkr.instance).Warn("shutdown failed")
319                         return
320                 }
321                 wp.mtx.Lock()
322                 wp.atQuotaUntil = time.Now()
323                 wp.mtx.Unlock()
324                 wp.notify()
325         }()
326 }
327
328 // Workers returns the current number of workers in each state.
329 func (wp *Pool) Workers() map[State]int {
330         wp.setupOnce.Do(wp.setup)
331         wp.mtx.Lock()
332         defer wp.mtx.Unlock()
333         r := map[State]int{}
334         for _, w := range wp.workers {
335                 r[w.state]++
336         }
337         return r
338 }
339
340 // Running returns the container UUIDs being prepared/run on workers.
341 func (wp *Pool) Running() map[string]time.Time {
342         wp.setupOnce.Do(wp.setup)
343         wp.mtx.Lock()
344         defer wp.mtx.Unlock()
345         r := map[string]time.Time{}
346         for _, wkr := range wp.workers {
347                 for uuid := range wkr.running {
348                         r[uuid] = time.Time{}
349                 }
350                 for uuid := range wkr.starting {
351                         r[uuid] = time.Time{}
352                 }
353         }
354         for uuid, exited := range wp.exited {
355                 r[uuid] = exited
356         }
357         return r
358 }
359
360 // StartContainer starts a container on an idle worker immediately if
361 // possible, otherwise returns false.
362 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
363         logger := wp.logger.WithFields(logrus.Fields{
364                 "InstanceType":  it.Name,
365                 "ContainerUUID": ctr.UUID,
366                 "Priority":      ctr.Priority,
367         })
368         wp.setupOnce.Do(wp.setup)
369         wp.mtx.Lock()
370         defer wp.mtx.Unlock()
371         var wkr *worker
372         for _, w := range wp.workers {
373                 if w.instType == it && w.state == StateIdle {
374                         if wkr == nil || w.busy.After(wkr.busy) {
375                                 wkr = w
376                         }
377                 }
378         }
379         if wkr == nil {
380                 return false
381         }
382         logger = logger.WithField("Instance", wkr.instance)
383         logger.Debug("starting container")
384         wkr.starting[ctr.UUID] = struct{}{}
385         wkr.state = StateRunning
386         go func() {
387                 stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
388                 wp.mtx.Lock()
389                 defer wp.mtx.Unlock()
390                 now := time.Now()
391                 wkr.updated = now
392                 wkr.busy = now
393                 delete(wkr.starting, ctr.UUID)
394                 wkr.running[ctr.UUID] = struct{}{}
395                 wkr.lastUUID = ctr.UUID
396                 if err != nil {
397                         logger.WithField("stdout", string(stdout)).
398                                 WithField("stderr", string(stderr)).
399                                 WithError(err).
400                                 Error("error starting crunch-run process")
401                         // Leave uuid in wkr.running, though: it's
402                         // possible the error was just a communication
403                         // failure and the process was in fact
404                         // started.  Wait for next probe to find out.
405                         return
406                 }
407                 logger.Info("crunch-run process started")
408                 wkr.lastUUID = ctr.UUID
409         }()
410         return true
411 }
412
413 // KillContainer kills the crunch-run process for the given container
414 // UUID, if it's running on any worker.
415 //
416 // KillContainer returns immediately; the act of killing the container
417 // takes some time, and runs in the background.
418 func (wp *Pool) KillContainer(uuid string) {
419         wp.mtx.Lock()
420         defer wp.mtx.Unlock()
421         if _, ok := wp.exited[uuid]; ok {
422                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
423                 delete(wp.exited, uuid)
424                 return
425         }
426         for _, wkr := range wp.workers {
427                 if _, ok := wkr.running[uuid]; ok {
428                         go wp.kill(wkr, uuid)
429                         return
430                 }
431         }
432         wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
433 }
434
435 func (wp *Pool) kill(wkr *worker, uuid string) {
436         logger := wp.logger.WithFields(logrus.Fields{
437                 "ContainerUUID": uuid,
438                 "Instance":      wkr.instance,
439         })
440         logger.Debug("killing process")
441         stdout, stderr, err := wkr.executor.Execute("crunch-run --kill "+uuid, nil)
442         if err != nil {
443                 logger.WithFields(logrus.Fields{
444                         "stderr": string(stderr),
445                         "stdout": string(stdout),
446                         "error":  err,
447                 }).Warn("kill failed")
448                 return
449         }
450         logger.Debug("killing process succeeded")
451         wp.mtx.Lock()
452         defer wp.mtx.Unlock()
453         if _, ok := wkr.running[uuid]; ok {
454                 delete(wkr.running, uuid)
455                 if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
456                         wkr.state = StateIdle
457                 }
458                 wkr.updated = time.Now()
459                 go wp.notify()
460         }
461 }
462
463 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
464         if reg == nil {
465                 reg = prometheus.NewRegistry()
466         }
467         wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
468                 Namespace: "arvados",
469                 Subsystem: "dispatchcloud",
470                 Name:      "instances_total",
471                 Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
472         })
473         reg.MustRegister(wp.mInstances)
474         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
475                 Namespace: "arvados",
476                 Subsystem: "dispatchcloud",
477                 Name:      "containers_running",
478                 Help:      "Number of containers reported running by cloud VMs.",
479         })
480         reg.MustRegister(wp.mContainersRunning)
481
482         wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
483                 Namespace: "arvados",
484                 Subsystem: "dispatchcloud",
485                 Name:      "vcpus_total",
486                 Help:      "Total VCPUs on all cloud VMs.",
487         })
488         reg.MustRegister(wp.mVCPUs)
489         wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
490                 Namespace: "arvados",
491                 Subsystem: "dispatchcloud",
492                 Name:      "vcpus_inuse",
493                 Help:      "VCPUs on cloud VMs that are running containers.",
494         })
495         reg.MustRegister(wp.mVCPUsInuse)
496         wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
497                 Namespace: "arvados",
498                 Subsystem: "dispatchcloud",
499                 Name:      "memory_bytes_total",
500                 Help:      "Total memory on all cloud VMs.",
501         })
502         reg.MustRegister(wp.mMemory)
503         wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
504                 Namespace: "arvados",
505                 Subsystem: "dispatchcloud",
506                 Name:      "memory_bytes_inuse",
507                 Help:      "Memory on cloud VMs that are running containers.",
508         })
509         reg.MustRegister(wp.mMemoryInuse)
510 }
511
512 func (wp *Pool) runMetrics() {
513         ch := wp.Subscribe()
514         defer wp.Unsubscribe(ch)
515         for range ch {
516                 wp.updateMetrics()
517         }
518 }
519
520 func (wp *Pool) updateMetrics() {
521         wp.mtx.RLock()
522         defer wp.mtx.RUnlock()
523
524         var alloc, cpu, cpuInuse, mem, memInuse int64
525         for _, wkr := range wp.workers {
526                 cpu += int64(wkr.instType.VCPUs)
527                 mem += int64(wkr.instType.RAM)
528                 if len(wkr.running)+len(wkr.starting) == 0 {
529                         continue
530                 }
531                 alloc += int64(len(wkr.running) + len(wkr.starting))
532                 cpuInuse += int64(wkr.instType.VCPUs)
533                 memInuse += int64(wkr.instType.RAM)
534         }
535         wp.mInstances.Set(float64(len(wp.workers)))
536         wp.mContainersRunning.Set(float64(alloc))
537         wp.mVCPUs.Set(float64(cpu))
538         wp.mMemory.Set(float64(mem))
539         wp.mVCPUsInuse.Set(float64(cpuInuse))
540         wp.mMemoryInuse.Set(float64(memInuse))
541 }
542
543 func (wp *Pool) runProbes() {
544         maxPPS := wp.maxProbesPerSecond
545         if maxPPS < 1 {
546                 maxPPS = defaultMaxProbesPerSecond
547         }
548         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
549         defer limitticker.Stop()
550
551         probeticker := time.NewTicker(wp.probeInterval)
552         defer probeticker.Stop()
553
554         workers := []cloud.InstanceID{}
555         for range probeticker.C {
556                 workers = workers[:0]
557                 wp.mtx.Lock()
558                 for id, wkr := range wp.workers {
559                         if wkr.state == StateShutdown || wp.shutdownIfIdle(wkr) {
560                                 continue
561                         }
562                         workers = append(workers, id)
563                 }
564                 wp.mtx.Unlock()
565
566                 for _, id := range workers {
567                         wp.mtx.Lock()
568                         wkr, ok := wp.workers[id]
569                         wp.mtx.Unlock()
570                         if !ok || wkr.state == StateShutdown {
571                                 // Deleted/shutdown while we
572                                 // were probing others
573                                 continue
574                         }
575                         select {
576                         case wkr.probing <- struct{}{}:
577                                 go func() {
578                                         wp.probeAndUpdate(wkr)
579                                         <-wkr.probing
580                                 }()
581                         default:
582                                 wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
583                         }
584                         select {
585                         case <-wp.stop:
586                                 return
587                         case <-limitticker.C:
588                         }
589                 }
590         }
591 }
592
593 func (wp *Pool) runSync() {
594         // sync once immediately, then wait syncInterval, sync again,
595         // etc.
596         timer := time.NewTimer(1)
597         for {
598                 select {
599                 case <-timer.C:
600                         err := wp.getInstancesAndSync()
601                         if err != nil {
602                                 wp.logger.WithError(err).Warn("sync failed")
603                         }
604                         timer.Reset(wp.syncInterval)
605                 case <-wp.stop:
606                         wp.logger.Debug("worker.Pool stopped")
607                         return
608                 }
609         }
610 }
611
612 // caller must have lock.
613 func (wp *Pool) shutdownIfBroken(wkr *worker, dur time.Duration) {
614         if wkr.state == StateHold {
615                 return
616         }
617         label, threshold := "", wp.timeoutProbe
618         if wkr.state == StateBooting {
619                 label, threshold = "new ", wp.timeoutBooting
620         }
621         if dur < threshold {
622                 return
623         }
624         wp.logger.WithFields(logrus.Fields{
625                 "Instance": wkr.instance,
626                 "Duration": dur,
627                 "Since":    wkr.probed,
628                 "State":    wkr.state,
629         }).Warnf("%sinstance unresponsive, shutting down", label)
630         wp.shutdown(wkr, wp.logger)
631 }
632
633 // caller must have lock.
634 func (wp *Pool) shutdownIfIdle(wkr *worker) bool {
635         if wkr.state != StateIdle {
636                 return false
637         }
638         age := time.Since(wkr.busy)
639         if age < wp.timeoutIdle {
640                 return false
641         }
642         logger := wp.logger.WithFields(logrus.Fields{
643                 "Age":      age,
644                 "Instance": wkr.instance,
645         })
646         logger.Info("shutdown idle worker")
647         wp.shutdown(wkr, logger)
648         return true
649 }
650
651 // Stop synchronizing with the InstanceSet.
652 func (wp *Pool) Stop() {
653         wp.setupOnce.Do(wp.setup)
654         close(wp.stop)
655 }
656
657 // Instances returns an InstanceView for each worker in the pool,
658 // summarizing its current state and recent activity.
659 func (wp *Pool) Instances() []InstanceView {
660         var r []InstanceView
661         wp.setupOnce.Do(wp.setup)
662         wp.mtx.Lock()
663         for _, w := range wp.workers {
664                 r = append(r, InstanceView{
665                         Instance:             w.instance.String(),
666                         Price:                w.instType.Price,
667                         ArvadosInstanceType:  w.instType.Name,
668                         ProviderInstanceType: w.instType.ProviderType,
669                         LastContainerUUID:    w.lastUUID,
670                         LastBusy:             w.busy,
671                         WorkerState:          w.state.String(),
672                 })
673         }
674         wp.mtx.Unlock()
675         sort.Slice(r, func(i, j int) bool {
676                 return strings.Compare(r[i].Instance, r[j].Instance) < 0
677         })
678         return r
679 }
680
681 func (wp *Pool) setup() {
682         wp.creating = map[arvados.InstanceType]int{}
683         wp.exited = map[string]time.Time{}
684         wp.workers = map[cloud.InstanceID]*worker{}
685         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
686 }
687
688 func (wp *Pool) notify() {
689         wp.mtx.RLock()
690         defer wp.mtx.RUnlock()
691         for _, send := range wp.subscribers {
692                 select {
693                 case send <- struct{}{}:
694                 default:
695                 }
696         }
697 }
698
699 func (wp *Pool) getInstancesAndSync() error {
700         wp.setupOnce.Do(wp.setup)
701         wp.logger.Debug("getting instance list")
702         threshold := time.Now()
703         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
704         if err != nil {
705                 return err
706         }
707         wp.sync(threshold, instances)
708         wp.logger.Debug("sync done")
709         return nil
710 }
711
712 // Add/remove/update workers based on instances, which was obtained
713 // from the instanceSet. However, don't clobber any other updates that
714 // already happened after threshold.
715 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
716         wp.mtx.Lock()
717         defer wp.mtx.Unlock()
718         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
719         notify := false
720
721         for _, inst := range instances {
722                 itTag := inst.Tags()[tagKeyInstanceType]
723                 it, ok := wp.instanceTypes[itTag]
724                 if !ok {
725                         wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
726                         continue
727                 }
728                 if wp.updateWorker(inst, it, StateUnknown) {
729                         notify = true
730                 }
731         }
732
733         for id, wkr := range wp.workers {
734                 if wkr.updated.After(threshold) {
735                         continue
736                 }
737                 logger := wp.logger.WithFields(logrus.Fields{
738                         "Instance":    wkr.instance,
739                         "WorkerState": wkr.state,
740                 })
741                 logger.Info("instance disappeared in cloud")
742                 delete(wp.workers, id)
743                 go wkr.executor.Close()
744                 notify = true
745         }
746
747         if !wp.loaded {
748                 wp.loaded = true
749                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
750         }
751
752         if notify {
753                 go wp.notify()
754         }
755 }
756
757 // should be called in a new goroutine
758 func (wp *Pool) probeAndUpdate(wkr *worker) {
759         logger := wp.logger.WithField("Instance", wkr.instance)
760         wp.mtx.Lock()
761         updated := wkr.updated
762         needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
763         needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
764         wp.mtx.Unlock()
765         if !needProbeBooted && !needProbeRunning {
766                 return
767         }
768
769         var (
770                 ctrUUIDs []string
771                 ok       bool
772                 stderr   []byte
773         )
774         if needProbeBooted {
775                 ok, stderr = wp.probeBooted(wkr)
776                 wp.mtx.Lock()
777                 if ok || wkr.state == StateRunning || wkr.state == StateIdle {
778                         logger.Info("instance booted; will try probeRunning")
779                         needProbeRunning = true
780                 }
781                 wp.mtx.Unlock()
782         }
783         if needProbeRunning {
784                 ctrUUIDs, ok, stderr = wp.probeRunning(wkr)
785         }
786         logger = logger.WithField("stderr", string(stderr))
787         wp.mtx.Lock()
788         defer wp.mtx.Unlock()
789         if !ok {
790                 if wkr.state == StateShutdown && wkr.updated.After(updated) {
791                         // Skip the logging noise if shutdown was
792                         // initiated during probe.
793                         return
794                 }
795                 dur := time.Since(wkr.probed)
796                 logger := logger.WithFields(logrus.Fields{
797                         "Duration": dur,
798                         "State":    wkr.state,
799                 })
800                 if wkr.state == StateBooting {
801                         logger.Debug("new instance not responding")
802                 } else {
803                         logger.Info("instance not responding")
804                 }
805                 wp.shutdownIfBroken(wkr, dur)
806                 return
807         }
808
809         updateTime := time.Now()
810         wkr.probed = updateTime
811
812         if updated != wkr.updated {
813                 // Worker was updated after the probe began, so
814                 // wkr.running might have a container UUID that was
815                 // not yet running when ctrUUIDs was generated. Leave
816                 // wkr.running alone and wait for the next probe to
817                 // catch up on any changes.
818                 return
819         }
820
821         if len(ctrUUIDs) > 0 {
822                 wkr.busy = updateTime
823                 wkr.lastUUID = ctrUUIDs[0]
824         } else if len(wkr.running) > 0 {
825                 // Actual last-busy time was sometime between wkr.busy
826                 // and now. Now is the earliest opportunity to take
827                 // advantage of the non-busy state, though.
828                 wkr.busy = updateTime
829         }
830         running := map[string]struct{}{}
831         changed := false
832         for _, uuid := range ctrUUIDs {
833                 running[uuid] = struct{}{}
834                 if _, ok := wkr.running[uuid]; !ok {
835                         changed = true
836                 }
837         }
838         for uuid := range wkr.running {
839                 if _, ok := running[uuid]; !ok {
840                         logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
841                         wp.exited[uuid] = updateTime
842                         changed = true
843                 }
844         }
845         if wkr.state == StateUnknown || wkr.state == StateBooting {
846                 wkr.state = StateIdle
847                 changed = true
848         }
849         if changed {
850                 wkr.running = running
851                 if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
852                         wkr.state = StateRunning
853                 } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
854                         wkr.state = StateIdle
855                 }
856                 wkr.updated = updateTime
857                 go wp.notify()
858         }
859 }
860
861 func (wp *Pool) probeRunning(wkr *worker) (running []string, ok bool, stderr []byte) {
862         cmd := "crunch-run --list"
863         stdout, stderr, err := wkr.executor.Execute(cmd, nil)
864         if err != nil {
865                 wp.logger.WithFields(logrus.Fields{
866                         "Instance": wkr.instance,
867                         "Command":  cmd,
868                         "stdout":   string(stdout),
869                         "stderr":   string(stderr),
870                 }).WithError(err).Warn("probe failed")
871                 return nil, false, stderr
872         }
873         stdout = bytes.TrimRight(stdout, "\n")
874         if len(stdout) == 0 {
875                 return nil, true, stderr
876         }
877         return strings.Split(string(stdout), "\n"), true, stderr
878 }
879
880 func (wp *Pool) probeBooted(wkr *worker) (ok bool, stderr []byte) {
881         cmd := wp.bootProbeCommand
882         if cmd == "" {
883                 cmd = "true"
884         }
885         stdout, stderr, err := wkr.executor.Execute(cmd, nil)
886         logger := wp.logger.WithFields(logrus.Fields{
887                 "Instance": wkr.instance,
888                 "Command":  cmd,
889                 "stdout":   string(stdout),
890                 "stderr":   string(stderr),
891         })
892         if err != nil {
893                 logger.WithError(err).Debug("boot probe failed")
894                 return false, stderr
895         }
896         logger.Info("boot probe succeeded")
897         return true, stderr
898 }