14325: Wake up scheduler when quota timeout expires.
[arvados.git] / lib / dispatchcloud / worker / pool.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "errors"
        "io"
        "sort"
        "strings"
        "sync"
        "time"

        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
)

const (
        tagKeyInstanceType = "InstanceType"
        tagKeyIdleBehavior = "IdleBehavior"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
        Instance             cloud.InstanceID `json:"instance"`
        Price                float64          `json:"price"`
        ArvadosInstanceType  string           `json:"arvados_instance_type"`
        ProviderInstanceType string           `json:"provider_instance_type"`
        LastContainerUUID    string           `json:"last_container_uuid"`
        LastBusy             time.Time        `json:"last_busy"`
        WorkerState          string           `json:"worker_state"`
        IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
        // Run cmd on the current target.
        Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

        // Use the given target for subsequent operations. The new
        // target is the same host as the previous target, but it
        // might return a different address and verify a different
        // host key.
        //
        // SetTarget is called frequently, and in most cases the new
        // target will behave exactly the same as the old one. An
        // implementation should optimize accordingly.
        //
        // SetTarget must not block on concurrent Execute calls.
        SetTarget(cloud.ExecutorTarget)

        Close()
}
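
// A hypothetical minimal Executor, shown only to illustrate the
// contract above. The real dispatcher supplies an implementation
// (typically an SSH client) via NewPool's newExecutor argument; this
// sketch ignores env and runs cmd locally:
//
//      type localExecutor struct{}
//
//      func (localExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
//              c := exec.Command("sh", "-c", cmd)
//              c.Stdin = stdin
//              var out, errb bytes.Buffer
//              c.Stdout, c.Stderr = &out, &errb
//              err = c.Run()
//              return out.Bytes(), errb.Bytes(), err
//      }
//
//      func (localExecutor) SetTarget(cloud.ExecutorTarget) {}
//      func (localExecutor) Close()                         {}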

const (
        defaultSyncInterval       = time.Minute
        defaultProbeInterval      = time.Second * 10
        defaultMaxProbesPerSecond = 10
        defaultTimeoutIdle        = time.Minute
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
        defaultTimeoutShutdown    = time.Second * 10

        // Time to wait after a quota error before trying to create
        // new instances anyway, even if no instances have been shut
        // down in the meantime.
        quotaErrorTTL = time.Minute

        // Time between "X failed because of rate limiting" messages
        logRateLimitErrorInterval = time.Second * 10
)

// duration returns conf as a time.Duration if it is set (i.e.,
// greater than zero), otherwise the given default.
func duration(conf arvados.Duration, def time.Duration) time.Duration {
        if conf > 0 {
                return time.Duration(conf)
        }
        return def
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:             logger,
                arvClient:          arvClient,
                instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:        newExecutor,
                bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
                imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
                instanceTypes:      cluster.InstanceTypes,
                maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
                probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
                syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
                timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
                timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
                timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
                stop:               make(chan bool),
        }
        wp.registerMetrics(reg)
        go func() {
                wp.setupOnce.Do(wp.setup)
                go wp.runMetrics()
                go wp.runProbes()
                go wp.runSync()
        }()
        return wp
}
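
// Illustrative setup (logger, client, instSet, newExec, and cluster
// are assumed to come from the caller's configuration code):
//
//      reg := prometheus.NewRegistry()
//      wp := NewPool(logger, client, reg, instSet, newExec, cluster)
//      ch := wp.Subscribe()
//      defer wp.Unsubscribe(ch)
//      for range ch {
//              // react to pool state changes (see Subscribe)
//      }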

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
        // configuration
        logger             logrus.FieldLogger
        arvClient          *arvados.Client
        instanceSet        *throttledInstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
        imageID            cloud.ImageID
        instanceTypes      map[string]arvados.InstanceType
        syncInterval       time.Duration
        probeInterval      time.Duration
        maxProbesPerSecond int
        timeoutIdle        time.Duration
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
        timeoutShutdown    time.Duration

        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
        creating     map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
        exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
        atQuotaUntil time.Time
        atQuotaErr   cloud.QuotaError
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once

        throttleCreate    throttle
        throttleInstances throttle

        mInstances         prometheus.Gauge
        mInstancesPrice    prometheus.Gauge
        mContainersRunning prometheus.Gauge
        mVCPUs             prometheus.Gauge
        mVCPUsInuse        prometheus.Gauge
        mMemory            prometheus.Gauge
        mMemoryInuse       prometheus.Gauge
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//      ch := wp.Subscribe()
//      defer wp.Unsubscribe(ch)
//      for range ch {
//              tryScheduling(wp)
//              if done {
//                      break
//              }
//      }
func (wp *Pool) Subscribe() <-chan struct{} {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        ch := make(chan struct{}, 1)
        wp.subscribers[ch] = ch
        return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.  Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        unalloc := map[arvados.InstanceType]int{}
        creating := map[arvados.InstanceType]int{}
        for it, times := range wp.creating {
                creating[it] = len(times)
        }
        for _, wkr := range wp.workers {
                if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) || wkr.idleBehavior != IdleBehaviorRun {
                        continue
                }
                it := wkr.instType
                unalloc[it]++
                if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
                        // If up to N new workers appear in
                        // Instances() while we are waiting for N
                        // Create() calls to complete, we assume we're
                        // just seeing a race between Instances() and
                        // Create() responses.
                        //
                        // The other common reason why nodes have
                        // state==Unknown is that they appeared at
                        // startup, before any Create calls. They
                        // don't match the above timing condition, so
                        // we never mistakenly attribute them to
                        // pending Create calls.
                        creating[it]--
                }
        }
        for it, c := range creating {
                unalloc[it] += c
        }
        return unalloc
}
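
// Illustrative use: a scheduler can compare Unallocated against its
// queue to decide how many instances to create (needed is a
// hypothetical map of desired counts per instance type):
//
//      unalloc := wp.Unallocated()
//      for it, want := range needed {
//              for unalloc[it] < want && wp.Create(it) {
//                      unalloc[it]++
//              }
//      }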

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
                return false
        }
        tags := cloud.InstanceTags{
                tagKeyInstanceType: it.Name,
                tagKeyIdleBehavior: string(IdleBehaviorRun),
        }
        now := time.Now()
        wp.creating[it] = append(wp.creating[it], now)
        go func() {
                defer wp.notify()
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
                // Remove our timestamp marker from wp.creating
                for i, t := range wp.creating[it] {
                        if t == now {
                                copy(wp.creating[it][i:], wp.creating[it][i+1:])
                                wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
                                break
                        }
                }
                if err != nil {
                        if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
                                wp.atQuotaErr = err
                                wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
                                // Wake up the scheduler when the quota
                                // timeout expires, so it can retry
                                // Create (see #14325).
                                time.AfterFunc(quotaErrorTTL, wp.notify)
                        }
                        logger.WithError(err).Error("create failed")
                        wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
                        return
                }
                wp.updateWorker(inst, it, StateBooting)
        }()
        return true
}
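
// Illustrative use: because quota and rate-limit errors are already
// logged and remembered by the Pool, a caller can simply stop
// creating instances and wait for the next Subscribe notification
// (the Pool wakes subscribers when the quota timeout expires):
//
//      if !wp.AtQuota() && !wp.Create(it) {
//              // throttled; retry after the next pool event
//      }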

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        return time.Now().Before(wp.atQuotaUntil)
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wkr, ok := wp.workers[id]
        if !ok {
                return errors.New("requested instance does not exist")
        }
        wkr.idleBehavior = idleBehavior
        wkr.saveTags()
        wkr.shutdownIfIdle()
        return nil
}
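
// Illustrative use: an operator API might drain a misbehaving
// instance so it shuts down as soon as it becomes idle
// (IdleBehaviorDrain is assumed to be defined alongside
// IdleBehaviorRun/IdleBehaviorHold in this package):
//
//      if err := wp.SetIdleBehavior(id, IdleBehaviorDrain); err != nil {
//              logger.WithError(err).Warn("cannot drain instance")
//      }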

// Add or update a worker attached to the given instance. Use
// initialState if a new worker is created.
//
// The second return value is true if a new worker is created.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
                wkr.instance = inst
                wkr.updated = time.Now()
                if initialState == StateBooting && wkr.state == StateUnknown {
                        wkr.state = StateBooting
                }
                wkr.saveTags()
                return wkr, false
        }

        // If an instance has a valid IdleBehavior tag when it first
        // appears, initialize the new worker accordingly (this is how
        // we restore IdleBehavior that was set by a prior dispatch
        // process); otherwise, default to "run". After this,
        // wkr.idleBehavior is the source of truth, and will only be
        // changed via SetIdleBehavior().
        idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
        if !validIdleBehavior[idleBehavior] {
                idleBehavior = IdleBehaviorRun
        }

        logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
                "Instance":     inst,
        })
        logger.WithFields(logrus.Fields{
                "State":        initialState,
                "IdleBehavior": idleBehavior,
        }).Info("instance appeared in cloud")
        now := time.Now()
        wkr := &worker{
                mtx:          &wp.mtx,
                wp:           wp,
                logger:       logger,
                executor:     wp.newExecutor(inst),
                state:        initialState,
                idleBehavior: idleBehavior,
                instance:     inst,
                instType:     it,
                appeared:     now,
                probed:       now,
                busy:         now,
                updated:      now,
                running:      make(map[string]struct{}),
                starting:     make(map[string]struct{}),
                probing:      make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
}

// notifyExited records the exit time of a container's crunch-run
// process. Caller must have lock.
func (wp *Pool) notifyExited(uuid string, t time.Time) {
        wp.exited[uuid] = t
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithField("InstanceType", it.Name)
        logger.Info("shutdown requested")
        for _, tryState := range []State{StateBooting, StateIdle} {
                // TODO: shutdown the worker with the longest idle
                // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
                        if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
                                logger.WithField("Instance", wkr.instance).Info("shutting down")
                                wkr.shutdown()
                                return true
                        }
                }
        }
        return false
}
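
// Illustrative use: a scheduler that finds it has too many
// unallocated workers of a type can call Shutdown once per excess
// worker; a false return means nothing was eligible (all busy or on
// hold), so it should just retry after the next pool event (unalloc
// and want are hypothetical scheduler state):
//
//      for excess := unalloc[it] - want; excess > 0 && wp.Shutdown(it); excess-- {
//      }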

// CountWorkers returns the current number of workers in each state.
func (wp *Pool) CountWorkers() map[State]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[State]int{}
        for _, w := range wp.workers {
                r[w.state]++
        }
        return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value is the time the container's
// crunch-run process was observed to have exited; a zero time means
// it is still starting or running.
func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[string]time.Time{}
        for _, wkr := range wp.workers {
                for uuid := range wkr.running {
                        r[uuid] = time.Time{}
                }
                for uuid := range wkr.starting {
                        r[uuid] = time.Time{}
                }
        }
        for uuid, exited := range wp.exited {
                r[uuid] = exited
        }
        return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        var wkr *worker
        for _, w := range wp.workers {
                if w.instType == it && w.state == StateIdle {
                        if wkr == nil || w.busy.After(wkr.busy) {
                                wkr = w
                        }
                }
        }
        if wkr == nil {
                return false
        }
        wkr.startContainer(ctr)
        return true
}
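
// Illustrative use: the typical start-or-grow step in a scheduler
// loop:
//
//      if !wp.StartContainer(it, ctr) {
//              // no idle worker of this type; try to create one and
//              // retry after the next pool event
//              wp.Create(it)
//      }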

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
func (wp *Pool) KillContainer(uuid string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if _, ok := wp.exited[uuid]; ok {
                wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
                delete(wp.exited, uuid)
                return
        }
        for _, wkr := range wp.workers {
                if _, ok := wkr.running[uuid]; ok {
                        go wp.kill(wkr, uuid)
                        return
                }
        }
        wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
}
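
// Illustrative use: when a container should no longer run (e.g., it
// was cancelled), the dispatcher only needs:
//
//      wp.KillContainer(ctr.UUID)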

// kill, called in its own goroutine by KillContainer, asks the
// worker's crunch-run process to shut down the container (signal 15,
// SIGTERM) and updates the pool's state if that succeeds.
func (wp *Pool) kill(wkr *worker, uuid string) {
        logger := wp.logger.WithFields(logrus.Fields{
                "ContainerUUID": uuid,
                "Instance":      wkr.instance,
        })
        logger.Debug("killing process")
        stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
        if err != nil {
                logger.WithFields(logrus.Fields{
                        "stderr": string(stderr),
                        "stdout": string(stdout),
                        "error":  err,
                }).Warn("kill failed")
                return
        }
        logger.Debug("killing process succeeded")
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if _, ok := wkr.running[uuid]; ok {
                delete(wkr.running, uuid)
                if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
                        wkr.state = StateIdle
                }
                wkr.updated = time.Now()
                go wp.notify()
        }
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
        if reg == nil {
                reg = prometheus.NewRegistry()
        }
        wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_total",
                Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
        })
        reg.MustRegister(wp.mInstances)
        wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_price_total",
                Help:      "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
        })
        reg.MustRegister(wp.mInstancesPrice)
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "containers_running",
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)

        wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_total",
                Help:      "Total VCPUs on all cloud VMs.",
        })
        reg.MustRegister(wp.mVCPUs)
        wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_inuse",
                Help:      "VCPUs on cloud VMs that are running containers.",
        })
        reg.MustRegister(wp.mVCPUsInuse)
        wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_total",
                Help:      "Total memory on all cloud VMs.",
        })
        reg.MustRegister(wp.mMemory)
        wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_inuse",
                Help:      "Memory on cloud VMs that are running containers.",
        })
        reg.MustRegister(wp.mMemoryInuse)
}
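
// The registry handed to NewPool can be exported in the usual way
// with the client_golang promhttp helper, e.g. (a sketch, assuming
// an HTTP mux is already being served):
//
//      import "github.com/prometheus/client_golang/prometheus/promhttp"
//
//      mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))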

func (wp *Pool) runMetrics() {
        ch := wp.Subscribe()
        defer wp.Unsubscribe(ch)
        for range ch {
                wp.updateMetrics()
        }
}

func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()

        var price float64
        var alloc, cpu, cpuInuse, mem, memInuse int64
        for _, wkr := range wp.workers {
                price += wkr.instType.Price
                cpu += int64(wkr.instType.VCPUs)
                mem += int64(wkr.instType.RAM)
                if len(wkr.running)+len(wkr.starting) == 0 {
                        continue
                }
                alloc += int64(len(wkr.running) + len(wkr.starting))
                cpuInuse += int64(wkr.instType.VCPUs)
                memInuse += int64(wkr.instType.RAM)
        }
        wp.mInstances.Set(float64(len(wp.workers)))
        wp.mInstancesPrice.Set(price)
        wp.mContainersRunning.Set(float64(alloc))
        wp.mVCPUs.Set(float64(cpu))
        wp.mMemory.Set(float64(mem))
        wp.mVCPUsInuse.Set(float64(cpuInuse))
        wp.mMemoryInuse.Set(float64(memInuse))
}

func (wp *Pool) runProbes() {
        maxPPS := wp.maxProbesPerSecond
        if maxPPS < 1 {
                maxPPS = defaultMaxProbesPerSecond
        }
        limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
        defer limitticker.Stop()

        probeticker := time.NewTicker(wp.probeInterval)
        defer probeticker.Stop()

        workers := []cloud.InstanceID{}
        for range probeticker.C {
                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
                        if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
                                continue
                        }
                        workers = append(workers, id)
                }
                wp.mtx.Unlock()

                for _, id := range workers {
                        wp.mtx.Lock()
                        wkr, ok := wp.workers[id]
                        wp.mtx.Unlock()
                        if !ok {
                                // Deleted while we were probing
                                // others
                                continue
                        }
                        go wkr.ProbeAndUpdate()
                        select {
                        case <-wp.stop:
                                return
                        case <-limitticker.C:
                        }
                }
        }
}

func (wp *Pool) runSync() {
        // sync once immediately, then wait syncInterval, sync again,
        // etc.
        timer := time.NewTimer(1)
        for {
                select {
                case <-timer.C:
                        err := wp.getInstancesAndSync()
                        if err != nil {
                                wp.logger.WithError(err).Warn("sync failed")
                        }
                        timer.Reset(wp.syncInterval)
                case <-wp.stop:
                        wp.logger.Debug("worker.Pool stopped")
                        return
                }
        }
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
        wp.setupOnce.Do(wp.setup)
        close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
        var r []InstanceView
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        for _, w := range wp.workers {
                r = append(r, InstanceView{
                        Instance:             w.instance.ID(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
                        LastContainerUUID:    w.lastUUID,
                        LastBusy:             w.busy,
                        WorkerState:          w.state.String(),
                        IdleBehavior:         w.idleBehavior,
                })
        }
        wp.mtx.Unlock()
        sort.Slice(r, func(i, j int) bool {
                return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
        })
        return r
}
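
// Illustrative use: InstanceView's json tags make this suitable for a
// management API response:
//
//      err := json.NewEncoder(w).Encode(map[string]interface{}{
//              "items": wp.Instances(),
//      })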

func (wp *Pool) setup() {
        wp.creating = map[arvados.InstanceType][]time.Time{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}

func (wp *Pool) notify() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        for _, send := range wp.subscribers {
                select {
                case send <- struct{}{}:
                default:
                }
        }
}

func (wp *Pool) getInstancesAndSync() error {
        wp.setupOnce.Do(wp.setup)
        if err := wp.instanceSet.throttleInstances.Error(); err != nil {
                return err
        }
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
        instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
        if err != nil {
                wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
                return err
        }
        wp.sync(threshold, instances)
        wp.logger.Debug("sync done")
        return nil
}

// Add/remove/update workers based on instances, which were obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
        notify := false

        for _, inst := range instances {
                itTag := inst.Tags()[tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                        wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
                if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
                        notify = true
                } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
                        wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
                        wkr.shutdown()
                }
        }

        for id, wkr := range wp.workers {
                if wkr.updated.After(threshold) {
                        continue
                }
                logger := wp.logger.WithFields(logrus.Fields{
                        "Instance":    wkr.instance,
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
                delete(wp.workers, id)
                go wkr.executor.Close()
                notify = true
        }

        if !wp.loaded {
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }

        if notify {
                go wp.notify()
        }
}