e90935e2aa9e5747d08e136475cd186c0b4bc766
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/rand"
9         "errors"
10         "fmt"
11         "io"
12         "sort"
13         "strings"
14         "sync"
15         "time"
16
17         "git.curoverse.com/arvados.git/lib/cloud"
18         "git.curoverse.com/arvados.git/sdk/go/arvados"
19         "github.com/prometheus/client_golang/prometheus"
20         "github.com/sirupsen/logrus"
21         "golang.org/x/crypto/ssh"
22 )
23
// Tag keys attached to cloud instances managed by the pool. Create()
// sets all three on each new instance; sync() reads the InstanceType
// tag, and updateWorker() reads the IdleBehavior and InstanceSecret
// tags.
const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
)
29
// An InstanceView shows a worker's current state and recent activity.
//
// Pool.Instances() builds one InstanceView per worker, copying the
// values from the worker's instance, instance type, and bookkeeping
// fields at the time of the call.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}
42
// An Executor executes shell commands on a remote host.
//
// The pool obtains one Executor per worker from its newExecutor
// function when the worker is first added.
type Executor interface {
	// Execute runs cmd on the current target, with the given
	// environment and stdin, and returns the command's captured
	// stdout and stderr along with any error.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	// Close is called by the pool, in its own goroutine, after
	// the worker's instance has disappeared from the cloud
	// provider's instance list.
	// NOTE(review): presumably releases connections held by the
	// executor -- confirm against the implementation.
	Close()
}
62
// Defaults applied when the cluster configuration does not supply a
// positive value: NewPool substitutes the interval/timeout defaults
// via duration(), and runProbes falls back to
// defaultMaxProbesPerSecond when the configured rate is below 1.
const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = time.Minute * 10
	defaultTimeoutProbe       = time.Minute * 10
	defaultTimeoutShutdown    = time.Second * 10

	// Time after a quota error to try again anyway, even if no
	// instances have been shutdown.
	quotaErrorTTL = time.Minute

	// Time between "X failed because rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
)
79
80 func duration(conf arvados.Duration, def time.Duration) time.Duration {
81         if conf > 0 {
82                 return time.Duration(conf)
83         } else {
84                 return def
85         }
86 }
87
88 // NewPool creates a Pool of workers backed by instanceSet.
89 //
90 // New instances are configured and set up according to the given
91 // cluster configuration.
92 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
93         wp := &Pool{
94                 logger:             logger,
95                 arvClient:          arvClient,
96                 instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
97                 newExecutor:        newExecutor,
98                 bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
99                 imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
100                 instanceTypes:      cluster.InstanceTypes,
101                 maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
102                 probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
103                 syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
104                 timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
105                 timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
106                 timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
107                 timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
108                 installPublicKey:   installPublicKey,
109                 stop:               make(chan bool),
110         }
111         wp.registerMetrics(reg)
112         go func() {
113                 wp.setupOnce.Do(wp.setup)
114                 go wp.runMetrics()
115                 go wp.runProbes()
116                 go wp.runSync()
117         }()
118         return wp
119 }
120
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
//
// The maps under "private state" are created by setup(), which
// exported methods invoke through setupOnce; mtx guards that state
// (workers share the same mutex via their mtx field).
type Pool struct {
	// configuration
	logger             logrus.FieldLogger
	arvClient          *arvados.Client
	instanceSet        *throttledInstanceSet
	newExecutor        func(cloud.Instance) Executor
	bootProbeCommand   string
	imageID            cloud.ImageID
	instanceTypes      map[string]arvados.InstanceType
	syncInterval       time.Duration
	probeInterval      time.Duration
	maxProbesPerSecond int
	timeoutIdle        time.Duration
	timeoutBooting     time.Duration
	timeoutProbe       time.Duration
	timeoutShutdown    time.Duration
	installPublicKey   ssh.PublicKey

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once

	// NOTE(review): rate-limit errors are recorded on
	// instanceSet.throttleCreate and instanceSet.throttleInstances
	// (see Create and getInstancesAndSync); nothing visible in
	// this file records an error on these two Pool-level fields
	// -- confirm whether they are still needed.
	throttleCreate    throttle
	throttleInstances throttle

	// Prometheus collectors, created by registerMetrics and
	// refreshed by updateMetrics.
	mContainersRunning prometheus.Gauge
	mInstances         *prometheus.GaugeVec
	mInstancesPrice    *prometheus.GaugeVec
	mVCPUs             *prometheus.GaugeVec
	mMemory            *prometheus.GaugeVec
}
162
// createCall records an unfinished (cloud.InstanceSet)Create call:
// when Pool.Create started it, and which instance type was
// requested. Entries live in Pool.creating, keyed by instance
// secret, until the Create call returns.
type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}
167
168 // Subscribe returns a buffered channel that becomes ready after any
169 // change to the pool's state that could have scheduling implications:
170 // a worker's state changes, a new worker appears, the cloud
171 // provider's API rate limiting period ends, etc.
172 //
173 // Additional events that occur while the channel is already ready
174 // will be dropped, so it is OK if the caller services the channel
175 // slowly.
176 //
177 // Example:
178 //
179 //      ch := wp.Subscribe()
180 //      defer wp.Unsubscribe(ch)
181 //      for range ch {
182 //              tryScheduling(wp)
183 //              if done {
184 //                      break
185 //              }
186 //      }
187 func (wp *Pool) Subscribe() <-chan struct{} {
188         wp.setupOnce.Do(wp.setup)
189         wp.mtx.Lock()
190         defer wp.mtx.Unlock()
191         ch := make(chan struct{}, 1)
192         wp.subscribers[ch] = ch
193         return ch
194 }
195
// Unsubscribe stops sending updates to the given channel.
//
// ch must be a channel returned by Subscribe; it is removed from the
// subscriber map but is not closed.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}
203
204 // Unallocated returns the number of unallocated (creating + booting +
205 // idle + unknown) workers for each instance type.  Workers in
206 // hold/drain mode are not included.
207 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
208         wp.setupOnce.Do(wp.setup)
209         wp.mtx.RLock()
210         defer wp.mtx.RUnlock()
211         unalloc := map[arvados.InstanceType]int{}
212         creating := map[arvados.InstanceType]int{}
213         oldestCreate := map[arvados.InstanceType]time.Time{}
214         for _, cc := range wp.creating {
215                 it := cc.instanceType
216                 creating[it]++
217                 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
218                         oldestCreate[it] = cc.time
219                 }
220         }
221         for _, wkr := range wp.workers {
222                 // Skip workers that are not expected to become
223                 // available soon. Note len(wkr.running)>0 is not
224                 // redundant here: it can be true even in
225                 // StateUnknown.
226                 if wkr.state == StateShutdown ||
227                         wkr.state == StateRunning ||
228                         wkr.idleBehavior != IdleBehaviorRun ||
229                         len(wkr.running) > 0 {
230                         continue
231                 }
232                 it := wkr.instType
233                 unalloc[it]++
234                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
235                         // If up to N new workers appear in
236                         // Instances() while we are waiting for N
237                         // Create() calls to complete, we assume we're
238                         // just seeing a race between Instances() and
239                         // Create() responses.
240                         //
241                         // The other common reason why nodes have
242                         // state==Unknown is that they appeared at
243                         // startup, before any Create calls. They
244                         // don't match the above timing condition, so
245                         // we never mistakenly attribute them to
246                         // pending Create calls.
247                         creating[it]--
248                 }
249         }
250         for it, c := range creating {
251                 unalloc[it] += c
252         }
253         return unalloc
254 }
255
256 // Create a new instance with the given type, and add it to the worker
257 // pool. The worker is added immediately; instance creation runs in
258 // the background.
259 //
260 // Create returns false if a pre-existing error state prevents it from
261 // even attempting to create a new instance. Those errors are logged
262 // by the Pool, so the caller does not need to log anything in such
263 // cases.
264 func (wp *Pool) Create(it arvados.InstanceType) bool {
265         logger := wp.logger.WithField("InstanceType", it.Name)
266         wp.setupOnce.Do(wp.setup)
267         wp.mtx.Lock()
268         defer wp.mtx.Unlock()
269         if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
270                 return false
271         }
272         now := time.Now()
273         secret := randomHex(instanceSecretLength)
274         wp.creating[secret] = createCall{time: now, instanceType: it}
275         go func() {
276                 defer wp.notify()
277                 tags := cloud.InstanceTags{
278                         tagKeyInstanceType:   it.Name,
279                         tagKeyIdleBehavior:   string(IdleBehaviorRun),
280                         tagKeyInstanceSecret: secret,
281                 }
282                 initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
283                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
284                 wp.mtx.Lock()
285                 defer wp.mtx.Unlock()
286                 // delete() is deferred so the updateWorker() call
287                 // below knows to use StateBooting when adding a new
288                 // worker.
289                 defer delete(wp.creating, secret)
290                 if err != nil {
291                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
292                                 wp.atQuotaErr = err
293                                 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
294                                 time.AfterFunc(quotaErrorTTL, wp.notify)
295                         }
296                         logger.WithError(err).Error("create failed")
297                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
298                         return
299                 }
300                 wp.updateWorker(inst, it)
301         }()
302         return true
303 }
304
305 // AtQuota returns true if Create is not expected to work at the
306 // moment.
307 func (wp *Pool) AtQuota() bool {
308         wp.mtx.Lock()
309         defer wp.mtx.Unlock()
310         return time.Now().Before(wp.atQuotaUntil)
311 }
312
313 // SetIdleBehavior determines how the indicated instance will behave
314 // when it has no containers running.
315 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
316         wp.mtx.Lock()
317         defer wp.mtx.Unlock()
318         wkr, ok := wp.workers[id]
319         if !ok {
320                 return errors.New("requested instance does not exist")
321         }
322         wkr.idleBehavior = idleBehavior
323         wkr.saveTags()
324         wkr.shutdownIfIdle()
325         return nil
326 }
327
// updateWorker adds or updates the worker attached to the given
// instance.
//
// If a worker already exists for the instance ID, its executor
// target, instance, and updated timestamp are refreshed and its tags
// are saved. Otherwise a new worker is created and added to
// wp.workers.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	// NOTE(review): tagVerifier is defined elsewhere; presumably it
	// wraps the instance so the instance secret tag gets verified
	// -- confirm.
	inst = tagVerifier{inst}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		// Refreshing "updated" is what keeps sync() from treating
		// this worker as having disappeared.
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		// The worker shares the pool's mutex rather than having
		// its own.
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]struct{}),
		starting:     make(map[string]struct{}),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}
