14325: Document Running() return value.
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "errors"
9         "io"
10         "sort"
11         "strings"
12         "sync"
13         "time"
14
15         "git.curoverse.com/arvados.git/lib/cloud"
16         "git.curoverse.com/arvados.git/sdk/go/arvados"
17         "github.com/prometheus/client_golang/prometheus"
18         "github.com/sirupsen/logrus"
19 )
20
// Tag keys the dispatcher writes on cloud instances it creates, and
// reads back when instances are listed.
const tagKeyInstanceType = "InstanceType" // value: arvados.InstanceType name
const tagKeyIdleBehavior = "IdleBehavior" // value: the worker's IdleBehavior
25
// An InstanceView shows a worker's current state and recent activity.
//
// Pool.Instances builds one InstanceView per worker; the json tags
// give the field names used when a view is encoded as JSON.
type InstanceView struct {
	// Cloud provider's ID for the worker's instance.
	Instance             cloud.InstanceID `json:"instance"`
	// Price of the worker's Arvados instance type.
	Price                float64          `json:"price"`
	// Name of the Arvados instance type.
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	// Provider-specific instance type of the Arvados instance type.
	ProviderInstanceType string           `json:"provider_instance_type"`
	// UUID of the most recent container seen on this worker.
	LastContainerUUID    string           `json:"last_container_uuid"`
	// Last time the worker was known to be busy.
	LastBusy             time.Time        `json:"last_busy"`
	// String form of the worker's State.
	WorkerState          string           `json:"worker_state"`
	// How the worker behaves when it has no containers running.
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}
37
// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target, with the given environment
	// variables and stdin; either may be nil (Pool.kill passes nil
	// for both). Returns the command's captured stdout and stderr,
	// and a non-nil err if the command failed.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	// Close is called (in its own goroutine) when the Pool drops
	// a worker whose instance has disappeared from the cloud.
	// NOTE(review): presumably releases connections or other
	// resources held by the executor -- confirm in implementations.
	Close()
}
57
// Fallback settings used when the cluster configuration leaves the
// corresponding value unset (zero or negative). The durations are
// applied in NewPool via duration(); defaultMaxProbesPerSecond is
// applied by runProbes.
const (
	defaultSyncInterval       = 1 * time.Minute
	defaultProbeInterval      = 10 * time.Second
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = 1 * time.Minute
	defaultTimeoutBooting     = 10 * time.Minute
	defaultTimeoutProbe       = 10 * time.Minute
	defaultTimeoutShutdown    = 10 * time.Second
)

