lib/dispatchcloud/worker/pool.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "crypto/rand"
        "errors"
        "fmt"
        "io"
        "sort"
        "strings"
        "sync"
        "time"

        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
)

const (
        tagKeyInstanceType   = "InstanceType"
        tagKeyIdleBehavior   = "IdleBehavior"
        tagKeyInstanceSecret = "InstanceSecret"
        tagKeyInstanceSetID  = "InstanceSetID"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
        Instance             cloud.InstanceID `json:"instance"`
        Address              string           `json:"address"`
        Price                float64          `json:"price"`
        ArvadosInstanceType  string           `json:"arvados_instance_type"`
        ProviderInstanceType string           `json:"provider_instance_type"`
        LastContainerUUID    string           `json:"last_container_uuid"`
        LastBusy             time.Time        `json:"last_busy"`
        WorkerState          string           `json:"worker_state"`
        IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
        // Run cmd on the current target.
        Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

        // Use the given target for subsequent operations. The new
        // target is the same host as the previous target, but it
        // might return a different address and verify a different
        // host key.
        //
        // SetTarget is called frequently, and in most cases the new
        // target will behave exactly the same as the old one. An
        // implementation should optimize accordingly.
        //
        // SetTarget must not block on concurrent Execute calls.
        SetTarget(cloud.ExecutorTarget)

        Close()
}

const (
        defaultSyncInterval       = time.Minute
        defaultProbeInterval      = time.Second * 10
        defaultMaxProbesPerSecond = 10
        defaultTimeoutIdle        = time.Minute
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
        defaultTimeoutShutdown    = time.Second * 10
        defaultTimeoutTERM        = time.Minute * 2
        defaultTimeoutSignal      = time.Second * 5

        // Time to wait after a quota error before trying to create
        // instances again, even if no instances have been shut down.
        quotaErrorTTL = time.Minute

        // Time between "X failed because of rate limiting" messages
        logRateLimitErrorInterval = time.Second * 10
)

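// duration returns conf as a time.Duration if conf is a positive
// value; otherwise it returns def.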
func duration(conf arvados.Duration, def time.Duration) time.Duration {
        if conf > 0 {
                return time.Duration(conf)
        } else {
                return def
        }
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:             logger,
                arvClient:          arvClient,
                instanceSetID:      instanceSetID,
                instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:        newExecutor,
                bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
                imageID:            cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
                instanceTypes:      cluster.InstanceTypes,
                maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
                probeInterval:      duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
                syncInterval:       duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
                timeoutIdle:        duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
                timeoutBooting:     duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:       duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
                timeoutShutdown:    duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
                timeoutTERM:        duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
                timeoutSignal:      duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
                installPublicKey:   installPublicKey,
                tagKeyPrefix:       cluster.Containers.CloudVMs.TagKeyPrefix,
                stop:               make(chan bool),
        }
        wp.registerMetrics(reg)
        go func() {
                wp.setupOnce.Do(wp.setup)
                go wp.runMetrics()
                go wp.runProbes()
                go wp.runSync()
        }()
        return wp
}

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
        // configuration
        logger             logrus.FieldLogger
        arvClient          *arvados.Client
        instanceSetID      cloud.InstanceSetID
        instanceSet        *throttledInstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
        imageID            cloud.ImageID
        instanceTypes      map[string]arvados.InstanceType
        syncInterval       time.Duration
        probeInterval      time.Duration
        maxProbesPerSecond int
        timeoutIdle        time.Duration
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
        timeoutShutdown    time.Duration
        timeoutTERM        time.Duration
        timeoutSignal      time.Duration
        installPublicKey   ssh.PublicKey
        tagKeyPrefix       string

        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
        creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
        exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
        atQuotaUntil time.Time
        atQuotaErr   cloud.QuotaError
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once

        throttleCreate    throttle
        throttleInstances throttle

        mContainersRunning prometheus.Gauge
        mInstances         *prometheus.GaugeVec
        mInstancesPrice    *prometheus.GaugeVec
        mVCPUs             *prometheus.GaugeVec
        mMemory            *prometheus.GaugeVec
}

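// A createCall records an unfinished (cloud.InstanceSet)Create call:
// when it was issued and which instance type was requested.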
type createCall struct {
        time         time.Time
        instanceType arvados.InstanceType
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//      ch := wp.Subscribe()
//      defer wp.Unsubscribe(ch)
//      for range ch {
//              tryScheduling(wp)
//              if done {
//                      break
//              }
//      }
func (wp *Pool) Subscribe() <-chan struct{} {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        ch := make(chan struct{}, 1)
        wp.subscribers[ch] = ch
        return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.  Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        unalloc := map[arvados.InstanceType]int{}
        creating := map[arvados.InstanceType]int{}
        oldestCreate := map[arvados.InstanceType]time.Time{}
        for _, cc := range wp.creating {
                it := cc.instanceType
                creating[it]++
                if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
                        oldestCreate[it] = cc.time
                }
        }
        for _, wkr := range wp.workers {
                // Skip workers that are not expected to become
                // available soon. Note len(wkr.running)>0 is not
                // redundant here: it can be true even in
                // StateUnknown.
                if wkr.state == StateShutdown ||
                        wkr.state == StateRunning ||
                        wkr.idleBehavior != IdleBehaviorRun ||
                        len(wkr.running) > 0 {
                        continue
                }
                it := wkr.instType
                unalloc[it]++
                if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
                        // If up to N new workers appear in
                        // Instances() while we are waiting for N
                        // Create() calls to complete, we assume we're
                        // just seeing a race between Instances() and
                        // Create() responses.
                        //
                        // The other common reason why nodes have
                        // state==Unknown is that they appeared at
                        // startup, before any Create calls. They
                        // don't match the above timing condition, so
                        // we never mistakenly attribute them to
                        // pending Create calls.
                        creating[it]--
                }
        }
        for it, c := range creating {
                unalloc[it] += c
        }
        return unalloc
}

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
                return false
        }
        now := time.Now()
        secret := randomHex(instanceSecretLength)
        wp.creating[secret] = createCall{time: now, instanceType: it}
        go func() {
                defer wp.notify()
                tags := cloud.InstanceTags{
                        wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
                        wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
                        wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
                        wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
                }
                initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
                // delete() is deferred so the updateWorker() call
                // below knows to use StateBooting when adding a new
                // worker.
                defer delete(wp.creating, secret)
                if err != nil {
                        if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
                                wp.atQuotaErr = err
                                wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
                                time.AfterFunc(quotaErrorTTL, wp.notify)
                        }
                        logger.WithError(err).Error("create failed")
                        wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
                        return
                }
                wp.updateWorker(inst, it)
        }()
        return true
}

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        return time.Now().Before(wp.atQuotaUntil)
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wkr, ok := wp.workers[id]
        if !ok {
                return errors.New("requested instance does not exist")
        }
        wkr.setIdleBehavior(idleBehavior)
        return nil
}

// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
        secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
        inst = tagVerifier{inst, secret}
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
                wkr.instance = inst
                wkr.updated = time.Now()
                wkr.saveTags()
                return wkr, false
        }

        state := StateUnknown
        if _, ok := wp.creating[secret]; ok {
                state = StateBooting
        }

        // If an instance has a valid IdleBehavior tag when it first
        // appears, initialize the new worker accordingly (this is how
        // we restore IdleBehavior that was set by a prior dispatch
        // process); otherwise, default to "run". After this,
        // wkr.idleBehavior is the source of truth, and will only be
        // changed via SetIdleBehavior().
        idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
        if !validIdleBehavior[idleBehavior] {
                idleBehavior = IdleBehaviorRun
        }

        logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
                "Instance":     inst.ID(),
                "Address":      inst.Address(),
        })
        logger.WithFields(logrus.Fields{
                "State":        state,
                "IdleBehavior": idleBehavior,
        }).Infof("instance appeared in cloud")
        now := time.Now()
        wkr := &worker{
                mtx:          &wp.mtx,
                wp:           wp,
                logger:       logger,
                executor:     wp.newExecutor(inst),
                state:        state,
                idleBehavior: idleBehavior,
                instance:     inst,
                instType:     it,
                appeared:     now,
                probed:       now,
                busy:         now,
                updated:      now,
                running:      make(map[string]*remoteRunner),
                starting:     make(map[string]*remoteRunner),
                probing:      make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithField("InstanceType", it.Name)
        logger.Info("shutdown requested")
        for _, tryState := range []State{StateBooting, StateIdle} {
                // TODO: shut down the worker with the longest idle
                // time (Idle) or the earliest create time (Booting).
                for _, wkr := range wp.workers {
                        if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
                                logger.WithField("Instance", wkr.instance).Info("shutting down")
                                wkr.shutdown()
                                return true
                        }
                }
        }
        return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
        wp.setupOnce.Do(wp.setup)
        wp.waitUntilLoaded()
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[State]int{}
        for _, w := range wp.workers {
                r[w.state]++
        }
        return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// KillContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[string]time.Time{}
        for _, wkr := range wp.workers {
                for uuid := range wkr.running {
                        r[uuid] = time.Time{}
                }
                for uuid := range wkr.starting {
                        r[uuid] = time.Time{}
                }
        }
        for uuid, exited := range wp.exited {
                r[uuid] = exited
        }
        return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        var wkr *worker
        for _, w := range wp.workers {
                if w.instType == it && w.state == StateIdle {
                        if wkr == nil || w.busy.After(wkr.busy) {
                                wkr = w
                        }
                }
        }
        if wkr == nil {
                return false
        }
        wkr.startContainer(ctr)
        return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
func (wp *Pool) KillContainer(uuid string, reason string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithFields(logrus.Fields{
                "ContainerUUID": uuid,
                "Reason":        reason,
        })
        if _, ok := wp.exited[uuid]; ok {
                logger.Debug("clearing placeholder for exited crunch-run process")
                delete(wp.exited, uuid)
                return
        }
        for _, wkr := range wp.workers {
                rr := wkr.running[uuid]
                if rr == nil {
                        rr = wkr.starting[uuid]
                }
                if rr != nil {
                        rr.Kill(reason)
                        return
                }
        }
        logger.Debug("cannot kill: already disappeared")
}

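// registerMetrics creates the pool's Prometheus gauges and registers
// them with reg. If reg is nil, a throwaway registry is used instead.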
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
        if reg == nil {
                reg = prometheus.NewRegistry()
        }
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "containers_running",
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)
        wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_total",
                Help:      "Number of cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mInstances)
        wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_price",
                Help:      "Price of cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mInstancesPrice)
        wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_total",
                Help:      "Total VCPUs on all cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mVCPUs)
        wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_total",
                Help:      "Total memory on all cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mMemory)
}

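// runMetrics updates the Prometheus metrics once at startup, and
// again after every pool state change notification.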
func (wp *Pool) runMetrics() {
        ch := wp.Subscribe()
        defer wp.Unsubscribe(ch)
        wp.updateMetrics()
        for range ch {
                wp.updateMetrics()
        }
}

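// updateMetrics recomputes the per-category instance count, price,
// VCPU, and memory gauges, plus the running-containers gauge, from
// the current worker list.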
func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()

        instances := map[string]int64{}
        price := map[string]float64{}
        cpu := map[string]int64{}
        mem := map[string]int64{}
        var running int64
        for _, wkr := range wp.workers {
                var cat string
                switch {
                case len(wkr.running)+len(wkr.starting) > 0:
                        cat = "inuse"
                case wkr.idleBehavior == IdleBehaviorHold:
                        cat = "hold"
                case wkr.state == StateBooting:
                        cat = "booting"
                case wkr.state == StateUnknown:
                        cat = "unknown"
                default:
                        cat = "idle"
                }
                instances[cat]++
                price[cat] += wkr.instType.Price
                cpu[cat] += int64(wkr.instType.VCPUs)
                mem[cat] += int64(wkr.instType.RAM)
                running += int64(len(wkr.running) + len(wkr.starting))
        }
        for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
                wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
                wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
                wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
                wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
        }
        wp.mContainersRunning.Set(float64(running))
}

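// runProbes probes all workers (except those already shutting down)
// every probeInterval, rate-limited to maxProbesPerSecond, until the
// pool is stopped.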
func (wp *Pool) runProbes() {
        maxPPS := wp.maxProbesPerSecond
        if maxPPS < 1 {
                maxPPS = defaultMaxProbesPerSecond
        }
        limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
        defer limitticker.Stop()

        probeticker := time.NewTicker(wp.probeInterval)
        defer probeticker.Stop()

        workers := []cloud.InstanceID{}
        for range probeticker.C {
                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
                        if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
                                continue
                        }
                        workers = append(workers, id)
                }
                wp.mtx.Unlock()

                for _, id := range workers {
                        wp.mtx.Lock()
                        wkr, ok := wp.workers[id]
                        wp.mtx.Unlock()
                        if !ok {
                                // Deleted while we were probing
                                // others
                                continue
                        }
                        go wkr.ProbeAndUpdate()
                        select {
                        case <-wp.stop:
                                return
                        case <-limitticker.C:
                        }
                }
        }
}

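// runSync keeps the worker list in sync with the cloud provider's
// instance list, via getInstancesAndSync(), until the pool is
// stopped.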
func (wp *Pool) runSync() {
        // sync once immediately, then wait syncInterval, sync again,
        // etc.
        timer := time.NewTimer(1)
        for {
                select {
                case <-timer.C:
                        err := wp.getInstancesAndSync()
                        if err != nil {
                                wp.logger.WithError(err).Warn("sync failed")
                        }
                        timer.Reset(wp.syncInterval)
                case <-wp.stop:
                        wp.logger.Debug("worker.Pool stopped")
                        return
                }
        }
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
        wp.setupOnce.Do(wp.setup)
        close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
        var r []InstanceView
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        for _, w := range wp.workers {
                r = append(r, InstanceView{
                        Instance:             w.instance.ID(),
                        Address:              w.instance.Address(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
                        LastContainerUUID:    w.lastUUID,
                        LastBusy:             w.busy,
                        WorkerState:          w.state.String(),
                        IdleBehavior:         w.idleBehavior,
                })
        }
        wp.mtx.Unlock()
        sort.Slice(r, func(i, j int) bool {
                return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
        })
        return r
}

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
        wkr, ok := wp.workers[id]
        if !ok {
                return errors.New("instance not found")
        }
        wkr.logger.WithField("Reason", reason).Info("shutting down")
        wkr.shutdown()
        return nil
}

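// setup initializes the pool's internal maps. It runs at most once,
// via setupOnce, before the pool is otherwise used.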
func (wp *Pool) setup() {
        wp.creating = map[string]createCall{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}

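// notify wakes up all subscribers. Sends that would block (because a
// subscriber's channel is already ready) are dropped, as documented
// in Subscribe.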
func (wp *Pool) notify() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        for _, send := range wp.subscribers {
                select {
                case send <- struct{}{}:
                default:
                }
        }
}

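// getInstancesAndSync fetches the current instance list from the
// InstanceSet, unless a rate-limit backoff is in effect, and passes
// it to sync().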
func (wp *Pool) getInstancesAndSync() error {
        wp.setupOnce.Do(wp.setup)
        if err := wp.instanceSet.throttleInstances.Error(); err != nil {
                return err
        }
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
        instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
        if err != nil {
                wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
                return err
        }
        wp.sync(threshold, instances)
        wp.logger.Debug("sync done")
        return nil
}

// Add/remove/update workers based on instances, which were obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
        notify := false

        for _, inst := range instances {
                itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                        wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
                if wkr, isNew := wp.updateWorker(inst, it); isNew {
                        notify = true
                } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
                        wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
                        wkr.shutdown()
                }
        }

        for id, wkr := range wp.workers {
                if wkr.updated.After(threshold) {
                        continue
                }
                logger := wp.logger.WithFields(logrus.Fields{
                        "Instance":    wkr.instance.ID(),
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
                delete(wp.workers, id)
                go wkr.Close()
                notify = true
        }

        if !wp.loaded {
                notify = true
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }

        if notify {
                go wp.notify()
        }
}

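// waitUntilLoaded blocks until the initial instance list has been
// loaded from the cloud provider (see CountWorkers).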
func (wp *Pool) waitUntilLoaded() {
        ch := wp.Subscribe()
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        for !wp.loaded {
                wp.mtx.RUnlock()
                <-ch
                wp.mtx.RLock()
        }
}

// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
        buf := make([]byte, n/2)
        _, err := rand.Read(buf)
        if err != nil {
                panic(err)
        }
        return fmt.Sprintf("%x", buf)
}