393
// notifyExited records that the crunch-run process for the container
// with the given UUID was observed to have exited at time t. The
// entry stays in wp.exited (and is reported by Running) until
// KillContainer removes it.
//
// Caller must have lock.
func (wp *Pool) notifyExited(uuid string, t time.Time) {
	wp.exited[uuid] = t
}
398
399 // Shutdown shuts down a worker with the given type, or returns false
400 // if all workers with the given type are busy.
401 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
402         wp.setupOnce.Do(wp.setup)
403         wp.mtx.Lock()
404         defer wp.mtx.Unlock()
405         logger := wp.logger.WithField("InstanceType", it.Name)
406         logger.Info("shutdown requested")
407         for _, tryState := range []State{StateBooting, StateIdle} {
408                 // TODO: shutdown the worker with the longest idle
409                 // time (Idle) or the earliest create time (Booting)
410                 for _, wkr := range wp.workers {
411                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
412                                 logger.WithField("Instance", wkr.instance).Info("shutting down")
413                                 wkr.shutdown()
414                                 return true
415                         }
416                 }
417         }
418         return false
419 }
420
421 // CountWorkers returns the current number of workers in each state.
422 //
423 // CountWorkers blocks, if necessary, until the initial instance list
424 // has been loaded from the cloud provider.
425 func (wp *Pool) CountWorkers() map[State]int {
426         wp.setupOnce.Do(wp.setup)
427         wp.waitUntilLoaded()
428         wp.mtx.Lock()
429         defer wp.mtx.Unlock()
430         r := map[State]int{}
431         for _, w := range wp.workers {
432                 r[w.state]++
433         }
434         return r
435 }
436
437 // Running returns the container UUIDs being prepared/run on workers.
438 //
439 // In the returned map, the time value indicates when the Pool
440 // observed that the container process had exited. A container that
441 // has not yet exited has a zero time value. The caller should use
442 // KillContainer() to garbage-collect the entries for exited
443 // containers.
444 func (wp *Pool) Running() map[string]time.Time {
445         wp.setupOnce.Do(wp.setup)
446         wp.mtx.Lock()
447         defer wp.mtx.Unlock()
448         r := map[string]time.Time{}
449         for _, wkr := range wp.workers {
450                 for uuid := range wkr.running {
451                         r[uuid] = time.Time{}
452                 }
453                 for uuid := range wkr.starting {
454                         r[uuid] = time.Time{}
455                 }
456         }
457         for uuid, exited := range wp.exited {
458                 r[uuid] = exited
459         }
460         return r
461 }
462
463 // StartContainer starts a container on an idle worker immediately if
464 // possible, otherwise returns false.
465 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
466         wp.setupOnce.Do(wp.setup)
467         wp.mtx.Lock()
468         defer wp.mtx.Unlock()
469         var wkr *worker
470         for _, w := range wp.workers {
471                 if w.instType == it && w.state == StateIdle {
472                         if wkr == nil || w.busy.After(wkr.busy) {
473                                 wkr = w
474                         }
475                 }
476         }
477         if wkr == nil {
478                 return false
479         }
480         wkr.startContainer(ctr)
481         return true
482 }
483
484 // KillContainer kills the crunch-run process for the given container
485 // UUID, if it's running on any worker.
486 //
487 // KillContainer returns immediately; the act of killing the container
488 // takes some time, and runs in the background.
489 func (wp *Pool) KillContainer(uuid string) {
490         wp.mtx.Lock()
491         defer wp.mtx.Unlock()
492         if _, ok := wp.exited[uuid]; ok {
493                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
494                 delete(wp.exited, uuid)
495                 return
496         }
497         for _, wkr := range wp.workers {
498                 if _, ok := wkr.running[uuid]; ok {
499                         go wp.kill(wkr, uuid)
500                         return
501                 }
502         }
503         wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
504 }
505
506 func (wp *Pool) kill(wkr *worker, uuid string) {
507         logger := wp.logger.WithFields(logrus.Fields{
508                 "ContainerUUID": uuid,
509                 "Instance":      wkr.instance.ID(),
510         })
511         logger.Debug("killing process")
512         cmd := "crunch-run --kill 15 " + uuid
513         if u := wkr.instance.RemoteUser(); u != "root" {
514                 cmd = "sudo " + cmd
515         }
516         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
517         if err != nil {
518                 logger.WithFields(logrus.Fields{
519                         "stderr": string(stderr),
520                         "stdout": string(stdout),
521                         "error":  err,
522                 }).Warn("kill failed")
523                 return
524         }
525         logger.Debug("killing process succeeded")
526         wp.mtx.Lock()
527         defer wp.mtx.Unlock()
528         if _, ok := wkr.running[uuid]; ok {
529                 delete(wkr.running, uuid)
530                 if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
531                         wkr.state = StateIdle
532                 }
533                 wkr.updated = time.Now()
534                 go wp.notify()
535         }
536 }
537
538 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
539         if reg == nil {
540                 reg = prometheus.NewRegistry()
541         }
542         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
543                 Namespace: "arvados",
544                 Subsystem: "dispatchcloud",
545                 Name:      "containers_running",
546                 Help:      "Number of containers reported running by cloud VMs.",
547         })
548         reg.MustRegister(wp.mContainersRunning)
549         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
550                 Namespace: "arvados",
551                 Subsystem: "dispatchcloud",
552                 Name:      "instances_total",
553                 Help:      "Number of cloud VMs.",
554         }, []string{"category"})
555         reg.MustRegister(wp.mInstances)
556         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
557                 Namespace: "arvados",
558                 Subsystem: "dispatchcloud",
559                 Name:      "instances_price",
560                 Help:      "Price of cloud VMs.",
561         }, []string{"category"})
562         reg.MustRegister(wp.mInstancesPrice)
563         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
564                 Namespace: "arvados",
565                 Subsystem: "dispatchcloud",
566                 Name:      "vcpus_total",
567                 Help:      "Total VCPUs on all cloud VMs.",
568         }, []string{"category"})
569         reg.MustRegister(wp.mVCPUs)
570         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
571                 Namespace: "arvados",
572                 Subsystem: "dispatchcloud",
573                 Name:      "memory_bytes_total",
574                 Help:      "Total memory on all cloud VMs.",
575         }, []string{"category"})
576         reg.MustRegister(wp.mMemory)
577 }
578
// runMetrics refreshes the Prometheus metrics once immediately, and
// then again each time the pool notifies its subscribers of a state
// change.
//
// NOTE(review): the loop ends only if the subscription channel is
// closed, and nothing visible in this file closes it, so this
// goroutine appears to keep running after Stop() -- confirm whether
// that is intended.
func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}
587
588 func (wp *Pool) updateMetrics() {
589         wp.mtx.RLock()
590         defer wp.mtx.RUnlock()
591
592         instances := map[string]int64{}
593         price := map[string]float64{}
594         cpu := map[string]int64{}
595         mem := map[string]int64{}
596         var running int64
597         for _, wkr := range wp.workers {
598                 var cat string
599                 switch {
600                 case len(wkr.running)+len(wkr.starting) > 0:
601                         cat = "inuse"
602                 case wkr.idleBehavior == IdleBehaviorHold:
603                         cat = "hold"
604                 case wkr.state == StateBooting:
605                         cat = "booting"
606                 case wkr.state == StateUnknown:
607                         cat = "unknown"
608                 default:
609                         cat = "idle"
610                 }
611                 instances[cat]++
612                 price[cat] += wkr.instType.Price
613                 cpu[cat] += int64(wkr.instType.VCPUs)
614                 mem[cat] += int64(wkr.instType.RAM)
615                 running += int64(len(wkr.running) + len(wkr.starting))
616         }
617         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
618                 wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
619                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
620                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
621                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
622         }
623         wp.mContainersRunning.Set(float64(running))
624 }
625
626 func (wp *Pool) runProbes() {
627         maxPPS := wp.maxProbesPerSecond
628         if maxPPS < 1 {
629                 maxPPS = defaultMaxProbesPerSecond
630         }
631         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
632         defer limitticker.Stop()
633
634         probeticker := time.NewTicker(wp.probeInterval)
635         defer probeticker.Stop()
636
637         workers := []cloud.InstanceID{}
638         for range probeticker.C {
639                 workers = workers[:0]
640                 wp.mtx.Lock()
641                 for id, wkr := range wp.workers {
642                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
643                                 continue
644                         }
645                         workers = append(workers, id)
646                 }
647                 wp.mtx.Unlock()
648
649                 for _, id := range workers {
650                         wp.mtx.Lock()
651                         wkr, ok := wp.workers[id]
652                         wp.mtx.Unlock()
653                         if !ok {
654                                 // Deleted while we were probing
655                                 // others
656                                 continue
657                         }
658                         go wkr.ProbeAndUpdate()
659                         select {
660                         case <-wp.stop:
661                                 return
662                         case <-limitticker.C:
663                         }
664                 }
665         }
666 }
667
668 func (wp *Pool) runSync() {
669         // sync once immediately, then wait syncInterval, sync again,
670         // etc.
671         timer := time.NewTimer(1)
672         for {
673                 select {
674                 case <-timer.C:
675                         err := wp.getInstancesAndSync()
676                         if err != nil {
677                                 wp.logger.WithError(err).Warn("sync failed")
678                         }
679                         timer.Reset(wp.syncInterval)
680                 case <-wp.stop:
681                         wp.logger.Debug("worker.Pool stopped")
682                         return
683                 }
684         }
685 }
686
// Stop synchronizing with the InstanceSet.
//
// Stop closes the pool's stop channel, which the sync and probe
// loops watch for. It must be called at most once: closing an
// already-closed channel panics.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}
692
693 // Instances returns an InstanceView for each worker in the pool,
694 // summarizing its current state and recent activity.
695 func (wp *Pool) Instances() []InstanceView {
696         var r []InstanceView
697         wp.setupOnce.Do(wp.setup)
698         wp.mtx.Lock()
699         for _, w := range wp.workers {
700                 r = append(r, InstanceView{
701                         Instance:             w.instance.ID(),
702                         Address:              w.instance.Address(),
703                         Price:                w.instType.Price,
704                         ArvadosInstanceType:  w.instType.Name,
705                         ProviderInstanceType: w.instType.ProviderType,
706                         LastContainerUUID:    w.lastUUID,
707                         LastBusy:             w.busy,
708                         WorkerState:          w.state.String(),
709                         IdleBehavior:         w.idleBehavior,
710                 })
711         }
712         wp.mtx.Unlock()
713         sort.Slice(r, func(i, j int) bool {
714                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
715         })
716         return r
717 }
718
719 func (wp *Pool) setup() {
720         wp.creating = map[string]createCall{}
721         wp.exited = map[string]time.Time{}
722         wp.workers = map[cloud.InstanceID]*worker{}
723         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
724 }
725
// notify wakes up all subscribers. The send to each subscriber is
// non-blocking: if a subscriber's channel already has a pending
// event, the new one is dropped for that subscriber.
//
// notify acquires a read lock on wp.mtx itself; callers that
// already hold the write lock invoke it in a separate goroutine.
func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}
736
// getInstancesAndSync fetches the current instance list from the
// instance set and reconciles the pool's workers with it.
//
// It returns early with the throttle's error, without contacting the
// instance set, if the "list instances" throttle is currently
// reporting one. An error from Instances() is passed to that
// throttle's CheckRateLimitError and then returned.
func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	// Taken before the Instances() call: sync() keeps any worker
	// updated after this time, even if it is absent from the
	// returned list.
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}
753
// sync adds, removes, and updates workers based on instances, which
// was obtained from the instanceSet. However, it doesn't clobber any
// other updates that already happened after threshold.
//
// Subscribers are notified if a worker was added or removed, or if
// this is the first instance list to be loaded.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	// Pass 1: add or refresh a worker for every listed instance
	// that has a recognized InstanceType tag.
	for _, inst := range instances {
		itTag := inst.Tags()[tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	// Pass 2: drop workers that were not refreshed since
	// threshold, i.e., were neither in the list just processed
	// nor updated by anything else in the meantime.
	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		delete(wp.workers, id)
		// Closed in a separate goroutine rather than while
		// holding the pool lock.
		go wkr.executor.Close()
		notify = true
	}

	if !wp.loaded {
		// First successful load: wake anything blocked in
		// waitUntilLoaded.
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		// notify takes a read lock, so it cannot be called
		// directly while the write lock is held.
		go wp.notify()
	}
}
802
803 func (wp *Pool) waitUntilLoaded() {
804         ch := wp.Subscribe()
805         wp.mtx.RLock()
806         defer wp.mtx.RUnlock()
807         for !wp.loaded {
808                 wp.mtx.RUnlock()
809                 <-ch
810                 wp.mtx.RLock()
811         }
812 }
813
// randomHex returns a string of n lowercase hexadecimal digits
// (n*4 random bits) read from the cryptographic random source. n
// must be even.
//
// randomHex panics if the random source returns an error.
func randomHex(n int) string {
	raw := make([]byte, n/2)
	if _, err := rand.Read(raw); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", raw)
}