const (
	// How long Create keeps refusing after a quota error, even if
	// no instances have been shut down in the meantime.
	quotaErrorTTL = 1 * time.Minute

	// Minimum time between repeated "X failed because rate
	// limiting" log messages.
	logRateLimitErrorInterval = 10 * time.Second
)
74
75 func duration(conf arvados.Duration, def time.Duration) time.Duration {
76         if conf > 0 {
77                 return time.Duration(conf)
78         } else {
79                 return def
80         }
81 }
82
83 // NewPool creates a Pool of workers backed by instanceSet.
84 //
85 // New instances are configured and set up according to the given
86 // cluster configuration.
87 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
88         wp := &Pool{
89                 logger:             logger,
90                 arvClient:          arvClient,
91                 instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
92                 newExecutor:        newExecutor,
93                 bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
94                 imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
95                 instanceTypes:      cluster.InstanceTypes,
96                 maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
97                 probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
98                 syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
99                 timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
100                 timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
101                 timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
102                 timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
103                 stop:               make(chan bool),
104         }
105         wp.registerMetrics(reg)
106         go func() {
107                 wp.setupOnce.Do(wp.setup)
108                 go wp.runMetrics()
109                 go wp.runProbes()
110                 go wp.runSync()
111         }()
112         return wp
113 }
114
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration, filled in by NewPool
	logger             logrus.FieldLogger
	arvClient          *arvados.Client
	instanceSet        *throttledInstanceSet
	newExecutor        func(cloud.Instance) Executor
	bootProbeCommand   string
	imageID            cloud.ImageID
	instanceTypes      map[string]arvados.InstanceType
	syncInterval       time.Duration
	probeInterval      time.Duration
	maxProbesPerSecond int
	timeoutIdle        time.Duration
	timeoutBooting     time.Duration
	timeoutProbe       time.Duration
	timeoutShutdown    time.Duration

	// private state
	//
	// The maps are allocated by setup, which runs once via
	// setupOnce. In this file, subscribers, creating, workers,
	// loaded, exited, atQuotaUntil and atQuotaErr are accessed with
	// mtx held; every worker shares this same mutex (updateWorker
	// sets worker.mtx = &wp.mtx). stop is closed by Stop to end the
	// background loops.
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once

	// NOTE(review): cloud-call rate-limit errors are recorded on
	// instanceSet.throttleCreate and instanceSet.throttleInstances;
	// nothing in this file writes these Pool-level fields -- confirm
	// whether they are still needed.
	throttleCreate    throttle
	throttleInstances throttle

	// Prometheus gauges: created and registered by registerMetrics,
	// refreshed by updateMetrics.
	mInstances         prometheus.Gauge
	mInstancesPrice    prometheus.Gauge
	mContainersRunning prometheus.Gauge
	mVCPUs             prometheus.Gauge
	mVCPUsInuse        prometheus.Gauge
	mMemory            prometheus.Gauge
	mMemoryInuse       prometheus.Gauge
}
157
158 // Subscribe returns a buffered channel that becomes ready after any
159 // change to the pool's state that could have scheduling implications:
160 // a worker's state changes, a new worker appears, the cloud
161 // provider's API rate limiting period ends, etc.
162 //
163 // Additional events that occur while the channel is already ready
164 // will be dropped, so it is OK if the caller services the channel
165 // slowly.
166 //
167 // Example:
168 //
169 //      ch := wp.Subscribe()
170 //      defer wp.Unsubscribe(ch)
171 //      for range ch {
172 //              tryScheduling(wp)
173 //              if done {
174 //                      break
175 //              }
176 //      }
177 func (wp *Pool) Subscribe() <-chan struct{} {
178         wp.setupOnce.Do(wp.setup)
179         wp.mtx.Lock()
180         defer wp.mtx.Unlock()
181         ch := make(chan struct{}, 1)
182         wp.subscribers[ch] = ch
183         return ch
184 }
185
186 // Unsubscribe stops sending updates to the given channel.
187 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
188         wp.setupOnce.Do(wp.setup)
189         wp.mtx.Lock()
190         defer wp.mtx.Unlock()
191         delete(wp.subscribers, ch)
192 }
193
// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.  Workers in
// hold/drain mode are not included.
//
// "Creating" means a Create call whose cloud request has not returned
// yet (an entry in wp.creating). An Unknown-state worker that appeared
// after the oldest pending Create of its type is assumed to be the
// result of one of those calls, so it is counted once (as a worker)
// rather than twice.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	// creating[it] starts as the number of pending Create calls for
	// it, and is decremented below for each appeared worker that is
	// presumed to be the result of one of those calls.
	creating := map[arvados.InstanceType]int{}
	for it, times := range wp.creating {
		creating[it] = len(times)
	}
	for _, wkr := range wp.workers {
		// Count only idle, booting, and unknown workers whose
		// IdleBehavior is IdleBehaviorRun.
		if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) || wkr.idleBehavior != IdleBehaviorRun {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		// wp.creating[it][0] is the oldest pending Create: Create
		// appends timestamps in order and removal keeps that order.
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	// Add the pending Create calls not already accounted for by an
	// appeared worker.
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}
233
234 // Create a new instance with the given type, and add it to the worker
235 // pool. The worker is added immediately; instance creation runs in
236 // the background.
237 //
238 // Create returns false if a pre-existing error state prevents it from
239 // even attempting to create a new instance. Those errors are logged
240 // by the Pool, so the caller does not need to log anything in such
241 // cases.
242 func (wp *Pool) Create(it arvados.InstanceType) bool {
243         logger := wp.logger.WithField("InstanceType", it.Name)
244         wp.setupOnce.Do(wp.setup)
245         wp.mtx.Lock()
246         defer wp.mtx.Unlock()
247         if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
248                 return false
249         }
250         tags := cloud.InstanceTags{
251                 tagKeyInstanceType: it.Name,
252                 tagKeyIdleBehavior: string(IdleBehaviorRun),
253         }
254         now := time.Now()
255         wp.creating[it] = append(wp.creating[it], now)
256         go func() {
257                 defer wp.notify()
258                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
259                 wp.mtx.Lock()
260                 defer wp.mtx.Unlock()
261                 // Remove our timestamp marker from wp.creating
262                 for i, t := range wp.creating[it] {
263                         if t == now {
264                                 copy(wp.creating[it][i:], wp.creating[it][i+1:])
265                                 wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
266                                 break
267                         }
268                 }
269                 if err != nil {
270                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
271                                 wp.atQuotaErr = err
272                                 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
273                                 time.AfterFunc(quotaErrorTTL, wp.notify)
274                         }
275                         logger.WithError(err).Error("create failed")
276                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
277                         return
278                 }
279                 wp.updateWorker(inst, it, StateBooting)
280         }()
281         return true
282 }
283
284 // AtQuota returns true if Create is not expected to work at the
285 // moment.
286 func (wp *Pool) AtQuota() bool {
287         wp.mtx.Lock()
288         defer wp.mtx.Unlock()
289         return time.Now().Before(wp.atQuotaUntil)
290 }
291
292 // SetIdleBehavior determines how the indicated instance will behave
293 // when it has no containers running.
294 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
295         wp.mtx.Lock()
296         defer wp.mtx.Unlock()
297         wkr, ok := wp.workers[id]
298         if !ok {
299                 return errors.New("requested instance does not exist")
300         }
301         wkr.idleBehavior = idleBehavior
302         wkr.saveTags()
303         wkr.shutdownIfIdle()
304         return nil
305 }
306
// Add or update worker attached to the given instance. Use
// initialState if a new worker is created.
//
// If a worker already exists for inst.ID(), its executor is retargeted
// to inst, its instance and updated time are refreshed, it is promoted
// from StateUnknown to StateBooting when initialState is StateBooting
// (in this file, Create passes StateBooting and sync passes
// StateUnknown), and its tags are saved again. Its IdleBehavior is not
// changed.
//
// The second return value is true if a new worker is created.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		// sync treats workers whose updated time is not after its
		// threshold as having disappeared from the cloud.
		wkr.updated = time.Now()
		if initialState == StateBooting && wkr.state == StateUnknown {
			wkr.state = StateBooting
		}
		wkr.saveTags()
		return wkr, false
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst,
	})
	logger.WithFields(logrus.Fields{
		"State":        initialState,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	// The new worker shares the pool's mutex (mtx: &wp.mtx), and all
	// of its timestamps start at the moment it appeared.
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        initialState,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]struct{}),
		starting:     make(map[string]struct{}),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}
