// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "bytes"
        "io"
        "sort"
        "strings"
        "sync"
        "time"

        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/Sirupsen/logrus"
        "github.com/prometheus/client_golang/prometheus"
)

const (
        tagKeyInstanceType = "InstanceType"
        tagKeyHold         = "Hold"
)

// A View shows a worker's current state and recent activity.
type View struct {
        Instance             string
        Price                float64
        ArvadosInstanceType  string
        ProviderInstanceType string
        LastContainerUUID    string
        Unallocated          time.Time
        WorkerState          string
}

// An Executor executes shell commands on a remote host.
type Executor interface {
        // Run cmd on the current target.
        Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

        // Use the given target for subsequent operations. The new
        // target is the same host as the previous target, but it
        // might return a different address and verify a different
        // host key.
        //
        // SetTarget is called frequently, and in most cases the new
        // target will behave exactly the same as the old one. An
        // implementation should optimize accordingly.
        //
        // SetTarget must not block on concurrent Execute calls.
        SetTarget(cloud.ExecutorTarget)

        Close()
}
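
// A minimal stub satisfying Executor, e.g. for tests (the names here
// are illustrative only; the production implementation presumably runs
// the commands on the remote instance over SSH):
//
//      type stubExecutor struct{ target cloud.ExecutorTarget }
//
//      func (e *stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
//              return []byte("ok\n"), nil, nil
//      }
//      func (e *stubExecutor) SetTarget(t cloud.ExecutorTarget) { e.target = t }
//      func (e *stubExecutor) Close()                           {}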

const (
        defaultSyncInterval       = time.Minute
        defaultProbeInterval      = time.Second * 10
        defaultMaxProbesPerSecond = 10
        defaultTimeoutIdle        = time.Minute
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
)

func duration(conf arvados.Duration, def time.Duration) time.Duration {
        if conf > 0 {
                return time.Duration(conf)
        } else {
                return def
        }
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:             logger,
                instanceSet:        instanceSet,
                newExecutor:        newExecutor,
                bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
                imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
                instanceTypes:      cluster.InstanceTypes,
                maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
                probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
                syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
                timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
                timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
        }
        wp.registerMetrics(reg)
        go func() {
                wp.setupOnce.Do(wp.setup)
                go wp.runMetrics()
                go wp.runProbes()
                go wp.runSync()
        }()
        return wp
}
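
// A sketch of how a dispatcher might construct and tear down a Pool,
// assuming logger, instanceSet, cluster, and an Executor factory
// (newExecutor) are already in hand:
//
//      reg := prometheus.NewRegistry()
//      wp := NewPool(logger, reg, instanceSet, newExecutor, cluster)
//      defer wp.Stop()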

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
        // configuration
        logger             logrus.FieldLogger
        instanceSet        cloud.InstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
        imageID            cloud.ImageID
        instanceTypes      map[string]arvados.InstanceType
        syncInterval       time.Duration
        probeInterval      time.Duration
        maxProbesPerSecond int
        timeoutIdle        time.Duration
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration

        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
        creating     map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
        exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
        atQuotaUntil time.Time
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once

        mInstances         prometheus.Gauge
        mContainersRunning prometheus.Gauge
        mVCPUs             prometheus.Gauge
        mVCPUsInuse        prometheus.Gauge
        mMemory            prometheus.Gauge
        mMemoryInuse       prometheus.Gauge
}

type worker struct {
        state       State
        instance    cloud.Instance
        executor    Executor
        instType    arvados.InstanceType
        vcpus       int64
        memory      int64
        booted      bool
        probed      time.Time
        updated     time.Time
        busy        time.Time
        unallocated time.Time
        lastUUID    string
        running     map[string]struct{}
        starting    map[string]struct{}
        probing     chan struct{}
}

// Subscribe returns a channel that becomes ready whenever a worker's
// state changes.
//
// Example:
//
//      ch := wp.Subscribe()
//      defer wp.Unsubscribe(ch)
//      for range ch {
//              // ...try scheduling some work...
//              if done {
//                      break
//              }
//      }
func (wp *Pool) Subscribe() <-chan struct{} {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        ch := make(chan struct{}, 1)
        wp.subscribers[ch] = ch
        return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        u := map[arvados.InstanceType]int{}
        for it, c := range wp.creating {
                u[it] = c
        }
        for _, wkr := range wp.workers {
                if len(wkr.running)+len(wkr.starting) == 0 && (wkr.state == StateRunning || wkr.state == StateBooting || wkr.state == StateUnknown) {
                        u[wkr.instType]++
                }
        }
        return u
}

// Create a new instance with the given type, and add it to the worker
// pool. The pool's unallocated count for this type is incremented
// immediately; the cloud instance itself is created in the background.
func (wp *Pool) Create(it arvados.InstanceType) error {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
        wp.creating[it]++
        go func() {
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
                wp.creating[it]--
                if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
                        wp.atQuotaUntil = time.Now().Add(time.Minute)
                }
                if err != nil {
                        logger.WithError(err).Error("create failed")
                        go wp.notify()
                        return
                }
                wp.updateWorker(inst, it, StateBooting)
        }()
        return nil
}
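
// A sketch of how a scheduler might combine Unallocated, AtQuota, and
// Create to bring up capacity (wanted is a hypothetical
// map[arvados.InstanceType]int of workers the queue needs):
//
//      unalloc := wp.Unallocated()
//      for it, n := range wanted {
//              for i := unalloc[it]; i < n && !wp.AtQuota(); i++ {
//                      wp.Create(it)
//              }
//      }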

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
        return time.Now().Before(wp.atQuotaUntil)
}

// Add or update worker attached to the given instance. Use
// initialState if a new worker is created. Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) {
        id := inst.ID()
        if wp.workers[id] != nil {
                wp.workers[id].executor.SetTarget(inst)
                wp.workers[id].instance = inst
                wp.workers[id].updated = time.Now()
                if initialState == StateBooting && wp.workers[id].state == StateUnknown {
                        wp.workers[id].state = StateBooting
                }
                return
        }
        if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
                initialState = StateHold
        }
        wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
                "Instance":     inst,
                "State":        initialState,
        }).Infof("instance appeared in cloud")
        now := time.Now()
        wp.workers[id] = &worker{
                executor:    wp.newExecutor(inst),
                state:       initialState,
                instance:    inst,
                instType:    it,
                probed:      now,
                busy:        now,
                updated:     now,
                unallocated: now,
                running:     make(map[string]struct{}),
                starting:    make(map[string]struct{}),
                probing:     make(chan struct{}, 1),
        }
        go wp.notify()
}

// Shutdown shuts down an idle (or still booting) worker with the given
// type. It returns false if no such worker is available, i.e., every
// worker with the given type is busy or none exists.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithField("InstanceType", it.Name)
        logger.Info("shutdown requested")
        for _, tryState := range []State{StateBooting, StateRunning} {
                // TODO: shutdown the worker with the longest idle
                // time (Running) or the earliest create time
                // (Booting)
                for _, wkr := range wp.workers {
                        if wkr.state != tryState || len(wkr.running)+len(wkr.starting) > 0 {
                                continue
                        }
                        if wkr.instType != it {
                                continue
                        }
                        logger = logger.WithField("Instance", wkr.instance)
                        logger.Info("shutting down")
                        wp.shutdown(wkr, logger)
                        return true
                }
        }
        return false
}
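
// The complementary scale-down sketch: release workers of a type when
// more are unallocated than needed (wanted is hypothetical, as above):
//
//      for it, n := range wp.Unallocated() {
//              for ; n > wanted[it]; n-- {
//                      if !wp.Shutdown(it) {
//                              break
//                      }
//              }
//      }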

// caller must have lock
func (wp *Pool) shutdown(wkr *worker, logger logrus.FieldLogger) {
        wkr.updated = time.Now()
        wkr.state = StateShutdown
        go func() {
                err := wkr.instance.Destroy()
                if err != nil {
                        logger.WithError(err).Warn("shutdown failed")
                        return
                }
                wp.mtx.Lock()
                wp.atQuotaUntil = time.Now()
                wp.mtx.Unlock()
                wp.notify()
        }()
}

// Workers returns the current number of workers in each state.
func (wp *Pool) Workers() map[State]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[State]int{}
        for _, w := range wp.workers {
                r[w.state]++
        }
        return r
}

// Running returns the container UUIDs being prepared/run on workers.
func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[string]time.Time{}
        for _, wkr := range wp.workers {
                for uuid := range wkr.running {
                        r[uuid] = time.Time{}
                }
                for uuid := range wkr.starting {
                        r[uuid] = time.Time{}
                }
        }
        for uuid, exited := range wp.exited {
                r[uuid] = exited
        }
        return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
        logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType":  it.Name,
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
        })
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        var wkr *worker
        for _, w := range wp.workers {
                if w.instType == it && w.state == StateRunning && len(w.running)+len(w.starting) == 0 {
                        if wkr == nil || w.busy.After(wkr.busy) {
                                wkr = w
                        }
                }
        }
        if wkr == nil {
                return false
        }
        logger = logger.WithField("Instance", wkr.instance)
        logger.Debug("starting container")
        wkr.starting[ctr.UUID] = struct{}{}
        go func() {
                stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
                wkr.updated = time.Now()
                delete(wkr.starting, ctr.UUID)
                wkr.running[ctr.UUID] = struct{}{}
                if err != nil {
                        logger.WithField("stdout", string(stdout)).
                                WithField("stderr", string(stderr)).
                                WithError(err).
                                Error("error starting crunch-run process")
                        // Leave uuid in wkr.running, though: it's
                        // possible the error was just a communication
                        // failure and the process was in fact
                        // started.  Wait for next probe to find out.
                        return
                }
                logger.Info("crunch-run process started")
                wkr.lastUUID = ctr.UUID
        }()
        return true
}
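
// A sketch of the intended calling pattern: wake up on pool changes and
// try to place queued containers on idle workers. A false return means
// no idle worker of that type was available, in which case the caller
// would typically Create one (see the sketch after Create). queue and
// ChooseType are hypothetical.
//
//      ch := wp.Subscribe()
//      defer wp.Unsubscribe(ch)
//      for range ch {
//              for _, ctr := range queue {
//                      wp.StartContainer(ChooseType(ctr), ctr)
//              }
//      }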

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
func (wp *Pool) KillContainer(uuid string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if _, ok := wp.exited[uuid]; ok {
                wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
                delete(wp.exited, uuid)
                return
        }
        for _, wkr := range wp.workers {
                if _, ok := wkr.running[uuid]; ok {
                        go wp.kill(wkr, uuid)
                        return
                }
        }
        wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
}
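
// Running and KillContainer are meant to be used together to clean up
// crunch-run processes the dispatcher no longer wants (wanted is a
// hypothetical set of container UUIDs still worth running):
//
//      for uuid := range wp.Running() {
//              if _, ok := wanted[uuid]; !ok {
//                      wp.KillContainer(uuid)
//              }
//      }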

func (wp *Pool) kill(wkr *worker, uuid string) {
        logger := wp.logger.WithFields(logrus.Fields{
                "ContainerUUID": uuid,
                "Instance":      wkr.instance,
        })
        logger.Debug("killing process")
        stdout, stderr, err := wkr.executor.Execute("crunch-run --kill "+uuid, nil)
        if err != nil {
                logger.WithFields(logrus.Fields{
                        "stderr": string(stderr),
                        "stdout": string(stdout),
                        "error":  err,
                }).Warn("kill failed")
                return
        }
        logger.Debug("killing process succeeded")
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if _, ok := wkr.running[uuid]; ok {
                delete(wkr.running, uuid)
                wkr.updated = time.Now()
                go wp.notify()
        }
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
        if reg == nil {
                reg = prometheus.NewRegistry()
        }
        wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_total",
                Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
        })
        reg.MustRegister(wp.mInstances)
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "containers_running",
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)

        wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_total",
                Help:      "Total VCPUs on all cloud VMs.",
        })
        reg.MustRegister(wp.mVCPUs)
        wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_inuse",
                Help:      "VCPUs on cloud VMs that are running containers.",
        })
        reg.MustRegister(wp.mVCPUsInuse)
        wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_total",
                Help:      "Total memory on all cloud VMs.",
        })
        reg.MustRegister(wp.mMemory)
        wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_inuse",
                Help:      "Memory on cloud VMs that are running containers.",
        })
        reg.MustRegister(wp.mMemoryInuse)
}
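
// The registry passed to NewPool can be exposed by the caller in the
// usual way, e.g. with the standard promhttp handler (how the
// dispatcher actually serves metrics is outside this file):
//
//      reg := prometheus.NewRegistry()
//      wp := NewPool(logger, reg, instanceSet, newExecutor, cluster)
//      http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))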

func (wp *Pool) runMetrics() {
        ch := wp.Subscribe()
        defer wp.Unsubscribe(ch)
        for range ch {
                wp.updateMetrics()
        }
}

func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()

        var alloc, cpu, cpuInuse, mem, memInuse int64
        for _, wkr := range wp.workers {
                cpu += int64(wkr.instType.VCPUs)
                mem += int64(wkr.instType.RAM)
                if len(wkr.running)+len(wkr.starting) == 0 {
                        continue
                }
                alloc += int64(len(wkr.running) + len(wkr.starting))
                cpuInuse += int64(wkr.instType.VCPUs)
                memInuse += int64(wkr.instType.RAM)
        }
        wp.mInstances.Set(float64(len(wp.workers)))
        wp.mContainersRunning.Set(float64(alloc))
        wp.mVCPUs.Set(float64(cpu))
        wp.mMemory.Set(float64(mem))
        wp.mVCPUsInuse.Set(float64(cpuInuse))
        wp.mMemoryInuse.Set(float64(memInuse))
}

func (wp *Pool) runProbes() {
        maxPPS := wp.maxProbesPerSecond
        if maxPPS < 1 {
                maxPPS = defaultMaxProbesPerSecond
        }
        limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
        defer limitticker.Stop()

        probeticker := time.NewTicker(wp.probeInterval)
        defer probeticker.Stop()

        workers := []cloud.InstanceID{}
        for range probeticker.C {
                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
                        if wkr.state == StateShutdown || wp.autoShutdown(wkr) {
                                continue
                        }
                        workers = append(workers, id)
                }
                wp.mtx.Unlock()

                for _, id := range workers {
                        wp.mtx.Lock()
                        wkr, ok := wp.workers[id]
                        wp.mtx.Unlock()
                        if !ok || wkr.state == StateShutdown {
                                // Deleted/shutdown while we
                                // were probing others
                                continue
                        }
                        select {
                        case wkr.probing <- struct{}{}:
                                go func() {
                                        wp.probeAndUpdate(wkr)
                                        <-wkr.probing
                                }()
                        default:
                                wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
                        }
                        select {
                        case <-wp.stop:
                                return
                        case <-limitticker.C:
                        }
                }
        }
}
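
// Each worker's probing channel has capacity 1, so the non-blocking
// send in runProbes acts as a per-worker try-lock: a new probe starts
// only if the previous one has finished. The same idiom in isolation
// (doWork is a stand-in):
//
//      sem := make(chan struct{}, 1)
//      select {
//      case sem <- struct{}{}:
//              go func() {
//                      doWork()
//                      <-sem
//              }()
//      default:
//              // previous doWork still running; skip this round
//      }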

func (wp *Pool) runSync() {
        // sync once immediately, then wait syncInterval, sync again,
        // etc.
        timer := time.NewTimer(1)
        for {
                select {
                case <-timer.C:
                        err := wp.getInstancesAndSync()
                        if err != nil {
                                wp.logger.WithError(err).Warn("sync failed")
                        }
                        timer.Reset(wp.syncInterval)
                case <-wp.stop:
                        wp.logger.Debug("worker.Pool stopped")
                        return
                }
        }
}

// caller must have lock.
func (wp *Pool) autoShutdown(wkr *worker) bool {
        if len(wkr.running)+len(wkr.starting) > 0 || wkr.state != StateRunning {
                return false
        }
        age := time.Since(wkr.unallocated)
        if age < wp.timeoutIdle {
                return false
        }
        logger := wp.logger.WithFields(logrus.Fields{
                "Age":      age,
                "Instance": wkr.instance,
        })
        logger.Info("shutdown idle worker")
        wp.shutdown(wkr, logger)
        return true
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
        wp.setupOnce.Do(wp.setup)
        close(wp.stop)
}

// View reports status information for every worker in the pool.
func (wp *Pool) View() []View {
        var r []View
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        for _, w := range wp.workers {
                r = append(r, View{
                        Instance:             w.instance.String(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
                        LastContainerUUID:    w.lastUUID,
                        Unallocated:          w.unallocated,
                        WorkerState:          w.state.String(),
                })
        }
        wp.mtx.Unlock()
        sort.Slice(r, func(i, j int) bool {
                return strings.Compare(r[i].Instance, r[j].Instance) < 0
        })
        return r
}
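
// A sketch of exposing View as a status endpoint (the route name is
// illustrative):
//
//      http.HandleFunc("/status/workers", func(w http.ResponseWriter, r *http.Request) {
//              json.NewEncoder(w).Encode(wp.View())
//      })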

func (wp *Pool) setup() {
        wp.creating = map[arvados.InstanceType]int{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
        wp.stop = make(chan bool) // closed by Stop()
}

func (wp *Pool) notify() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        for _, send := range wp.subscribers {
                select {
                case send <- struct{}{}:
                default:
                }
        }
}

func (wp *Pool) getInstancesAndSync() error {
        wp.setupOnce.Do(wp.setup)
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
        instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
        if err != nil {
                return err
        }
        wp.sync(threshold, instances)
        wp.logger.Debug("sync done")
        return nil
}

// Add/remove/update workers based on instances, which were obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wp.logger.WithField("Instances", len(instances)).Debug("sync instances")

        for _, inst := range instances {
                itTag := inst.Tags()[tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                        wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
                wp.updateWorker(inst, it, StateUnknown)
        }

        for id, wkr := range wp.workers {
                if wkr.updated.After(threshold) {
                        continue
                }
                logger := wp.logger.WithFields(logrus.Fields{
                        "Instance":    wkr.instance,
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
                delete(wp.workers, id)
                go wkr.executor.Close()
                go wp.notify()
        }

        if !wp.loaded {
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }
}

// should be called in a new goroutine
func (wp *Pool) probeAndUpdate(wkr *worker) {
        logger := wp.logger.WithField("Instance", wkr.instance)
        wp.mtx.Lock()
        updated := wkr.updated
        booted := wkr.booted
        wp.mtx.Unlock()

        var (
                ctrUUIDs []string
                ok       bool
                stderr   []byte
        )
        if !booted {
                booted, stderr = wp.probeBooted(wkr)
                wp.mtx.Lock()
                if booted && !wkr.booted {
                        wkr.booted = booted
                        logger.Info("instance booted")
                } else {
                        booted = wkr.booted
                }
                wp.mtx.Unlock()
        }
        if booted {
                ctrUUIDs, ok, stderr = wp.probeRunning(wkr)
        }
        logger = logger.WithField("stderr", string(stderr))
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if !ok {
                if wkr.state == StateShutdown {
                        return
                }
                dur := time.Since(wkr.probed)
                logger := logger.WithFields(logrus.Fields{
                        "Duration": dur,
                        "State":    wkr.state,
                })
                if wkr.state == StateBooting {
                        logger.Debug("new instance not responding")
                } else {
                        logger.Info("instance not responding")
                }

                if wkr.state == StateHold {
                        return
                }

                label, threshold := "", wp.timeoutProbe
                if wkr.state == StateBooting {
                        label, threshold = "new ", wp.timeoutBooting
                }
                if dur > threshold {
                        logger.WithField("Since", wkr.probed).Warnf("%sinstance unresponsive, shutting down", label)
                        wp.shutdown(wkr, logger)
                }
                return
        }

        updateTime := time.Now()
        wkr.probed = updateTime
        if len(ctrUUIDs) > 0 {
                wkr.busy = updateTime
                wkr.lastUUID = ctrUUIDs[0]
        }
        if wkr.state == StateShutdown || wkr.state == StateHold {
        } else if booted {
                if wkr.state != StateRunning {
                        wkr.state = StateRunning
                        go wp.notify()
                }
        } else {
                wkr.state = StateBooting
        }

        if updated != wkr.updated {
                // Worker was updated (e.g., by starting a new
                // container) after the probe began. Avoid clobbering
                // those changes with the probe results.
                return
        }

        if len(ctrUUIDs) == 0 && len(wkr.running) > 0 {
                wkr.unallocated = updateTime
        }
        running := map[string]struct{}{}
        changed := false
        for _, uuid := range ctrUUIDs {
                running[uuid] = struct{}{}
                if _, ok := wkr.running[uuid]; !ok {
                        changed = true
                }
        }
        for uuid := range wkr.running {
                if _, ok := running[uuid]; !ok {
                        logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
                        wp.exited[uuid] = updateTime
                        changed = true
                }
        }
        if changed {
                wkr.running = running
                wkr.updated = updateTime
                go wp.notify()
        }
}

func (wp *Pool) probeRunning(wkr *worker) (running []string, ok bool, stderr []byte) {
        cmd := "crunch-run --list"
        stdout, stderr, err := wkr.executor.Execute(cmd, nil)
        if err != nil {
                wp.logger.WithFields(logrus.Fields{
                        "Instance": wkr.instance,
                        "Command":  cmd,
                        "stdout":   string(stdout),
                        "stderr":   string(stderr),
                }).WithError(err).Warn("probe failed")
                return nil, false, stderr
        }
        stdout = bytes.TrimRight(stdout, "\n")
        if len(stdout) == 0 {
                return nil, true, stderr
        }
        return strings.Split(string(stdout), "\n"), true, stderr
}

func (wp *Pool) probeBooted(wkr *worker) (ok bool, stderr []byte) {
        cmd := wp.bootProbeCommand
        if cmd == "" {
                cmd = "true"
        }
        stdout, stderr, err := wkr.executor.Execute(cmd, nil)
        logger := wp.logger.WithFields(logrus.Fields{
                "Instance": wkr.instance,
                "Command":  cmd,
                "stdout":   string(stdout),
                "stderr":   string(stderr),
        })
        if err != nil {
                logger.WithError(err).Debug("boot probe failed")
                return false, stderr
        }
        logger.Info("boot probe succeeded")
        return true, stderr
}