14807: Expose instance IP addresses in logs and management API.
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "errors"
9         "io"
10         "sort"
11         "strings"
12         "sync"
13         "time"
14
15         "git.curoverse.com/arvados.git/lib/cloud"
16         "git.curoverse.com/arvados.git/sdk/go/arvados"
17         "github.com/prometheus/client_golang/prometheus"
18         "github.com/sirupsen/logrus"
19         "golang.org/x/crypto/ssh"
20 )
21
// Cloud instance tag keys. Create writes both tags on new instances;
// sync reads tagKeyInstanceType to map a listed instance back to a
// configured arvados.InstanceType, and updateWorker reads
// tagKeyIdleBehavior to restore an idle behavior set by a prior
// dispatch process.
const (
	tagKeyInstanceType = "InstanceType"
	tagKeyIdleBehavior = "IdleBehavior"
)
26
// An InstanceView shows a worker's current state and recent activity.
// Pool.Instances returns one per worker (sorted by instance ID).
type InstanceView struct {
	// Cloud provider's ID for the instance.
	Instance cloud.InstanceID `json:"instance"`
	// Address reported by the cloud instance (may be empty if the
	// provider has not assigned one yet).
	Address string `json:"address"`
	// Price of the worker's Arvados instance type.
	Price float64 `json:"price"`
	// Name of the configured Arvados instance type.
	ArvadosInstanceType string `json:"arvados_instance_type"`
	// Provider-specific instance type of the Arvados instance type.
	ProviderInstanceType string `json:"provider_instance_type"`
	// UUID of the last container started on this worker.
	LastContainerUUID string `json:"last_container_uuid"`
	// Time the worker was last observed busy.
	LastBusy time.Time `json:"last_busy"`
	// String form of the worker's State.
	WorkerState string `json:"worker_state"`
	// Current idle behavior (see SetIdleBehavior).
	IdleBehavior IdleBehavior `json:"idle_behavior"`
}
39
// An Executor executes shell commands on a remote host.
//
// The Pool creates one Executor per worker (via its newExecutor
// function) and retargets it whenever the worker's cloud.Instance is
// refreshed.
type Executor interface {
	// Run cmd on the current target. env is passed through as the
	// command environment (callers in this file pass nil); stdin may
	// be nil.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	// Close is called (asynchronously) when the pool stops tracking
	// the worker, e.g. after its instance disappears from the cloud.
	// NOTE(review): presumably releases any open connections --
	// confirm against implementations.
	Close()
}
59
const (
	// Fallback values used when the corresponding cluster
	// configuration setting is zero (or negative).
	defaultSyncInterval       = 1 * time.Minute
	defaultProbeInterval      = 10 * time.Second
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = 1 * time.Minute
	defaultTimeoutBooting     = 10 * time.Minute
	defaultTimeoutProbe       = 10 * time.Minute
	defaultTimeoutShutdown    = 10 * time.Second

	// quotaErrorTTL is how long Create stays disabled after a quota
	// error before trying again anyway, even if no instances have
	// been shut down in the meantime.
	quotaErrorTTL = 1 * time.Minute

	// logRateLimitErrorInterval is the minimum time between repeated
	// "X failed because rate limiting" log messages.
	logRateLimitErrorInterval = 10 * time.Second
)
76
77 func duration(conf arvados.Duration, def time.Duration) time.Duration {
78         if conf > 0 {
79                 return time.Duration(conf)
80         } else {
81                 return def
82         }
83 }
84
85 // NewPool creates a Pool of workers backed by instanceSet.
86 //
87 // New instances are configured and set up according to the given
88 // cluster configuration.
89 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
90         wp := &Pool{
91                 logger:             logger,
92                 arvClient:          arvClient,
93                 instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
94                 newExecutor:        newExecutor,
95                 bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
96                 imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
97                 instanceTypes:      cluster.InstanceTypes,
98                 maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
99                 probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
100                 syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
101                 timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
102                 timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
103                 timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
104                 timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
105                 installPublicKey:   installPublicKey,
106                 stop:               make(chan bool),
107         }
108         wp.registerMetrics(reg)
109         go func() {
110                 wp.setupOnce.Do(wp.setup)
111                 go wp.runMetrics()
112                 go wp.runProbes()
113                 go wp.runSync()
114         }()
115         return wp
116 }
117
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
//
// Methods in this file read and write the private state fields while
// holding mtx; setup (run via setupOnce) initializes the maps.
type Pool struct {
	// configuration
	logger      logrus.FieldLogger
	arvClient   *arvados.Client
	instanceSet *throttledInstanceSet
	// Called once per new worker to create its Executor.
	newExecutor      func(cloud.Instance) Executor
	bootProbeCommand string
	imageID          cloud.ImageID
	// Configured instance types, looked up by the value of an
	// instance's tagKeyInstanceType tag (see sync).
	instanceTypes      map[string]arvados.InstanceType
	syncInterval       time.Duration
	probeInterval      time.Duration
	maxProbesPerSecond int
	timeoutIdle        time.Duration
	timeoutBooting     time.Duration
	timeoutProbe       time.Duration
	timeoutShutdown    time.Duration
	installPublicKey   ssh.PublicKey

	// private state
	// Channels handed out by Subscribe, mapped to their send side.
	subscribers map[<-chan struct{}]chan<- struct{}
	creating    map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
	// Workers keyed by cloud instance ID.
	workers map[cloud.InstanceID]*worker
	loaded  bool                 // loaded list of instances from InstanceSet at least once
	exited  map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
	// Create refuses to start new instances until this time; set
	// after a quota error.
	atQuotaUntil time.Time
	// Most recent quota error returned by a create call.
	atQuotaErr cloud.QuotaError
	// Closed by Stop to tell the background loops to exit.
	stop      chan bool
	mtx       sync.RWMutex
	setupOnce sync.Once

	// NOTE(review): in this file, rate-limit errors are recorded on
	// instanceSet.throttleCreate / instanceSet.throttleInstances
	// (see Create and getInstancesAndSync); verify whether these
	// Pool-level throttles are still needed.
	throttleCreate    throttle
	throttleInstances throttle

	// Prometheus gauges, registered by registerMetrics and refreshed
	// by updateMetrics.
	mInstances         prometheus.Gauge
	mInstancesPrice    prometheus.Gauge
	mContainersRunning prometheus.Gauge
	mVCPUs             prometheus.Gauge
	mVCPUsInuse        prometheus.Gauge
	mMemory            prometheus.Gauge
	mMemoryInuse       prometheus.Gauge
}
161
162 // Subscribe returns a buffered channel that becomes ready after any
163 // change to the pool's state that could have scheduling implications:
164 // a worker's state changes, a new worker appears, the cloud
165 // provider's API rate limiting period ends, etc.
166 //
167 // Additional events that occur while the channel is already ready
168 // will be dropped, so it is OK if the caller services the channel
169 // slowly.
170 //
171 // Example:
172 //
173 //      ch := wp.Subscribe()
174 //      defer wp.Unsubscribe(ch)
175 //      for range ch {
176 //              tryScheduling(wp)
177 //              if done {
178 //                      break
179 //              }
180 //      }
181 func (wp *Pool) Subscribe() <-chan struct{} {
182         wp.setupOnce.Do(wp.setup)
183         wp.mtx.Lock()
184         defer wp.mtx.Unlock()
185         ch := make(chan struct{}, 1)
186         wp.subscribers[ch] = ch
187         return ch
188 }
189
190 // Unsubscribe stops sending updates to the given channel.
191 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
192         wp.setupOnce.Do(wp.setup)
193         wp.mtx.Lock()
194         defer wp.mtx.Unlock()
195         delete(wp.subscribers, ch)
196 }
197
// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.  Workers in
// hold/drain mode are not included.
//
// "Creating" is the number of in-flight Create calls recorded in
// wp.creating. A StateUnknown worker that appeared after the earliest
// still-pending Create call for its type is assumed to be the result
// of that call, so it is not counted twice.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	// Copy the pending-Create counts so they can be decremented below
	// without touching wp.creating.
	creating := map[arvados.InstanceType]int{}
	for it, times := range wp.creating {
		creating[it] = len(times)
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		// creating[it] starts at len(wp.creating[it]) and only
		// decreases, so creating[it] > 0 guarantees the [0]
		// index below is in range.
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	// Pending Create calls not already matched to a worker above
	// count as unallocated capacity too.
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}
244
245 // Create a new instance with the given type, and add it to the worker
246 // pool. The worker is added immediately; instance creation runs in
247 // the background.
248 //
249 // Create returns false if a pre-existing error state prevents it from
250 // even attempting to create a new instance. Those errors are logged
251 // by the Pool, so the caller does not need to log anything in such
252 // cases.
253 func (wp *Pool) Create(it arvados.InstanceType) bool {
254         logger := wp.logger.WithField("InstanceType", it.Name)
255         wp.setupOnce.Do(wp.setup)
256         wp.mtx.Lock()
257         defer wp.mtx.Unlock()
258         if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
259                 return false
260         }
261         tags := cloud.InstanceTags{
262                 tagKeyInstanceType: it.Name,
263                 tagKeyIdleBehavior: string(IdleBehaviorRun),
264         }
265         now := time.Now()
266         wp.creating[it] = append(wp.creating[it], now)
267         go func() {
268                 defer wp.notify()
269                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, wp.installPublicKey)
270                 wp.mtx.Lock()
271                 defer wp.mtx.Unlock()
272                 // Remove our timestamp marker from wp.creating
273                 for i, t := range wp.creating[it] {
274                         if t == now {
275                                 copy(wp.creating[it][i:], wp.creating[it][i+1:])
276                                 wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
277                                 break
278                         }
279                 }
280                 if err != nil {
281                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
282                                 wp.atQuotaErr = err
283                                 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
284                                 time.AfterFunc(quotaErrorTTL, wp.notify)
285                         }
286                         logger.WithError(err).Error("create failed")
287                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
288                         return
289                 }
290                 wp.updateWorker(inst, it, StateBooting)
291         }()
292         return true
293 }
294
295 // AtQuota returns true if Create is not expected to work at the
296 // moment.
297 func (wp *Pool) AtQuota() bool {
298         wp.mtx.Lock()
299         defer wp.mtx.Unlock()
300         return time.Now().Before(wp.atQuotaUntil)
301 }
302
303 // SetIdleBehavior determines how the indicated instance will behave
304 // when it has no containers running.
305 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
306         wp.mtx.Lock()
307         defer wp.mtx.Unlock()
308         wkr, ok := wp.workers[id]
309         if !ok {
310                 return errors.New("requested instance does not exist")
311         }
312         wkr.idleBehavior = idleBehavior
313         wkr.saveTags()
314         wkr.shutdownIfIdle()
315         return nil
316 }
317
318 // Add or update worker attached to the given instance. Use
319 // initialState if a new worker is created.
320 //
321 // The second return value is true if a new worker is created.
322 //
323 // Caller must have lock.
324 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
325         id := inst.ID()
326         if wkr := wp.workers[id]; wkr != nil {
327                 wkr.executor.SetTarget(inst)
328                 wkr.instance = inst
329                 wkr.updated = time.Now()
330                 if initialState == StateBooting && wkr.state == StateUnknown {
331                         wkr.state = StateBooting
332                 }
333                 wkr.saveTags()
334                 return wkr, false
335         }
336
337         // If an instance has a valid IdleBehavior tag when it first
338         // appears, initialize the new worker accordingly (this is how
339         // we restore IdleBehavior that was set by a prior dispatch
340         // process); otherwise, default to "run". After this,
341         // wkr.idleBehavior is the source of truth, and will only be
342         // changed via SetIdleBehavior().
343         idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
344         if !validIdleBehavior[idleBehavior] {
345                 idleBehavior = IdleBehaviorRun
346         }
347
348         logger := wp.logger.WithFields(logrus.Fields{
349                 "InstanceType": it.Name,
350                 "Instance":     inst,
351                 "Address":      inst.Address(),
352         })
353         logger.WithFields(logrus.Fields{
354                 "State":        initialState,
355                 "IdleBehavior": idleBehavior,
356         }).Infof("instance appeared in cloud")
357         now := time.Now()
358         wkr := &worker{
359                 mtx:          &wp.mtx,
360                 wp:           wp,
361                 logger:       logger,
362                 executor:     wp.newExecutor(inst),
363                 state:        initialState,
364                 idleBehavior: idleBehavior,
365                 instance:     inst,
366                 instType:     it,
367                 appeared:     now,
368                 probed:       now,
369                 busy:         now,
370                 updated:      now,
371                 running:      make(map[string]struct{}),
372                 starting:     make(map[string]struct{}),
373                 probing:      make(chan struct{}, 1),
374         }
375         wp.workers[id] = wkr
376         return wkr, true
377 }
378
379 // caller must have lock.
380 func (wp *Pool) notifyExited(uuid string, t time.Time) {
381         wp.exited[uuid] = t
382 }
383
384 // Shutdown shuts down a worker with the given type, or returns false
385 // if all workers with the given type are busy.
386 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
387         wp.setupOnce.Do(wp.setup)
388         wp.mtx.Lock()
389         defer wp.mtx.Unlock()
390         logger := wp.logger.WithField("InstanceType", it.Name)
391         logger.Info("shutdown requested")
392         for _, tryState := range []State{StateBooting, StateIdle} {
393                 // TODO: shutdown the worker with the longest idle
394                 // time (Idle) or the earliest create time (Booting)
395                 for _, wkr := range wp.workers {
396                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
397                                 logger.WithField("Instance", wkr.instance).Info("shutting down")
398                                 wkr.shutdown()
399                                 return true
400                         }
401                 }
402         }
403         return false
404 }
405
406 // CountWorkers returns the current number of workers in each state.
407 func (wp *Pool) CountWorkers() map[State]int {
408         wp.setupOnce.Do(wp.setup)
409         wp.mtx.Lock()
410         defer wp.mtx.Unlock()
411         r := map[State]int{}
412         for _, w := range wp.workers {
413                 r[w.state]++
414         }
415         return r
416 }
417
418 // Running returns the container UUIDs being prepared/run on workers.
419 //
420 // In the returned map, the time value indicates when the Pool
421 // observed that the container process had exited. A container that
422 // has not yet exited has a zero time value. The caller should use
423 // KillContainer() to garbage-collect the entries for exited
424 // containers.
425 func (wp *Pool) Running() map[string]time.Time {
426         wp.setupOnce.Do(wp.setup)
427         wp.mtx.Lock()
428         defer wp.mtx.Unlock()
429         r := map[string]time.Time{}
430         for _, wkr := range wp.workers {
431                 for uuid := range wkr.running {
432                         r[uuid] = time.Time{}
433                 }
434                 for uuid := range wkr.starting {
435                         r[uuid] = time.Time{}
436                 }
437         }
438         for uuid, exited := range wp.exited {
439                 r[uuid] = exited
440         }
441         return r
442 }
443
444 // StartContainer starts a container on an idle worker immediately if
445 // possible, otherwise returns false.
446 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
447         wp.setupOnce.Do(wp.setup)
448         wp.mtx.Lock()
449         defer wp.mtx.Unlock()
450         var wkr *worker
451         for _, w := range wp.workers {
452                 if w.instType == it && w.state == StateIdle {
453                         if wkr == nil || w.busy.After(wkr.busy) {
454                                 wkr = w
455                         }
456                 }
457         }
458         if wkr == nil {
459                 return false
460         }
461         wkr.startContainer(ctr)
462         return true
463 }
464
465 // KillContainer kills the crunch-run process for the given container
466 // UUID, if it's running on any worker.
467 //
468 // KillContainer returns immediately; the act of killing the container
469 // takes some time, and runs in the background.
470 func (wp *Pool) KillContainer(uuid string) {
471         wp.mtx.Lock()
472         defer wp.mtx.Unlock()
473         if _, ok := wp.exited[uuid]; ok {
474                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
475                 delete(wp.exited, uuid)
476                 return
477         }
478         for _, wkr := range wp.workers {
479                 if _, ok := wkr.running[uuid]; ok {
480                         go wp.kill(wkr, uuid)
481                         return
482                 }
483         }
484         wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
485 }
486
487 func (wp *Pool) kill(wkr *worker, uuid string) {
488         logger := wp.logger.WithFields(logrus.Fields{
489                 "ContainerUUID": uuid,
490                 "Instance":      wkr.instance,
491         })
492         logger.Debug("killing process")
493         cmd := "crunch-run --kill 15 " + uuid
494         if u := wkr.instance.RemoteUser(); u != "root" {
495                 cmd = "sudo " + cmd
496         }
497         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
498         if err != nil {
499                 logger.WithFields(logrus.Fields{
500                         "stderr": string(stderr),
501                         "stdout": string(stdout),
502                         "error":  err,
503                 }).Warn("kill failed")
504                 return
505         }
506         logger.Debug("killing process succeeded")
507         wp.mtx.Lock()
508         defer wp.mtx.Unlock()
509         if _, ok := wkr.running[uuid]; ok {
510                 delete(wkr.running, uuid)
511                 if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
512                         wkr.state = StateIdle
513                 }
514                 wkr.updated = time.Now()
515                 go wp.notify()
516         }
517 }
518
519 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
520         if reg == nil {
521                 reg = prometheus.NewRegistry()
522         }
523         wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
524                 Namespace: "arvados",
525                 Subsystem: "dispatchcloud",
526                 Name:      "instances_total",
527                 Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
528         })
529         reg.MustRegister(wp.mInstances)
530         wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
531                 Namespace: "arvados",
532                 Subsystem: "dispatchcloud",
533                 Name:      "instances_price_total",
534                 Help:      "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
535         })
536         reg.MustRegister(wp.mInstancesPrice)
537         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
538                 Namespace: "arvados",
539                 Subsystem: "dispatchcloud",
540                 Name:      "containers_running",
541                 Help:      "Number of containers reported running by cloud VMs.",
542         })
543         reg.MustRegister(wp.mContainersRunning)
544
545         wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
546                 Namespace: "arvados",
547                 Subsystem: "dispatchcloud",
548                 Name:      "vcpus_total",
549                 Help:      "Total VCPUs on all cloud VMs.",
550         })
551         reg.MustRegister(wp.mVCPUs)
552         wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
553                 Namespace: "arvados",
554                 Subsystem: "dispatchcloud",
555                 Name:      "vcpus_inuse",
556                 Help:      "VCPUs on cloud VMs that are running containers.",
557         })
558         reg.MustRegister(wp.mVCPUsInuse)
559         wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
560                 Namespace: "arvados",
561                 Subsystem: "dispatchcloud",
562                 Name:      "memory_bytes_total",
563                 Help:      "Total memory on all cloud VMs.",
564         })
565         reg.MustRegister(wp.mMemory)
566         wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
567                 Namespace: "arvados",
568                 Subsystem: "dispatchcloud",
569                 Name:      "memory_bytes_inuse",
570                 Help:      "Memory on cloud VMs that are running containers.",
571         })
572         reg.MustRegister(wp.mMemoryInuse)
573 }
574
575 func (wp *Pool) runMetrics() {
576         ch := wp.Subscribe()
577         defer wp.Unsubscribe(ch)
578         for range ch {
579                 wp.updateMetrics()
580         }
581 }
582
583 func (wp *Pool) updateMetrics() {
584         wp.mtx.RLock()
585         defer wp.mtx.RUnlock()
586
587         var price float64
588         var alloc, cpu, cpuInuse, mem, memInuse int64
589         for _, wkr := range wp.workers {
590                 price += wkr.instType.Price
591                 cpu += int64(wkr.instType.VCPUs)
592                 mem += int64(wkr.instType.RAM)
593                 if len(wkr.running)+len(wkr.starting) == 0 {
594                         continue
595                 }
596                 alloc += int64(len(wkr.running) + len(wkr.starting))
597                 cpuInuse += int64(wkr.instType.VCPUs)
598                 memInuse += int64(wkr.instType.RAM)
599         }
600         wp.mInstances.Set(float64(len(wp.workers)))
601         wp.mInstancesPrice.Set(price)
602         wp.mContainersRunning.Set(float64(alloc))
603         wp.mVCPUs.Set(float64(cpu))
604         wp.mMemory.Set(float64(mem))
605         wp.mVCPUsInuse.Set(float64(cpuInuse))
606         wp.mMemoryInuse.Set(float64(memInuse))
607 }
608
609 func (wp *Pool) runProbes() {
610         maxPPS := wp.maxProbesPerSecond
611         if maxPPS < 1 {
612                 maxPPS = defaultMaxProbesPerSecond
613         }
614         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
615         defer limitticker.Stop()
616
617         probeticker := time.NewTicker(wp.probeInterval)
618         defer probeticker.Stop()
619
620         workers := []cloud.InstanceID{}
621         for range probeticker.C {
622                 workers = workers[:0]
623                 wp.mtx.Lock()
624                 for id, wkr := range wp.workers {
625                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
626                                 continue
627                         }
628                         workers = append(workers, id)
629                 }
630                 wp.mtx.Unlock()
631
632                 for _, id := range workers {
633                         wp.mtx.Lock()
634                         wkr, ok := wp.workers[id]
635                         wp.mtx.Unlock()
636                         if !ok {
637                                 // Deleted while we were probing
638                                 // others
639                                 continue
640                         }
641                         go wkr.ProbeAndUpdate()
642                         select {
643                         case <-wp.stop:
644                                 return
645                         case <-limitticker.C:
646                         }
647                 }
648         }
649 }
650
651 func (wp *Pool) runSync() {
652         // sync once immediately, then wait syncInterval, sync again,
653         // etc.
654         timer := time.NewTimer(1)
655         for {
656                 select {
657                 case <-timer.C:
658                         err := wp.getInstancesAndSync()
659                         if err != nil {
660                                 wp.logger.WithError(err).Warn("sync failed")
661                         }
662                         timer.Reset(wp.syncInterval)
663                 case <-wp.stop:
664                         wp.logger.Debug("worker.Pool stopped")
665                         return
666                 }
667         }
668 }
669
670 // Stop synchronizing with the InstanceSet.
671 func (wp *Pool) Stop() {
672         wp.setupOnce.Do(wp.setup)
673         close(wp.stop)
674 }
675
676 // Instances returns an InstanceView for each worker in the pool,
677 // summarizing its current state and recent activity.
678 func (wp *Pool) Instances() []InstanceView {
679         var r []InstanceView
680         wp.setupOnce.Do(wp.setup)
681         wp.mtx.Lock()
682         for _, w := range wp.workers {
683                 r = append(r, InstanceView{
684                         Instance:             w.instance.ID(),
685                         Address:              w.instance.Address(),
686                         Price:                w.instType.Price,
687                         ArvadosInstanceType:  w.instType.Name,
688                         ProviderInstanceType: w.instType.ProviderType,
689                         LastContainerUUID:    w.lastUUID,
690                         LastBusy:             w.busy,
691                         WorkerState:          w.state.String(),
692                         IdleBehavior:         w.idleBehavior,
693                 })
694         }
695         wp.mtx.Unlock()
696         sort.Slice(r, func(i, j int) bool {
697                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
698         })
699         return r
700 }
701
702 func (wp *Pool) setup() {
703         wp.creating = map[arvados.InstanceType][]time.Time{}
704         wp.exited = map[string]time.Time{}
705         wp.workers = map[cloud.InstanceID]*worker{}
706         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
707 }
708
709 func (wp *Pool) notify() {
710         wp.mtx.RLock()
711         defer wp.mtx.RUnlock()
712         for _, send := range wp.subscribers {
713                 select {
714                 case send <- struct{}{}:
715                 default:
716                 }
717         }
718 }
719
720 func (wp *Pool) getInstancesAndSync() error {
721         wp.setupOnce.Do(wp.setup)
722         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
723                 return err
724         }
725         wp.logger.Debug("getting instance list")
726         threshold := time.Now()
727         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
728         if err != nil {
729                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
730                 return err
731         }
732         wp.sync(threshold, instances)
733         wp.logger.Debug("sync done")
734         return nil
735 }
736
// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
//
// Listed instances whose InstanceType tag does not match a configured
// instance type are logged and ignored. Listed instances are added
// (as StateUnknown) or refreshed via updateWorker. A shut-down worker
// that is still listed more than timeoutShutdown after it was destroyed
// gets another shutdown attempt. Any worker whose updated timestamp is
// not after threshold is treated as gone: it is removed from the pool
// and its executor is closed. Subscribers are notified if any worker
// was added or removed.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	// updateWorker (above) bumps wkr.updated for every listed
	// instance, so any worker not updated since threshold was absent
	// from the list and untouched by anything else since it was
	// requested.
	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance,
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		delete(wp.workers, id)
		go wkr.executor.Close()
		notify = true
	}

	if !wp.loaded {
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	// notify takes the read lock, so run it after this function
	// releases the write lock.
	if notify {
		go wp.notify()
	}
}