366
367 // caller must have lock.
368 func (wp *Pool) notifyExited(uuid string, t time.Time) {
369         wp.exited[uuid] = t
370 }
371
372 // Shutdown shuts down a worker with the given type, or returns false
373 // if all workers with the given type are busy.
374 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
375         wp.setupOnce.Do(wp.setup)
376         wp.mtx.Lock()
377         defer wp.mtx.Unlock()
378         logger := wp.logger.WithField("InstanceType", it.Name)
379         logger.Info("shutdown requested")
380         for _, tryState := range []State{StateBooting, StateIdle} {
381                 // TODO: shutdown the worker with the longest idle
382                 // time (Idle) or the earliest create time (Booting)
383                 for _, wkr := range wp.workers {
384                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
385                                 logger.WithField("Instance", wkr.instance).Info("shutting down")
386                                 wkr.shutdown()
387                                 return true
388                         }
389                 }
390         }
391         return false
392 }
393
394 // CountWorkers returns the current number of workers in each state.
395 func (wp *Pool) CountWorkers() map[State]int {
396         wp.setupOnce.Do(wp.setup)
397         wp.mtx.Lock()
398         defer wp.mtx.Unlock()
399         r := map[State]int{}
400         for _, w := range wp.workers {
401                 r[w.state]++
402         }
403         return r
404 }
405
406 // Running returns the container UUIDs being prepared/run on workers.
407 //
408 // In the returned map, the time value indicates when the Pool
409 // observed that the container process had exited. A container that
410 // has not yet exited has a zero time value. The caller should use
411 // KillContainer() to garbage-collect the entries for exited
412 // containers.
413 func (wp *Pool) Running() map[string]time.Time {
414         wp.setupOnce.Do(wp.setup)
415         wp.mtx.Lock()
416         defer wp.mtx.Unlock()
417         r := map[string]time.Time{}
418         for _, wkr := range wp.workers {
419                 for uuid := range wkr.running {
420                         r[uuid] = time.Time{}
421                 }
422                 for uuid := range wkr.starting {
423                         r[uuid] = time.Time{}
424                 }
425         }
426         for uuid, exited := range wp.exited {
427                 r[uuid] = exited
428         }
429         return r
430 }
431
432 // StartContainer starts a container on an idle worker immediately if
433 // possible, otherwise returns false.
434 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
435         wp.setupOnce.Do(wp.setup)
436         wp.mtx.Lock()
437         defer wp.mtx.Unlock()
438         var wkr *worker
439         for _, w := range wp.workers {
440                 if w.instType == it && w.state == StateIdle {
441                         if wkr == nil || w.busy.After(wkr.busy) {
442                                 wkr = w
443                         }
444                 }
445         }
446         if wkr == nil {
447                 return false
448         }
449         wkr.startContainer(ctr)
450         return true
451 }
452
453 // KillContainer kills the crunch-run process for the given container
454 // UUID, if it's running on any worker.
455 //
456 // KillContainer returns immediately; the act of killing the container
457 // takes some time, and runs in the background.
458 func (wp *Pool) KillContainer(uuid string) {
459         wp.mtx.Lock()
460         defer wp.mtx.Unlock()
461         if _, ok := wp.exited[uuid]; ok {
462                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
463                 delete(wp.exited, uuid)
464                 return
465         }
466         for _, wkr := range wp.workers {
467                 if _, ok := wkr.running[uuid]; ok {
468                         go wp.kill(wkr, uuid)
469                         return
470                 }
471         }
472         wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
473 }
474
475 func (wp *Pool) kill(wkr *worker, uuid string) {
476         logger := wp.logger.WithFields(logrus.Fields{
477                 "ContainerUUID": uuid,
478                 "Instance":      wkr.instance,
479         })
480         logger.Debug("killing process")
481         stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
482         if err != nil {
483                 logger.WithFields(logrus.Fields{
484                         "stderr": string(stderr),
485                         "stdout": string(stdout),
486                         "error":  err,
487                 }).Warn("kill failed")
488                 return
489         }
490         logger.Debug("killing process succeeded")
491         wp.mtx.Lock()
492         defer wp.mtx.Unlock()
493         if _, ok := wkr.running[uuid]; ok {
494                 delete(wkr.running, uuid)
495                 if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
496                         wkr.state = StateIdle
497                 }
498                 wkr.updated = time.Now()
499                 go wp.notify()
500         }
501 }
502
503 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
504         if reg == nil {
505                 reg = prometheus.NewRegistry()
506         }
507         wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
508                 Namespace: "arvados",
509                 Subsystem: "dispatchcloud",
510                 Name:      "instances_total",
511                 Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
512         })
513         reg.MustRegister(wp.mInstances)
514         wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
515                 Namespace: "arvados",
516                 Subsystem: "dispatchcloud",
517                 Name:      "instances_price_total",
518                 Help:      "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
519         })
520         reg.MustRegister(wp.mInstancesPrice)
521         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
522                 Namespace: "arvados",
523                 Subsystem: "dispatchcloud",
524                 Name:      "containers_running",
525                 Help:      "Number of containers reported running by cloud VMs.",
526         })
527         reg.MustRegister(wp.mContainersRunning)
528
529         wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
530                 Namespace: "arvados",
531                 Subsystem: "dispatchcloud",
532                 Name:      "vcpus_total",
533                 Help:      "Total VCPUs on all cloud VMs.",
534         })
535         reg.MustRegister(wp.mVCPUs)
536         wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
537                 Namespace: "arvados",
538                 Subsystem: "dispatchcloud",
539                 Name:      "vcpus_inuse",
540                 Help:      "VCPUs on cloud VMs that are running containers.",
541         })
542         reg.MustRegister(wp.mVCPUsInuse)
543         wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
544                 Namespace: "arvados",
545                 Subsystem: "dispatchcloud",
546                 Name:      "memory_bytes_total",
547                 Help:      "Total memory on all cloud VMs.",
548         })
549         reg.MustRegister(wp.mMemory)
550         wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
551                 Namespace: "arvados",
552                 Subsystem: "dispatchcloud",
553                 Name:      "memory_bytes_inuse",
554                 Help:      "Memory on cloud VMs that are running containers.",
555         })
556         reg.MustRegister(wp.mMemoryInuse)
557 }
558
559 func (wp *Pool) runMetrics() {
560         ch := wp.Subscribe()
561         defer wp.Unsubscribe(ch)
562         for range ch {
563                 wp.updateMetrics()
564         }
565 }
566
567 func (wp *Pool) updateMetrics() {
568         wp.mtx.RLock()
569         defer wp.mtx.RUnlock()
570
571         var price float64
572         var alloc, cpu, cpuInuse, mem, memInuse int64
573         for _, wkr := range wp.workers {
574                 price += wkr.instType.Price
575                 cpu += int64(wkr.instType.VCPUs)
576                 mem += int64(wkr.instType.RAM)
577                 if len(wkr.running)+len(wkr.starting) == 0 {
578                         continue
579                 }
580                 alloc += int64(len(wkr.running) + len(wkr.starting))
581                 cpuInuse += int64(wkr.instType.VCPUs)
582                 memInuse += int64(wkr.instType.RAM)
583         }
584         wp.mInstances.Set(float64(len(wp.workers)))
585         wp.mInstancesPrice.Set(price)
586         wp.mContainersRunning.Set(float64(alloc))
587         wp.mVCPUs.Set(float64(cpu))
588         wp.mMemory.Set(float64(mem))
589         wp.mVCPUsInuse.Set(float64(cpuInuse))
590         wp.mMemoryInuse.Set(float64(memInuse))
591 }
592
593 func (wp *Pool) runProbes() {
594         maxPPS := wp.maxProbesPerSecond
595         if maxPPS < 1 {
596                 maxPPS = defaultMaxProbesPerSecond
597         }
598         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
599         defer limitticker.Stop()
600
601         probeticker := time.NewTicker(wp.probeInterval)
602         defer probeticker.Stop()
603
604         workers := []cloud.InstanceID{}
605         for range probeticker.C {
606                 workers = workers[:0]
607                 wp.mtx.Lock()
608                 for id, wkr := range wp.workers {
609                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
610                                 continue
611                         }
612                         workers = append(workers, id)
613                 }
614                 wp.mtx.Unlock()
615
616                 for _, id := range workers {
617                         wp.mtx.Lock()
618                         wkr, ok := wp.workers[id]
619                         wp.mtx.Unlock()
620                         if !ok {
621                                 // Deleted while we were probing
622                                 // others
623                                 continue
624                         }
625                         go wkr.ProbeAndUpdate()
626                         select {
627                         case <-wp.stop:
628                                 return
629                         case <-limitticker.C:
630                         }
631                 }
632         }
633 }
634
635 func (wp *Pool) runSync() {
636         // sync once immediately, then wait syncInterval, sync again,
637         // etc.
638         timer := time.NewTimer(1)
639         for {
640                 select {
641                 case <-timer.C:
642                         err := wp.getInstancesAndSync()
643                         if err != nil {
644                                 wp.logger.WithError(err).Warn("sync failed")
645                         }
646                         timer.Reset(wp.syncInterval)
647                 case <-wp.stop:
648                         wp.logger.Debug("worker.Pool stopped")
649                         return
650                 }
651         }
652 }
653
654 // Stop synchronizing with the InstanceSet.
655 func (wp *Pool) Stop() {
656         wp.setupOnce.Do(wp.setup)
657         close(wp.stop)
658 }
659
660 // Instances returns an InstanceView for each worker in the pool,
661 // summarizing its current state and recent activity.
662 func (wp *Pool) Instances() []InstanceView {
663         var r []InstanceView
664         wp.setupOnce.Do(wp.setup)
665         wp.mtx.Lock()
666         for _, w := range wp.workers {
667                 r = append(r, InstanceView{
668                         Instance:             w.instance.ID(),
669                         Price:                w.instType.Price,
670                         ArvadosInstanceType:  w.instType.Name,
671                         ProviderInstanceType: w.instType.ProviderType,
672                         LastContainerUUID:    w.lastUUID,
673                         LastBusy:             w.busy,
674                         WorkerState:          w.state.String(),
675                         IdleBehavior:         w.idleBehavior,
676                 })
677         }
678         wp.mtx.Unlock()
679         sort.Slice(r, func(i, j int) bool {
680                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
681         })
682         return r
683 }
684
685 func (wp *Pool) setup() {
686         wp.creating = map[arvados.InstanceType][]time.Time{}
687         wp.exited = map[string]time.Time{}
688         wp.workers = map[cloud.InstanceID]*worker{}
689         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
690 }
691
692 func (wp *Pool) notify() {
693         wp.mtx.RLock()
694         defer wp.mtx.RUnlock()
695         for _, send := range wp.subscribers {
696                 select {
697                 case send <- struct{}{}:
698                 default:
699                 }
700         }
701 }
702
703 func (wp *Pool) getInstancesAndSync() error {
704         wp.setupOnce.Do(wp.setup)
705         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
706                 return err
707         }
708         wp.logger.Debug("getting instance list")
709         threshold := time.Now()
710         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
711         if err != nil {
712                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
713                 return err
714         }
715         wp.sync(threshold, instances)
716         wp.logger.Debug("sync done")
717         return nil
718 }
719
// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
//
// threshold is the time just before instances was requested (see
// getInstancesAndSync). Each listed instance with a known InstanceType
// tag goes through updateWorker, which sets its worker's updated time
// to now. Afterwards, any worker whose updated time is not after
// threshold (not in the list, and not touched by another code path
// since the list was requested) is treated as gone from the cloud and
// removed from the pool.
//
// sync acquires wp.mtx, so the caller must not hold it.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	// Set when a worker is added or removed, so subscribers hear
	// about it.
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			// Not added (or refreshed), so an existing worker for
			// this instance would be removed by the loop below.
			wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			// The cloud still lists this instance more than
			// timeoutShutdown after wkr.destroyed (presumably the
			// time shutdown was requested), so try again.
			wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance,
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		// Deleting map entries during range is permitted in Go.
		delete(wp.workers, id)
		// Closed in the background, presumably so a slow Close
		// does not stall other users of wp.mtx.
		go wkr.executor.Close()
		notify = true
	}

	if !wp.loaded {
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		// notify takes wp.mtx.RLock; calling it synchronously here,
		// while holding wp.mtx, would deadlock.
		go wp.notify()
	}
}