Merge branch 'master' into 14873-api-rails5-upgrade
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/rand"
9         "errors"
10         "fmt"
11         "io"
12         "sort"
13         "strings"
14         "sync"
15         "time"
16
17         "git.curoverse.com/arvados.git/lib/cloud"
18         "git.curoverse.com/arvados.git/sdk/go/arvados"
19         "github.com/prometheus/client_golang/prometheus"
20         "github.com/sirupsen/logrus"
21         "golang.org/x/crypto/ssh"
22 )
23
// Keys of the cloud instance tags the pool writes on each VM it
// creates, and reads back when instances are listed.
const (
	// Name of the Arvados instance type; sync() uses it to look up
	// the type of each listed instance.
	tagKeyInstanceType = "InstanceType"

	// The worker's IdleBehavior; updateWorker() restores it from
	// this tag when an instance first appears.
	tagKeyIdleBehavior = "IdleBehavior"

	// Random secret chosen by Create(); updateWorker() matches it
	// against pending Create calls.
	tagKeyInstanceSecret = "InstanceSecret"
)
29
// An InstanceView shows a worker's current state and recent activity.
//
// Views are built by (*Pool).Instances. The json struct tags define
// the field names used when a view is serialized.
type InstanceView struct {
	// Cloud provider's identifier for the VM (worker.instance.ID()).
	Instance cloud.InstanceID `json:"instance"`
	// Address reported by the cloud provider for the VM.
	Address string `json:"address"`
	// Price of the worker's Arvados instance type.
	Price float64 `json:"price"`
	// Name of the worker's Arvados instance type.
	ArvadosInstanceType string `json:"arvados_instance_type"`
	// Provider-side type name of the worker's instance type.
	ProviderInstanceType string `json:"provider_instance_type"`
	// Copied from worker.lastUUID; presumably the most recent
	// container started on this worker — confirm in worker code.
	LastContainerUUID string `json:"last_container_uuid"`
	// Copied from worker.busy.
	LastBusy time.Time `json:"last_busy"`
	// String form of the worker's State.
	WorkerState string `json:"worker_state"`
	// Worker's current IdleBehavior.
	IdleBehavior IdleBehavior `json:"idle_behavior"`
}
42
// An Executor executes shell commands on a remote host.
//
// The Pool gets one Executor per worker from its newExecutor callback
// (see NewPool). When an existing worker's instance is refreshed, the
// Pool calls SetTarget with the new cloud.Instance.
type Executor interface {
	// Run cmd on the current target.
	//
	// NOTE(review): env is presumably a set of environment variables
	// for cmd, and stdin is fed to it. Whether err covers non-zero
	// exit status as well as transport failures is up to the
	// implementation — confirm.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	// Close releases the executor. NOTE(review): presumably closes
	// any open connection to the target — confirm in
	// implementations.
	Close()
}
62
// Built-in fallbacks and fixed intervals. NewPool uses the default*
// durations whenever the corresponding cluster setting is zero or
// negative (see duration). runProbes uses defaultMaxProbesPerSecond
// when the configured limit is below 1.
const (
	defaultSyncInterval       = 1 * time.Minute
	defaultProbeInterval      = 10 * time.Second
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = 1 * time.Minute
	defaultTimeoutBooting     = 10 * time.Minute
	defaultTimeoutProbe       = 10 * time.Minute
	defaultTimeoutShutdown    = 10 * time.Second
	defaultTimeoutTERM        = 2 * time.Minute
	defaultTimeoutSignal      = 5 * time.Second

	// How long Create refuses to try after a quota error, even if
	// no instances have been shut down in the meantime.
	quotaErrorTTL = 1 * time.Minute

	// Minimum time between repeated "X failed because of rate
	// limiting" log messages.
	logRateLimitErrorInterval = 10 * time.Second
)
81
82 func duration(conf arvados.Duration, def time.Duration) time.Duration {
83         if conf > 0 {
84                 return time.Duration(conf)
85         } else {
86                 return def
87         }
88 }
89
90 // NewPool creates a Pool of workers backed by instanceSet.
91 //
92 // New instances are configured and set up according to the given
93 // cluster configuration.
94 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
95         wp := &Pool{
96                 logger:             logger,
97                 arvClient:          arvClient,
98                 instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
99                 newExecutor:        newExecutor,
100                 bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
101                 imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
102                 instanceTypes:      cluster.InstanceTypes,
103                 maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
104                 probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
105                 syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
106                 timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
107                 timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
108                 timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
109                 timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
110                 timeoutTERM:        duration(cluster.Dispatch.TimeoutTERM, defaultTimeoutTERM),
111                 timeoutSignal:      duration(cluster.Dispatch.TimeoutSignal, defaultTimeoutSignal),
112                 installPublicKey:   installPublicKey,
113                 stop:               make(chan bool),
114         }
115         wp.registerMetrics(reg)
116         go func() {
117                 wp.setupOnce.Do(wp.setup)
118                 go wp.runMetrics()
119                 go wp.runProbes()
120                 go wp.runSync()
121         }()
122         return wp
123 }
124
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
//
// The "private state" fields are meant to be accessed only while
// holding mtx. The same mutex is shared with each worker through
// worker.mtx (set in updateWorker). The maps are allocated by setup(),
// which public methods run through setupOnce.
type Pool struct {
	// configuration
	logger             logrus.FieldLogger
	arvClient          *arvados.Client // NOTE(review): not referenced in this file; presumably used by worker code
	instanceSet        *throttledInstanceSet
	newExecutor        func(cloud.Instance) Executor // called once per new worker in updateWorker
	bootProbeCommand   string                        // NOTE(review): not referenced in this file; presumably used by worker probes
	imageID            cloud.ImageID                 // image passed to instanceSet.Create
	instanceTypes      map[string]arvados.InstanceType // keyed by the InstanceType tag value (see sync)
	syncInterval       time.Duration                   // delay between instance list syncs (runSync)
	probeInterval      time.Duration                   // period of the probe ticker (runProbes)
	maxProbesPerSecond int                             // < 1 means defaultMaxProbesPerSecond
	// Timeouts, resolved by NewPool from cluster config or defaults.
	// In this file only timeoutShutdown is read (sync retries a
	// shutdown after it); the others are presumably used by worker
	// code.
	timeoutIdle      time.Duration
	timeoutBooting   time.Duration
	timeoutProbe     time.Duration
	timeoutShutdown  time.Duration
	timeoutTERM      time.Duration
	timeoutSignal    time.Duration
	installPublicKey ssh.PublicKey // passed to instanceSet.Create

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{} // both key and value are the same channel (see Subscribe)
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
	atQuotaUntil time.Time            // Create refuses to run before this time (set after a quota error)
	atQuotaErr   cloud.QuotaError     // most recent quota error from Create
	stop         chan bool            // closed by Stop
	mtx          sync.RWMutex
	setupOnce    sync.Once

	// NOTE(review): rate-limit errors are recorded in this file only
	// on instanceSet's throttles (CheckRateLimitError calls in Create
	// and getInstancesAndSync). Nothing here updates these two
	// Pool-level fields — confirm whether they are still needed.
	throttleCreate    throttle
	throttleInstances throttle

	// Prometheus metrics, created in registerMetrics and refreshed
	// by updateMetrics.
	mContainersRunning prometheus.Gauge
	mInstances         *prometheus.GaugeVec
	mInstancesPrice    *prometheus.GaugeVec
	mVCPUs             *prometheus.GaugeVec
	mMemory            *prometheus.GaugeVec
}
168
// A createCall records a (cloud.InstanceSet)Create call that
// Pool.Create has started but that has not yet returned. Entries live
// in Pool.creating, keyed by the instance secret tag, and are removed
// when the call returns.
type createCall struct {
	time         time.Time            // when Pool.Create issued the call
	instanceType arvados.InstanceType // instance type requested
}
173
174 // Subscribe returns a buffered channel that becomes ready after any
175 // change to the pool's state that could have scheduling implications:
176 // a worker's state changes, a new worker appears, the cloud
177 // provider's API rate limiting period ends, etc.
178 //
179 // Additional events that occur while the channel is already ready
180 // will be dropped, so it is OK if the caller services the channel
181 // slowly.
182 //
183 // Example:
184 //
185 //      ch := wp.Subscribe()
186 //      defer wp.Unsubscribe(ch)
187 //      for range ch {
188 //              tryScheduling(wp)
189 //              if done {
190 //                      break
191 //              }
192 //      }
193 func (wp *Pool) Subscribe() <-chan struct{} {
194         wp.setupOnce.Do(wp.setup)
195         wp.mtx.Lock()
196         defer wp.mtx.Unlock()
197         ch := make(chan struct{}, 1)
198         wp.subscribers[ch] = ch
199         return ch
200 }
201
202 // Unsubscribe stops sending updates to the given channel.
203 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
204         wp.setupOnce.Do(wp.setup)
205         wp.mtx.Lock()
206         defer wp.mtx.Unlock()
207         delete(wp.subscribers, ch)
208 }
209
// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.  Workers in
// hold/drain mode are not included.
//
// Pending Create calls are counted too. If a StateUnknown worker
// appears after the oldest pending Create call of its type, it is
// assumed to be that call's instance, so the call is not counted a
// second time.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	// Count pending Create calls per type, and note when the
	// earliest one started.
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	// Create calls not matched to an already-visible worker still
	// count as unallocated capacity.
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}
261
262 // Create a new instance with the given type, and add it to the worker
263 // pool. The worker is added immediately; instance creation runs in
264 // the background.
265 //
266 // Create returns false if a pre-existing error state prevents it from
267 // even attempting to create a new instance. Those errors are logged
268 // by the Pool, so the caller does not need to log anything in such
269 // cases.
270 func (wp *Pool) Create(it arvados.InstanceType) bool {
271         logger := wp.logger.WithField("InstanceType", it.Name)
272         wp.setupOnce.Do(wp.setup)
273         wp.mtx.Lock()
274         defer wp.mtx.Unlock()
275         if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
276                 return false
277         }
278         now := time.Now()
279         secret := randomHex(instanceSecretLength)
280         wp.creating[secret] = createCall{time: now, instanceType: it}
281         go func() {
282                 defer wp.notify()
283                 tags := cloud.InstanceTags{
284                         tagKeyInstanceType:   it.Name,
285                         tagKeyIdleBehavior:   string(IdleBehaviorRun),
286                         tagKeyInstanceSecret: secret,
287                 }
288                 initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
289                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
290                 wp.mtx.Lock()
291                 defer wp.mtx.Unlock()
292                 // delete() is deferred so the updateWorker() call
293                 // below knows to use StateBooting when adding a new
294                 // worker.
295                 defer delete(wp.creating, secret)
296                 if err != nil {
297                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
298                                 wp.atQuotaErr = err
299                                 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
300                                 time.AfterFunc(quotaErrorTTL, wp.notify)
301                         }
302                         logger.WithError(err).Error("create failed")
303                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
304                         return
305                 }
306                 wp.updateWorker(inst, it)
307         }()
308         return true
309 }
310
311 // AtQuota returns true if Create is not expected to work at the
312 // moment.
313 func (wp *Pool) AtQuota() bool {
314         wp.mtx.Lock()
315         defer wp.mtx.Unlock()
316         return time.Now().Before(wp.atQuotaUntil)
317 }
318
319 // SetIdleBehavior determines how the indicated instance will behave
320 // when it has no containers running.
321 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
322         wp.mtx.Lock()
323         defer wp.mtx.Unlock()
324         wkr, ok := wp.workers[id]
325         if !ok {
326                 return errors.New("requested instance does not exist")
327         }
328         wkr.setIdleBehavior(idleBehavior)
329         return nil
330 }
331
// Add or update worker attached to the given instance.
//
// If a worker already exists for inst's ID, its executor target,
// instance, and updated time are refreshed and its tags are saved
// (saveTags). That existing worker is returned.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	// NOTE(review): tagVerifier is defined elsewhere; presumably it
	// wraps inst to verify the instance secret — confirm.
	inst = tagVerifier{inst}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		// sync() removes workers whose updated time is not after
		// its threshold, so refreshing it marks this worker as
		// still present.
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	// Create() keeps the secret in wp.creating until its call
	// returns, so a match means this process created the instance
	// and it is still booting.
	state := StateUnknown
	if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		// The worker shares the pool's mutex.
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}
397
398 // Shutdown shuts down a worker with the given type, or returns false
399 // if all workers with the given type are busy.
400 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
401         wp.setupOnce.Do(wp.setup)
402         wp.mtx.Lock()
403         defer wp.mtx.Unlock()
404         logger := wp.logger.WithField("InstanceType", it.Name)
405         logger.Info("shutdown requested")
406         for _, tryState := range []State{StateBooting, StateIdle} {
407                 // TODO: shutdown the worker with the longest idle
408                 // time (Idle) or the earliest create time (Booting)
409                 for _, wkr := range wp.workers {
410                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
411                                 logger.WithField("Instance", wkr.instance).Info("shutting down")
412                                 wkr.shutdown()
413                                 return true
414                         }
415                 }
416         }
417         return false
418 }
419
420 // CountWorkers returns the current number of workers in each state.
421 //
422 // CountWorkers blocks, if necessary, until the initial instance list
423 // has been loaded from the cloud provider.
424 func (wp *Pool) CountWorkers() map[State]int {
425         wp.setupOnce.Do(wp.setup)
426         wp.waitUntilLoaded()
427         wp.mtx.Lock()
428         defer wp.mtx.Unlock()
429         r := map[State]int{}
430         for _, w := range wp.workers {
431                 r[w.state]++
432         }
433         return r
434 }
435
436 // Running returns the container UUIDs being prepared/run on workers.
437 //
438 // In the returned map, the time value indicates when the Pool
439 // observed that the container process had exited. A container that
440 // has not yet exited has a zero time value. The caller should use
441 // KillContainer() to garbage-collect the entries for exited
442 // containers.
443 func (wp *Pool) Running() map[string]time.Time {
444         wp.setupOnce.Do(wp.setup)
445         wp.mtx.Lock()
446         defer wp.mtx.Unlock()
447         r := map[string]time.Time{}
448         for _, wkr := range wp.workers {
449                 for uuid := range wkr.running {
450                         r[uuid] = time.Time{}
451                 }
452                 for uuid := range wkr.starting {
453                         r[uuid] = time.Time{}
454                 }
455         }
456         for uuid, exited := range wp.exited {
457                 r[uuid] = exited
458         }
459         return r
460 }
461
462 // StartContainer starts a container on an idle worker immediately if
463 // possible, otherwise returns false.
464 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
465         wp.setupOnce.Do(wp.setup)
466         wp.mtx.Lock()
467         defer wp.mtx.Unlock()
468         var wkr *worker
469         for _, w := range wp.workers {
470                 if w.instType == it && w.state == StateIdle {
471                         if wkr == nil || w.busy.After(wkr.busy) {
472                                 wkr = w
473                         }
474                 }
475         }
476         if wkr == nil {
477                 return false
478         }
479         wkr.startContainer(ctr)
480         return true
481 }
482
483 // KillContainer kills the crunch-run process for the given container
484 // UUID, if it's running on any worker.
485 //
486 // KillContainer returns immediately; the act of killing the container
487 // takes some time, and runs in the background.
488 func (wp *Pool) KillContainer(uuid string, reason string) {
489         wp.mtx.Lock()
490         defer wp.mtx.Unlock()
491         logger := wp.logger.WithFields(logrus.Fields{
492                 "ContainerUUID": uuid,
493                 "Reason":        reason,
494         })
495         if _, ok := wp.exited[uuid]; ok {
496                 logger.Debug("clearing placeholder for exited crunch-run process")
497                 delete(wp.exited, uuid)
498                 return
499         }
500         for _, wkr := range wp.workers {
501                 rr := wkr.running[uuid]
502                 if rr == nil {
503                         rr = wkr.starting[uuid]
504                 }
505                 if rr != nil {
506                         rr.Kill(reason)
507                         return
508                 }
509         }
510         logger.Debug("cannot kill: already disappeared")
511 }
512
513 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
514         if reg == nil {
515                 reg = prometheus.NewRegistry()
516         }
517         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
518                 Namespace: "arvados",
519                 Subsystem: "dispatchcloud",
520                 Name:      "containers_running",
521                 Help:      "Number of containers reported running by cloud VMs.",
522         })
523         reg.MustRegister(wp.mContainersRunning)
524         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
525                 Namespace: "arvados",
526                 Subsystem: "dispatchcloud",
527                 Name:      "instances_total",
528                 Help:      "Number of cloud VMs.",
529         }, []string{"category"})
530         reg.MustRegister(wp.mInstances)
531         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
532                 Namespace: "arvados",
533                 Subsystem: "dispatchcloud",
534                 Name:      "instances_price",
535                 Help:      "Price of cloud VMs.",
536         }, []string{"category"})
537         reg.MustRegister(wp.mInstancesPrice)
538         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
539                 Namespace: "arvados",
540                 Subsystem: "dispatchcloud",
541                 Name:      "vcpus_total",
542                 Help:      "Total VCPUs on all cloud VMs.",
543         }, []string{"category"})
544         reg.MustRegister(wp.mVCPUs)
545         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
546                 Namespace: "arvados",
547                 Subsystem: "dispatchcloud",
548                 Name:      "memory_bytes_total",
549                 Help:      "Total memory on all cloud VMs.",
550         }, []string{"category"})
551         reg.MustRegister(wp.mMemory)
552 }
553
554 func (wp *Pool) runMetrics() {
555         ch := wp.Subscribe()
556         defer wp.Unsubscribe(ch)
557         wp.updateMetrics()
558         for range ch {
559                 wp.updateMetrics()
560         }
561 }
562
563 func (wp *Pool) updateMetrics() {
564         wp.mtx.RLock()
565         defer wp.mtx.RUnlock()
566
567         instances := map[string]int64{}
568         price := map[string]float64{}
569         cpu := map[string]int64{}
570         mem := map[string]int64{}
571         var running int64
572         for _, wkr := range wp.workers {
573                 var cat string
574                 switch {
575                 case len(wkr.running)+len(wkr.starting) > 0:
576                         cat = "inuse"
577                 case wkr.idleBehavior == IdleBehaviorHold:
578                         cat = "hold"
579                 case wkr.state == StateBooting:
580                         cat = "booting"
581                 case wkr.state == StateUnknown:
582                         cat = "unknown"
583                 default:
584                         cat = "idle"
585                 }
586                 instances[cat]++
587                 price[cat] += wkr.instType.Price
588                 cpu[cat] += int64(wkr.instType.VCPUs)
589                 mem[cat] += int64(wkr.instType.RAM)
590                 running += int64(len(wkr.running) + len(wkr.starting))
591         }
592         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
593                 wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
594                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
595                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
596                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
597         }
598         wp.mContainersRunning.Set(float64(running))
599 }
600
601 func (wp *Pool) runProbes() {
602         maxPPS := wp.maxProbesPerSecond
603         if maxPPS < 1 {
604                 maxPPS = defaultMaxProbesPerSecond
605         }
606         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
607         defer limitticker.Stop()
608
609         probeticker := time.NewTicker(wp.probeInterval)
610         defer probeticker.Stop()
611
612         workers := []cloud.InstanceID{}
613         for range probeticker.C {
614                 workers = workers[:0]
615                 wp.mtx.Lock()
616                 for id, wkr := range wp.workers {
617                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
618                                 continue
619                         }
620                         workers = append(workers, id)
621                 }
622                 wp.mtx.Unlock()
623
624                 for _, id := range workers {
625                         wp.mtx.Lock()
626                         wkr, ok := wp.workers[id]
627                         wp.mtx.Unlock()
628                         if !ok {
629                                 // Deleted while we were probing
630                                 // others
631                                 continue
632                         }
633                         go wkr.ProbeAndUpdate()
634                         select {
635                         case <-wp.stop:
636                                 return
637                         case <-limitticker.C:
638                         }
639                 }
640         }
641 }
642
643 func (wp *Pool) runSync() {
644         // sync once immediately, then wait syncInterval, sync again,
645         // etc.
646         timer := time.NewTimer(1)
647         for {
648                 select {
649                 case <-timer.C:
650                         err := wp.getInstancesAndSync()
651                         if err != nil {
652                                 wp.logger.WithError(err).Warn("sync failed")
653                         }
654                         timer.Reset(wp.syncInterval)
655                 case <-wp.stop:
656                         wp.logger.Debug("worker.Pool stopped")
657                         return
658                 }
659         }
660 }
661
662 // Stop synchronizing with the InstanceSet.
663 func (wp *Pool) Stop() {
664         wp.setupOnce.Do(wp.setup)
665         close(wp.stop)
666 }
667
668 // Instances returns an InstanceView for each worker in the pool,
669 // summarizing its current state and recent activity.
670 func (wp *Pool) Instances() []InstanceView {
671         var r []InstanceView
672         wp.setupOnce.Do(wp.setup)
673         wp.mtx.Lock()
674         for _, w := range wp.workers {
675                 r = append(r, InstanceView{
676                         Instance:             w.instance.ID(),
677                         Address:              w.instance.Address(),
678                         Price:                w.instType.Price,
679                         ArvadosInstanceType:  w.instType.Name,
680                         ProviderInstanceType: w.instType.ProviderType,
681                         LastContainerUUID:    w.lastUUID,
682                         LastBusy:             w.busy,
683                         WorkerState:          w.state.String(),
684                         IdleBehavior:         w.idleBehavior,
685                 })
686         }
687         wp.mtx.Unlock()
688         sort.Slice(r, func(i, j int) bool {
689                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
690         })
691         return r
692 }
693
694 // KillInstance destroys a cloud VM instance. It returns an error if
695 // the given instance does not exist.
696 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
697         wkr, ok := wp.workers[id]
698         if !ok {
699                 return errors.New("instance not found")
700         }
701         wkr.logger.WithField("Reason", reason).Info("shutting down")
702         wkr.shutdown()
703         return nil
704 }
705
706 func (wp *Pool) setup() {
707         wp.creating = map[string]createCall{}
708         wp.exited = map[string]time.Time{}
709         wp.workers = map[cloud.InstanceID]*worker{}
710         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
711 }
712
713 func (wp *Pool) notify() {
714         wp.mtx.RLock()
715         defer wp.mtx.RUnlock()
716         for _, send := range wp.subscribers {
717                 select {
718                 case send <- struct{}{}:
719                 default:
720                 }
721         }
722 }
723
724 func (wp *Pool) getInstancesAndSync() error {
725         wp.setupOnce.Do(wp.setup)
726         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
727                 return err
728         }
729         wp.logger.Debug("getting instance list")
730         threshold := time.Now()
731         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
732         if err != nil {
733                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
734                 return err
735         }
736         wp.sync(threshold, instances)
737         wp.logger.Debug("sync done")
738         return nil
739 }
740
// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
//
// In detail:
//   - instances with an unrecognized InstanceType tag are logged and
//     ignored;
//   - listed instances are added or refreshed via updateWorker;
//   - a worker still listed more than timeoutShutdown after it was
//     destroyed is shut down again;
//   - a worker whose updated time is not after threshold (it was not
//     listed here and not updated since the list call began) is
//     removed and closed.
//
// Subscribers are notified if a worker was added or removed, or if
// this is the first completed sync.
//
// sync acquires wp.mtx itself; the caller must not hold it.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	// updateWorker set updated=now (after threshold) for every
	// listed instance, so anything older was not in the list.
	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		delete(wp.workers, id)
		go wkr.Close()
		notify = true
	}

	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	// notify() takes wp.mtx.RLock, which is held exclusively here,
	// so it must run in another goroutine.
	if notify {
		go wp.notify()
	}
}
789
790 func (wp *Pool) waitUntilLoaded() {
791         ch := wp.Subscribe()
792         wp.mtx.RLock()
793         defer wp.mtx.RUnlock()
794         for !wp.loaded {
795                 wp.mtx.RUnlock()
796                 <-ch
797                 wp.mtx.RLock()
798         }
799 }
800
// randomHex returns a string of n random lowercase hexadecimal digits
// (n*4 random bits) read from crypto/rand. n may be odd. If n <= 0 the
// result is "". It panics if the secure random source fails.
func randomHex(n int) string {
	if n <= 0 {
		return ""
	}
	// Each byte encodes to two hex digits. Round up so odd n still
	// gets enough digits, then trim the extra one.
	buf := make([]byte, (n+1)/2)
	if _, err := rand.Read(buf); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)[:n]
}