lib/dispatchcloud/worker/pool.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"errors"
	"io"
	"sort"
	"strings"
	"sync"
	"time"

	"git.curoverse.com/arvados.git/lib/cloud"
	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
)

const (
	tagKeyInstanceType = "InstanceType"
	tagKeyIdleBehavior = "IdleBehavior"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	Close()
}
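
// An Executor is typically pointed at an instance once and then
// reused for many commands. A minimal usage sketch, assuming
// newExecutor is the constructor passed to NewPool (the command and
// error handling shown here are illustrative, not part of this
// package's API):
//
//	exr := newExecutor(inst)
//	defer exr.Close()
//	exr.SetTarget(inst)
//	stdout, stderr, err := exr.Execute(nil, "uptime", nil)
//	if err != nil {
//		logger.WithError(err).Warnf("probe failed, stderr %q", stderr)
//	}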

const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = time.Minute * 10
	defaultTimeoutProbe       = time.Minute * 10
	defaultTimeoutShutdown    = time.Second * 10

	// Time to wait after a quota error before trying again
	// anyway, even if no instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because of rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
)

func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:             logger,
		arvClient:          arvClient,
		instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:        newExecutor,
		bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
		imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
		instanceTypes:      cluster.InstanceTypes,
		maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
		probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
		syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		stop:               make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}
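
// A minimal construction sketch, assuming the caller has already
// loaded the cluster config and built an InstanceSet; newSSHExecutor
// here is a hypothetical Executor constructor supplied by the caller:
//
//	wp := NewPool(logger, arvClient, prometheus.NewRegistry(),
//		instanceSet, newSSHExecutor, cluster)
//	defer wp.Stop()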

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger             logrus.FieldLogger
	arvClient          *arvados.Client
	instanceSet        *throttledInstanceSet
	newExecutor        func(cloud.Instance) Executor
	bootProbeCommand   string
	imageID            cloud.ImageID
	instanceTypes      map[string]arvados.InstanceType
	syncInterval       time.Duration
	probeInterval      time.Duration
	maxProbesPerSecond int
	timeoutIdle        time.Duration
	timeoutBooting     time.Duration
	timeoutProbe       time.Duration
	timeoutShutdown    time.Duration

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once

	throttleCreate    throttle
	throttleInstances throttle

	mInstances         prometheus.Gauge
	mInstancesPrice    prometheus.Gauge
	mContainersRunning prometheus.Gauge
	mVCPUs             prometheus.Gauge
	mVCPUsInuse        prometheus.Gauge
	mMemory            prometheus.Gauge
	mMemoryInuse       prometheus.Gauge
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		tryScheduling(wp)
//		if done {
//			break
//		}
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	for it, times := range wp.creating {
		creating[it] = len(times)
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}
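
// A scheduler might combine Unallocated with Create roughly like
// this (a sketch; demand is a hypothetical per-type count of queued
// containers computed by the caller):
//
//	unalloc := wp.Unallocated()
//	for it, want := range demand {
//		for have := unalloc[it]; have < want; have++ {
//			if !wp.Create(it) {
//				// Quota or rate-limit error; Subscribe
//				// will signal when it is worth retrying.
//				break
//			}
//		}
//	}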

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
		return false
	}
	tags := cloud.InstanceTags{
		tagKeyInstanceType: it.Name,
		tagKeyIdleBehavior: string(IdleBehaviorRun),
	}
	now := time.Now()
	wp.creating[it] = append(wp.creating[it], now)
	go func() {
		defer wp.notify()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// Remove our timestamp marker from wp.creating
		for i, t := range wp.creating[it] {
			if t == now {
				copy(wp.creating[it][i:], wp.creating[it][i+1:])
				wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
				break
			}
		}
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it, StateBooting)
	}()
	return true
}

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.idleBehavior = idleBehavior
	wkr.saveTags()
	wkr.shutdownIfIdle()
	return nil
}
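
// For example, a management API handler might drain an instance like
// this (a sketch; assumes an IdleBehaviorDrain value defined
// alongside IdleBehaviorRun and IdleBehaviorHold):
//
//	if err := wp.SetIdleBehavior(instID, IdleBehaviorDrain); err != nil {
//		// instance does not exist (or already went away)
//	}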

// Add or update a worker attached to the given instance. Use
// initialState if a new worker is created.
//
// The second return value is true if a new worker is created.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		if initialState == StateBooting && wkr.state == StateUnknown {
			wkr.state = StateBooting
		}
		wkr.saveTags()
		return wkr, false
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst,
	})
	logger.WithFields(logrus.Fields{
		"State":        initialState,
		"IdleBehavior": idleBehavior,
	}).Info("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        initialState,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]struct{}),
		starting:     make(map[string]struct{}),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}

// Caller must have lock.
func (wp *Pool) notifyExited(uuid string, t time.Time) {
	wp.exited[uuid] = t
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance).Info("shutting down")
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}
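
// A caller scaling down typically calls Shutdown in a loop until no
// eligible worker remains (a sketch; excess is however many workers
// of this type the caller has decided it no longer needs):
//
//	for ; excess > 0; excess-- {
//		if !wp.Shutdown(it) {
//			break // only busy/held workers of this type remain
//		}
//	}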

// CountWorkers returns the current number of workers in each state.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// KillContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}
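
// A caller can use the exit times to garbage-collect finished
// containers, e.g. (a sketch; gracePeriod is whatever delay the
// caller wants before clearing the placeholder):
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() && time.Since(exited) > gracePeriod {
//			wp.KillContainer(uuid)
//		}
//	}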

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
func (wp *Pool) KillContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
		return
	}
	for _, wkr := range wp.workers {
		if _, ok := wkr.running[uuid]; ok {
			go wp.kill(wkr, uuid)
			return
		}
	}
	wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
}

func (wp *Pool) kill(wkr *worker, uuid string) {
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Instance":      wkr.instance,
	})
	logger.Debug("killing process")
	stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
	if err != nil {
		logger.WithFields(logrus.Fields{
			"stderr": string(stderr),
			"stdout": string(stdout),
			"error":  err,
		}).Warn("kill failed")
		return
	}
	logger.Debug("killing process succeeded")
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wkr.running[uuid]; ok {
		delete(wkr.running, uuid)
		if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
			wkr.state = StateIdle
		}
		wkr.updated = time.Now()
		go wp.notify()
	}
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
	})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price_total",
		Help:      "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
	})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)

	wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	})
	reg.MustRegister(wp.mVCPUs)
	wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_inuse",
		Help:      "VCPUs on cloud VMs that are running containers.",
	})
	reg.MustRegister(wp.mVCPUsInuse)
	wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	})
	reg.MustRegister(wp.mMemory)
	wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_inuse",
		Help:      "Memory on cloud VMs that are running containers.",
	})
	reg.MustRegister(wp.mMemoryInuse)
}
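
// The registry given to NewPool is typically exposed over HTTP; a
// sketch using the standard promhttp adapter from
// github.com/prometheus/client_golang:
//
//	reg := prometheus.NewRegistry()
//	wp := NewPool(logger, arvClient, reg, instanceSet, newExecutor, cluster)
//	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))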

func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	var price float64
	var alloc, cpu, cpuInuse, mem, memInuse int64
	for _, wkr := range wp.workers {
		price += wkr.instType.Price
		cpu += int64(wkr.instType.VCPUs)
		mem += int64(wkr.instType.RAM)
		if len(wkr.running)+len(wkr.starting) == 0 {
			continue
		}
		alloc += int64(len(wkr.running) + len(wkr.starting))
		cpuInuse += int64(wkr.instType.VCPUs)
		memInuse += int64(wkr.instType.RAM)
	}
	wp.mInstances.Set(float64(len(wp.workers)))
	wp.mInstancesPrice.Set(price)
	wp.mContainersRunning.Set(float64(alloc))
	wp.mVCPUs.Set(float64(cpu))
	wp.mMemory.Set(float64(mem))
	wp.mVCPUsInuse.Set(float64(cpuInuse))
	wp.mMemoryInuse.Set(float64(memInuse))
}

func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing
				// others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}

func (wp *Pool) setup() {
	wp.creating = map[arvados.InstanceType][]time.Time{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}
698
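// notify does a non-blocking send on every subscriber channel. Each
// channel is buffered with capacity 1, so an event that arrives while
// a previous event is still pending is coalesced rather than queued.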
func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// Add/remove/update workers based on instances, which were obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance,
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		delete(wp.workers, id)
		go wkr.executor.Close()
		notify = true
	}

	if !wp.loaded {
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}