14360: Add "idle" state.
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "io"
10         "sort"
11         "strings"
12         "sync"
13         "time"
14
15         "git.curoverse.com/arvados.git/lib/cloud"
16         "git.curoverse.com/arvados.git/sdk/go/arvados"
17         "github.com/Sirupsen/logrus"
18         "github.com/prometheus/client_golang/prometheus"
19 )
20
// Keys of the cloud instance tags that the pool writes and reads.
const (
	// tagKeyInstanceType holds the name of the Arvados instance
	// type an instance was created as (set by Create, read by sync).
	tagKeyInstanceType = "InstanceType"

	// tagKeyHold: a newly discovered instance whose tag under this
	// key is non-empty starts out in StateHold.
	tagKeyHold = "Hold"
)
25
// InstanceView is a point-in-time summary of one worker, as returned
// by (*Pool).Instances.
type InstanceView struct {
	Instance             string    // String() form of the cloud instance
	Price                float64   // Price of the Arvados instance type
	ArvadosInstanceType  string    // Name of the Arvados instance type
	ProviderInstanceType string    // Provider-specific type of the instance type
	LastContainerUUID    string    // Most recent container UUID seen on the worker
	LastBusy             time.Time // Last time the worker was known to be busy
	WorkerState          string    // String() form of the worker's State
}
36
37 // An Executor executes shell commands on a remote host.
38 type Executor interface {
39         // Run cmd on the current target.
40         Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
41
42         // Use the given target for subsequent operations. The new
43         // target is the same host as the previous target, but it
44         // might return a different address and verify a different
45         // host key.
46         //
47         // SetTarget is called frequently, and in most cases the new
48         // target will behave exactly the same as the old one. An
49         // implementation should optimize accordingly.
50         //
51         // SetTarget must not block on concurrent Execute calls.
52         SetTarget(cloud.ExecutorTarget)
53
54         Close()
55 }
56
// Fallback settings, used when the corresponding cluster configuration
// value is zero (or, for the probe rate, less than 1).
const (
	defaultSyncInterval       = time.Minute      // how often to re-list instances
	defaultProbeInterval      = 10 * time.Second // how often to probe workers
	defaultMaxProbesPerSecond = 10               // probe launch rate limit
	defaultTimeoutIdle        = time.Minute      // idle time before shutdown
	defaultTimeoutBooting     = 10 * time.Minute // unresponsive time allowed while booting
	defaultTimeoutProbe       = 10 * time.Minute // unresponsive time allowed after boot
)
65
66 func duration(conf arvados.Duration, def time.Duration) time.Duration {
67         if conf > 0 {
68                 return time.Duration(conf)
69         } else {
70                 return def
71         }
72 }
73
74 // NewPool creates a Pool of workers backed by instanceSet.
75 //
76 // New instances are configured and set up according to the given
77 // cluster configuration.
78 func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
79         wp := &Pool{
80                 logger:             logger,
81                 instanceSet:        instanceSet,
82                 newExecutor:        newExecutor,
83                 bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
84                 imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
85                 instanceTypes:      cluster.InstanceTypes,
86                 maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
87                 probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
88                 syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
89                 timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
90                 timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
91                 timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
92         }
93         wp.registerMetrics(reg)
94         go func() {
95                 wp.setupOnce.Do(wp.setup)
96                 go wp.runMetrics()
97                 go wp.runProbes()
98                 go wp.runSync()
99         }()
100         return wp
101 }
102
103 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
104 // zero Pool should not be used. Call NewPool to create a new Pool.
105 type Pool struct {
106         // configuration
107         logger             logrus.FieldLogger
108         instanceSet        cloud.InstanceSet
109         newExecutor        func(cloud.Instance) Executor
110         bootProbeCommand   string
111         imageID            cloud.ImageID
112         instanceTypes      map[string]arvados.InstanceType
113         syncInterval       time.Duration
114         probeInterval      time.Duration
115         maxProbesPerSecond int
116         timeoutIdle        time.Duration
117         timeoutBooting     time.Duration
118         timeoutProbe       time.Duration
119
120         // private state
121         subscribers  map[<-chan struct{}]chan<- struct{}
122         creating     map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return
123         workers      map[cloud.InstanceID]*worker
124         loaded       bool                 // loaded list of instances from InstanceSet at least once
125         exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
126         atQuotaUntil time.Time
127         atQuotaErr   cloud.QuotaError
128         stop         chan bool
129         mtx          sync.RWMutex
130         setupOnce    sync.Once
131
132         mInstances         prometheus.Gauge
133         mContainersRunning prometheus.Gauge
134         mVCPUs             prometheus.Gauge
135         mVCPUsInuse        prometheus.Gauge
136         mMemory            prometheus.Gauge
137         mMemoryInuse       prometheus.Gauge
138 }
139
140 type worker struct {
141         state    State
142         instance cloud.Instance
143         executor Executor
144         instType arvados.InstanceType
145         vcpus    int64
146         memory   int64
147         probed   time.Time
148         updated  time.Time
149         busy     time.Time
150         lastUUID string
151         running  map[string]struct{} // remember to update state idle<->running when this changes
152         starting map[string]struct{} // remember to update state idle<->running when this changes
153         probing  chan struct{}
154 }
155
156 // Subscribe returns a channel that becomes ready whenever a worker's
157 // state changes.
158 //
159 // Example:
160 //
161 //      ch := wp.Subscribe()
162 //      defer wp.Unsubscribe(ch)
163 //      for range ch {
164 //              // ...try scheduling some work...
165 //              if done {
166 //                      break
167 //              }
168 //      }
169 func (wp *Pool) Subscribe() <-chan struct{} {
170         wp.setupOnce.Do(wp.setup)
171         wp.mtx.Lock()
172         defer wp.mtx.Unlock()
173         ch := make(chan struct{}, 1)
174         wp.subscribers[ch] = ch
175         return ch
176 }
177
178 // Unsubscribe stops sending updates to the given channel.
179 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
180         wp.setupOnce.Do(wp.setup)
181         wp.mtx.Lock()
182         defer wp.mtx.Unlock()
183         delete(wp.subscribers, ch)
184 }
185
186 // Unallocated returns the number of unallocated (creating + booting +
187 // idle + unknown) workers for each instance type.
188 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
189         wp.setupOnce.Do(wp.setup)
190         wp.mtx.RLock()
191         defer wp.mtx.RUnlock()
192         u := map[arvados.InstanceType]int{}
193         for it, c := range wp.creating {
194                 u[it] = c
195         }
196         for _, wkr := range wp.workers {
197                 if wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown {
198                         u[wkr.instType]++
199                 }
200         }
201         return u
202 }
203
204 // Create a new instance with the given type, and add it to the worker
205 // pool. The worker is added immediately; instance creation runs in
206 // the background.
207 func (wp *Pool) Create(it arvados.InstanceType) error {
208         logger := wp.logger.WithField("InstanceType", it.Name)
209         wp.setupOnce.Do(wp.setup)
210         wp.mtx.Lock()
211         defer wp.mtx.Unlock()
212         if time.Now().Before(wp.atQuotaUntil) {
213                 return wp.atQuotaErr
214         }
215         tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
216         wp.creating[it]++
217         go func() {
218                 defer wp.notify()
219                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
220                 wp.mtx.Lock()
221                 defer wp.mtx.Unlock()
222                 wp.creating[it]--
223                 if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
224                         wp.atQuotaErr = err
225                         wp.atQuotaUntil = time.Now().Add(time.Minute)
226                 }
227                 if err != nil {
228                         logger.WithError(err).Error("create failed")
229                         return
230                 }
231                 wp.updateWorker(inst, it, StateBooting)
232         }()
233         return nil
234 }
235
236 // AtQuota returns true if Create is not expected to work at the
237 // moment.
238 func (wp *Pool) AtQuota() bool {
239         wp.mtx.Lock()
240         defer wp.mtx.Unlock()
241         return time.Now().Before(wp.atQuotaUntil)
242 }
243
244 // Add or update worker attached to the given instance. Use
245 // initialState if a new worker is created. Caller must have lock.
246 //
247 // Returns true when a new worker is created.
248 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) bool {
249         id := inst.ID()
250         if wp.workers[id] != nil {
251                 wp.workers[id].executor.SetTarget(inst)
252                 wp.workers[id].instance = inst
253                 wp.workers[id].updated = time.Now()
254                 if initialState == StateBooting && wp.workers[id].state == StateUnknown {
255                         wp.workers[id].state = StateBooting
256                 }
257                 return false
258         }
259         if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
260                 initialState = StateHold
261         }
262         wp.logger.WithFields(logrus.Fields{
263                 "InstanceType": it.Name,
264                 "Instance":     inst,
265                 "State":        initialState,
266         }).Infof("instance appeared in cloud")
267         now := time.Now()
268         wp.workers[id] = &worker{
269                 executor: wp.newExecutor(inst),
270                 state:    initialState,
271                 instance: inst,
272                 instType: it,
273                 probed:   now,
274                 busy:     now,
275                 updated:  now,
276                 running:  make(map[string]struct{}),
277                 starting: make(map[string]struct{}),
278                 probing:  make(chan struct{}, 1),
279         }
280         return true
281 }
282
283 // Shutdown shuts down a worker with the given type, or returns false
284 // if all workers with the given type are busy.
285 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
286         wp.setupOnce.Do(wp.setup)
287         wp.mtx.Lock()
288         defer wp.mtx.Unlock()
289         logger := wp.logger.WithField("InstanceType", it.Name)
290         logger.Info("shutdown requested")
291         for _, tryState := range []State{StateBooting, StateIdle} {
292                 // TODO: shutdown the worker with the longest idle
293                 // time (Idle) or the earliest create time (Booting)
294                 for _, wkr := range wp.workers {
295                         if wkr.state == tryState && wkr.instType == it {
296                                 logger = logger.WithField("Instance", wkr.instance)
297                                 logger.Info("shutting down")
298                                 wp.shutdown(wkr, logger)
299                                 return true
300                         }
301                 }
302         }
303         return false
304 }
305
306 // caller must have lock
307 func (wp *Pool) shutdown(wkr *worker, logger logrus.FieldLogger) {
308         wkr.updated = time.Now()
309         wkr.state = StateShutdown
310         go func() {
311                 err := wkr.instance.Destroy()
312                 if err != nil {
313                         logger.WithError(err).WithField("Instance", wkr.instance).Warn("shutdown failed")
314                         return
315                 }
316                 wp.mtx.Lock()
317                 wp.atQuotaUntil = time.Now()
318                 wp.mtx.Unlock()
319                 wp.notify()
320         }()
321 }
322
323 // Workers returns the current number of workers in each state.
324 func (wp *Pool) Workers() map[State]int {
325         wp.setupOnce.Do(wp.setup)
326         wp.mtx.Lock()
327         defer wp.mtx.Unlock()
328         r := map[State]int{}
329         for _, w := range wp.workers {
330                 r[w.state]++
331         }
332         return r
333 }
334
335 // Running returns the container UUIDs being prepared/run on workers.
336 func (wp *Pool) Running() map[string]time.Time {
337         wp.setupOnce.Do(wp.setup)
338         wp.mtx.Lock()
339         defer wp.mtx.Unlock()
340         r := map[string]time.Time{}
341         for _, wkr := range wp.workers {
342                 for uuid := range wkr.running {
343                         r[uuid] = time.Time{}
344                 }
345                 for uuid := range wkr.starting {
346                         r[uuid] = time.Time{}
347                 }
348         }
349         for uuid, exited := range wp.exited {
350                 r[uuid] = exited
351         }
352         return r
353 }
354
355 // StartContainer starts a container on an idle worker immediately if
356 // possible, otherwise returns false.
357 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
358         logger := wp.logger.WithFields(logrus.Fields{
359                 "InstanceType":  it.Name,
360                 "ContainerUUID": ctr.UUID,
361                 "Priority":      ctr.Priority,
362         })
363         wp.setupOnce.Do(wp.setup)
364         wp.mtx.Lock()
365         defer wp.mtx.Unlock()
366         var wkr *worker
367         for _, w := range wp.workers {
368                 if w.instType == it && w.state == StateIdle {
369                         if wkr == nil || w.busy.After(wkr.busy) {
370                                 wkr = w
371                         }
372                 }
373         }
374         if wkr == nil {
375                 return false
376         }
377         logger = logger.WithField("Instance", wkr.instance)
378         logger.Debug("starting container")
379         wkr.starting[ctr.UUID] = struct{}{}
380         wkr.state = StateRunning
381         go func() {
382                 stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
383                 wp.mtx.Lock()
384                 defer wp.mtx.Unlock()
385                 now := time.Now()
386                 wkr.updated = now
387                 wkr.busy = now
388                 delete(wkr.starting, ctr.UUID)
389                 wkr.running[ctr.UUID] = struct{}{}
390                 wkr.lastUUID = ctr.UUID
391                 if err != nil {
392                         logger.WithField("stdout", string(stdout)).
393                                 WithField("stderr", string(stderr)).
394                                 WithError(err).
395                                 Error("error starting crunch-run process")
396                         // Leave uuid in wkr.running, though: it's
397                         // possible the error was just a communication
398                         // failure and the process was in fact
399                         // started.  Wait for next probe to find out.
400                         return
401                 }
402                 logger.Info("crunch-run process started")
403                 wkr.lastUUID = ctr.UUID
404         }()
405         return true
406 }
407
408 // KillContainer kills the crunch-run process for the given container
409 // UUID, if it's running on any worker.
410 //
411 // KillContainer returns immediately; the act of killing the container
412 // takes some time, and runs in the background.
413 func (wp *Pool) KillContainer(uuid string) {
414         wp.mtx.Lock()
415         defer wp.mtx.Unlock()
416         if _, ok := wp.exited[uuid]; ok {
417                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
418                 delete(wp.exited, uuid)
419                 return
420         }
421         for _, wkr := range wp.workers {
422                 if _, ok := wkr.running[uuid]; ok {
423                         go wp.kill(wkr, uuid)
424                         return
425                 }
426         }
427         wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
428 }
429
430 func (wp *Pool) kill(wkr *worker, uuid string) {
431         logger := wp.logger.WithFields(logrus.Fields{
432                 "ContainerUUID": uuid,
433                 "Instance":      wkr.instance,
434         })
435         logger.Debug("killing process")
436         stdout, stderr, err := wkr.executor.Execute("crunch-run --kill "+uuid, nil)
437         if err != nil {
438                 logger.WithFields(logrus.Fields{
439                         "stderr": string(stderr),
440                         "stdout": string(stdout),
441                         "error":  err,
442                 }).Warn("kill failed")
443                 return
444         }
445         logger.Debug("killing process succeeded")
446         wp.mtx.Lock()
447         defer wp.mtx.Unlock()
448         if _, ok := wkr.running[uuid]; ok {
449                 delete(wkr.running, uuid)
450                 if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
451                         wkr.state = StateIdle
452                 }
453                 wkr.updated = time.Now()
454                 go wp.notify()
455         }
456 }
457
458 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
459         if reg == nil {
460                 reg = prometheus.NewRegistry()
461         }
462         wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
463                 Namespace: "arvados",
464                 Subsystem: "dispatchcloud",
465                 Name:      "instances_total",
466                 Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
467         })
468         reg.MustRegister(wp.mInstances)
469         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
470                 Namespace: "arvados",
471                 Subsystem: "dispatchcloud",
472                 Name:      "containers_running",
473                 Help:      "Number of containers reported running by cloud VMs.",
474         })
475         reg.MustRegister(wp.mContainersRunning)
476
477         wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
478                 Namespace: "arvados",
479                 Subsystem: "dispatchcloud",
480                 Name:      "vcpus_total",
481                 Help:      "Total VCPUs on all cloud VMs.",
482         })
483         reg.MustRegister(wp.mVCPUs)
484         wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
485                 Namespace: "arvados",
486                 Subsystem: "dispatchcloud",
487                 Name:      "vcpus_inuse",
488                 Help:      "VCPUs on cloud VMs that are running containers.",
489         })
490         reg.MustRegister(wp.mVCPUsInuse)
491         wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
492                 Namespace: "arvados",
493                 Subsystem: "dispatchcloud",
494                 Name:      "memory_bytes_total",
495                 Help:      "Total memory on all cloud VMs.",
496         })
497         reg.MustRegister(wp.mMemory)
498         wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
499                 Namespace: "arvados",
500                 Subsystem: "dispatchcloud",
501                 Name:      "memory_bytes_inuse",
502                 Help:      "Memory on cloud VMs that are running containers.",
503         })
504         reg.MustRegister(wp.mMemoryInuse)
505 }
506
507 func (wp *Pool) runMetrics() {
508         ch := wp.Subscribe()
509         defer wp.Unsubscribe(ch)
510         for range ch {
511                 wp.updateMetrics()
512         }
513 }
514
515 func (wp *Pool) updateMetrics() {
516         wp.mtx.RLock()
517         defer wp.mtx.RUnlock()
518
519         var alloc, cpu, cpuInuse, mem, memInuse int64
520         for _, wkr := range wp.workers {
521                 cpu += int64(wkr.instType.VCPUs)
522                 mem += int64(wkr.instType.RAM)
523                 if len(wkr.running)+len(wkr.starting) == 0 {
524                         continue
525                 }
526                 alloc += int64(len(wkr.running) + len(wkr.starting))
527                 cpuInuse += int64(wkr.instType.VCPUs)
528                 memInuse += int64(wkr.instType.RAM)
529         }
530         wp.mInstances.Set(float64(len(wp.workers)))
531         wp.mContainersRunning.Set(float64(alloc))
532         wp.mVCPUs.Set(float64(cpu))
533         wp.mMemory.Set(float64(mem))
534         wp.mVCPUsInuse.Set(float64(cpuInuse))
535         wp.mMemoryInuse.Set(float64(memInuse))
536 }
537
538 func (wp *Pool) runProbes() {
539         maxPPS := wp.maxProbesPerSecond
540         if maxPPS < 1 {
541                 maxPPS = defaultMaxProbesPerSecond
542         }
543         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
544         defer limitticker.Stop()
545
546         probeticker := time.NewTicker(wp.probeInterval)
547         defer probeticker.Stop()
548
549         workers := []cloud.InstanceID{}
550         for range probeticker.C {
551                 workers = workers[:0]
552                 wp.mtx.Lock()
553                 for id, wkr := range wp.workers {
554                         if wkr.state == StateShutdown || wp.shutdownIfIdle(wkr) {
555                                 continue
556                         }
557                         workers = append(workers, id)
558                 }
559                 wp.mtx.Unlock()
560
561                 for _, id := range workers {
562                         wp.mtx.Lock()
563                         wkr, ok := wp.workers[id]
564                         wp.mtx.Unlock()
565                         if !ok || wkr.state == StateShutdown {
566                                 // Deleted/shutdown while we
567                                 // were probing others
568                                 continue
569                         }
570                         select {
571                         case wkr.probing <- struct{}{}:
572                                 go func() {
573                                         wp.probeAndUpdate(wkr)
574                                         <-wkr.probing
575                                 }()
576                         default:
577                                 wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
578                         }
579                         select {
580                         case <-wp.stop:
581                                 return
582                         case <-limitticker.C:
583                         }
584                 }
585         }
586 }
587
588 func (wp *Pool) runSync() {
589         // sync once immediately, then wait syncInterval, sync again,
590         // etc.
591         timer := time.NewTimer(1)
592         for {
593                 select {
594                 case <-timer.C:
595                         err := wp.getInstancesAndSync()
596                         if err != nil {
597                                 wp.logger.WithError(err).Warn("sync failed")
598                         }
599                         timer.Reset(wp.syncInterval)
600                 case <-wp.stop:
601                         wp.logger.Debug("worker.Pool stopped")
602                         return
603                 }
604         }
605 }
606
607 // caller must have lock.
608 func (wp *Pool) shutdownIfBroken(wkr *worker, dur time.Duration) {
609         if wkr.state == StateHold {
610                 return
611         }
612         label, threshold := "", wp.timeoutProbe
613         if wkr.state == StateBooting {
614                 label, threshold = "new ", wp.timeoutBooting
615         }
616         if dur < threshold {
617                 return
618         }
619         wp.logger.WithFields(logrus.Fields{
620                 "Instance": wkr.instance,
621                 "Duration": dur,
622                 "Since":    wkr.probed,
623                 "State":    wkr.state,
624         }).Warnf("%sinstance unresponsive, shutting down", label)
625         wp.shutdown(wkr, wp.logger)
626 }
627
628 // caller must have lock.
629 func (wp *Pool) shutdownIfIdle(wkr *worker) bool {
630         if wkr.state != StateIdle {
631                 return false
632         }
633         age := time.Since(wkr.busy)
634         if age < wp.timeoutIdle {
635                 return false
636         }
637         logger := wp.logger.WithFields(logrus.Fields{
638                 "Age":      age,
639                 "Instance": wkr.instance,
640         })
641         logger.Info("shutdown idle worker")
642         wp.shutdown(wkr, logger)
643         return true
644 }
645
646 // Stop synchronizing with the InstanceSet.
647 func (wp *Pool) Stop() {
648         wp.setupOnce.Do(wp.setup)
649         close(wp.stop)
650 }
651
652 // Instances returns an InstanceView for each worker in the pool,
653 // summarizing its current state and recent activity.
654 func (wp *Pool) Instances() []InstanceView {
655         var r []InstanceView
656         wp.setupOnce.Do(wp.setup)
657         wp.mtx.Lock()
658         for _, w := range wp.workers {
659                 r = append(r, InstanceView{
660                         Instance:             w.instance.String(),
661                         Price:                w.instType.Price,
662                         ArvadosInstanceType:  w.instType.Name,
663                         ProviderInstanceType: w.instType.ProviderType,
664                         LastContainerUUID:    w.lastUUID,
665                         LastBusy:             w.busy,
666                         WorkerState:          w.state.String(),
667                 })
668         }
669         wp.mtx.Unlock()
670         sort.Slice(r, func(i, j int) bool {
671                 return strings.Compare(r[i].Instance, r[j].Instance) < 0
672         })
673         return r
674 }
675
676 func (wp *Pool) setup() {
677         wp.creating = map[arvados.InstanceType]int{}
678         wp.exited = map[string]time.Time{}
679         wp.workers = map[cloud.InstanceID]*worker{}
680         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
681 }
682
683 func (wp *Pool) notify() {
684         wp.mtx.RLock()
685         defer wp.mtx.RUnlock()
686         for _, send := range wp.subscribers {
687                 select {
688                 case send <- struct{}{}:
689                 default:
690                 }
691         }
692 }
693
694 func (wp *Pool) getInstancesAndSync() error {
695         wp.setupOnce.Do(wp.setup)
696         wp.logger.Debug("getting instance list")
697         threshold := time.Now()
698         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
699         if err != nil {
700                 return err
701         }
702         wp.sync(threshold, instances)
703         wp.logger.Debug("sync done")
704         return nil
705 }
706
707 // Add/remove/update workers based on instances, which was obtained
708 // from the instanceSet. However, don't clobber any other updates that
709 // already happened after threshold.
710 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
711         wp.mtx.Lock()
712         defer wp.mtx.Unlock()
713         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
714         notify := false
715
716         for _, inst := range instances {
717                 itTag := inst.Tags()[tagKeyInstanceType]
718                 it, ok := wp.instanceTypes[itTag]
719                 if !ok {
720                         wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
721                         continue
722                 }
723                 if wp.updateWorker(inst, it, StateUnknown) {
724                         notify = true
725                 }
726         }
727
728         for id, wkr := range wp.workers {
729                 if wkr.updated.After(threshold) {
730                         continue
731                 }
732                 logger := wp.logger.WithFields(logrus.Fields{
733                         "Instance":    wkr.instance,
734                         "WorkerState": wkr.state,
735                 })
736                 logger.Info("instance disappeared in cloud")
737                 delete(wp.workers, id)
738                 go wkr.executor.Close()
739                 notify = true
740         }
741
742         if !wp.loaded {
743                 wp.loaded = true
744                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
745         }
746
747         if notify {
748                 go wp.notify()
749         }
750 }
751
752 // should be called in a new goroutine
753 func (wp *Pool) probeAndUpdate(wkr *worker) {
754         logger := wp.logger.WithField("Instance", wkr.instance)
755         wp.mtx.Lock()
756         updated := wkr.updated
757         needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
758         needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
759         wp.mtx.Unlock()
760         if !needProbeBooted && !needProbeRunning {
761                 return
762         }
763
764         var (
765                 ctrUUIDs []string
766                 ok       bool
767                 stderr   []byte
768         )
769         if needProbeBooted {
770                 ok, stderr = wp.probeBooted(wkr)
771                 wp.mtx.Lock()
772                 if ok || wkr.state == StateRunning || wkr.state == StateIdle {
773                         logger.Info("instance booted; will try probeRunning")
774                         needProbeRunning = true
775                 }
776                 wp.mtx.Unlock()
777         }
778         if needProbeRunning {
779                 ctrUUIDs, ok, stderr = wp.probeRunning(wkr)
780         }
781         logger = logger.WithField("stderr", string(stderr))
782         wp.mtx.Lock()
783         defer wp.mtx.Unlock()
784         if !ok {
785                 if wkr.state == StateShutdown && wkr.updated.After(updated) {
786                         // Skip the logging noise if shutdown was
787                         // initiated during probe.
788                         return
789                 }
790                 dur := time.Since(wkr.probed)
791                 logger := logger.WithFields(logrus.Fields{
792                         "Duration": dur,
793                         "State":    wkr.state,
794                 })
795                 if wkr.state == StateBooting {
796                         logger.Debug("new instance not responding")
797                 } else {
798                         logger.Info("instance not responding")
799                 }
800                 wp.shutdownIfBroken(wkr, dur)
801                 return
802         }
803
804         updateTime := time.Now()
805         wkr.probed = updateTime
806
807         if updated != wkr.updated {
808                 // Worker was updated after the probe began, so
809                 // wkr.running might have a container UUID that was
810                 // not yet running when ctrUUIDs was generated. Leave
811                 // wkr.running alone and wait for the next probe to
812                 // catch up on any changes.
813                 return
814         }
815
816         if len(ctrUUIDs) > 0 {
817                 wkr.busy = updateTime
818                 wkr.lastUUID = ctrUUIDs[0]
819         } else if len(wkr.running) > 0 {
820                 // Actual last-busy time was sometime between wkr.busy
821                 // and now. Now is the earliest opportunity to take
822                 // advantage of the non-busy state, though.
823                 wkr.busy = updateTime
824         }
825         running := map[string]struct{}{}
826         changed := false
827         for _, uuid := range ctrUUIDs {
828                 running[uuid] = struct{}{}
829                 if _, ok := wkr.running[uuid]; !ok {
830                         changed = true
831                 }
832         }
833         for uuid := range wkr.running {
834                 if _, ok := running[uuid]; !ok {
835                         logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
836                         wp.exited[uuid] = updateTime
837                         changed = true
838                 }
839         }
840         if wkr.state == StateUnknown || wkr.state == StateBooting {
841                 wkr.state = StateIdle
842                 changed = true
843         }
844         if changed {
845                 wkr.running = running
846                 if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
847                         wkr.state = StateRunning
848                 } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
849                         wkr.state = StateIdle
850                 }
851                 wkr.updated = updateTime
852                 go wp.notify()
853         }
854 }
855
856 func (wp *Pool) probeRunning(wkr *worker) (running []string, ok bool, stderr []byte) {
857         cmd := "crunch-run --list"
858         stdout, stderr, err := wkr.executor.Execute(cmd, nil)
859         if err != nil {
860                 wp.logger.WithFields(logrus.Fields{
861                         "Instance": wkr.instance,
862                         "Command":  cmd,
863                         "stdout":   string(stdout),
864                         "stderr":   string(stderr),
865                 }).WithError(err).Warn("probe failed")
866                 return nil, false, stderr
867         }
868         stdout = bytes.TrimRight(stdout, "\n")
869         if len(stdout) == 0 {
870                 return nil, true, stderr
871         }
872         return strings.Split(string(stdout), "\n"), true, stderr
873 }
874
875 func (wp *Pool) probeBooted(wkr *worker) (ok bool, stderr []byte) {
876         cmd := wp.bootProbeCommand
877         if cmd == "" {
878                 cmd = "true"
879         }
880         stdout, stderr, err := wkr.executor.Execute(cmd, nil)
881         logger := wp.logger.WithFields(logrus.Fields{
882                 "Instance": wkr.instance,
883                 "Command":  cmd,
884                 "stdout":   string(stdout),
885                 "stderr":   string(stderr),
886         })
887         if err != nil {
888                 logger.WithError(err).Debug("boot probe failed")
889                 return false, stderr
890         }
891         logger.Info("boot probe succeeded")
892         return true, stderr
893 }