lib/dispatchcloud/worker/pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/rand"
9         "errors"
10         "fmt"
11         "io"
12         "sort"
13         "strings"
14         "sync"
15         "time"
16
17         "git.curoverse.com/arvados.git/lib/cloud"
18         "git.curoverse.com/arvados.git/sdk/go/arvados"
19         "github.com/prometheus/client_golang/prometheus"
20         "github.com/sirupsen/logrus"
21         "golang.org/x/crypto/ssh"
22 )
23
24 const (
25         tagKeyInstanceType   = "InstanceType"
26         tagKeyIdleBehavior   = "IdleBehavior"
27         tagKeyInstanceSecret = "InstanceSecret"
28 )
29
30 // An InstanceView shows a worker's current state and recent activity.
31 type InstanceView struct {
32         Instance             cloud.InstanceID `json:"instance"`
33         Address              string           `json:"address"`
34         Price                float64          `json:"price"`
35         ArvadosInstanceType  string           `json:"arvados_instance_type"`
36         ProviderInstanceType string           `json:"provider_instance_type"`
37         LastContainerUUID    string           `json:"last_container_uuid"`
38         LastBusy             time.Time        `json:"last_busy"`
39         WorkerState          string           `json:"worker_state"`
40         IdleBehavior         IdleBehavior     `json:"idle_behavior"`
41 }
42
43 // An Executor executes shell commands on a remote host.
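//
// A minimal call-site sketch (the command string and error handling
// here are illustrative only, not part of this package):
//
//	stdout, stderr, err := executor.Execute(nil, "uptime", nil)
//	if err != nil {
//	        logger.WithField("stderr", string(stderr)).Warn("probe failed")
//	}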
44 type Executor interface {
45         // Run cmd on the current target.
46         Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
47
48         // Use the given target for subsequent operations. The new
49         // target is the same host as the previous target, but it
50         // might return a different address and verify a different
51         // host key.
52         //
53         // SetTarget is called frequently, and in most cases the new
54         // target will behave exactly the same as the old one. An
55         // implementation should optimize accordingly.
56         //
57         // SetTarget must not block on concurrent Execute calls.
58         SetTarget(cloud.ExecutorTarget)
59
60         Close()
61 }
62
63 const (
64         defaultSyncInterval       = time.Minute
65         defaultProbeInterval      = time.Second * 10
66         defaultMaxProbesPerSecond = 10
67         defaultTimeoutIdle        = time.Minute
68         defaultTimeoutBooting     = time.Minute * 10
69         defaultTimeoutProbe       = time.Minute * 10
70         defaultTimeoutShutdown    = time.Second * 10
71
72         // Time after a quota error to try again anyway, even if no
73         // instances have been shut down.
74         quotaErrorTTL = time.Minute
75
76         // Time between "X failed because of rate limiting" messages
77         logRateLimitErrorInterval = time.Second * 10
78 )
79
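// duration returns conf as a time.Duration if it is positive,
// otherwise the given default.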
80 func duration(conf arvados.Duration, def time.Duration) time.Duration {
81         if conf > 0 {
82                 return time.Duration(conf)
83         } else {
84                 return def
85         }
86 }
87
88 // NewPool creates a Pool of workers backed by instanceSet.
89 //
90 // New instances are configured and set up according to the given
91 // cluster configuration.
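//
// A rough construction sketch (instanceSet, newExecutor, and installKey
// here are caller-supplied placeholders, not names from this package):
//
//	wp := worker.NewPool(logger, arvClient, prometheus.NewRegistry(),
//	        instanceSet, newExecutor, installKey, cluster)
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)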
92 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
93         wp := &Pool{
94                 logger:             logger,
95                 arvClient:          arvClient,
96                 instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
97                 newExecutor:        newExecutor,
98                 bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
99                 imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
100                 instanceTypes:      cluster.InstanceTypes,
101                 maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
102                 probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
103                 syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
104                 timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
105                 timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
106                 timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
107                 timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
108                 installPublicKey:   installPublicKey,
109                 stop:               make(chan bool),
110         }
111         wp.registerMetrics(reg)
112         go func() {
113                 wp.setupOnce.Do(wp.setup)
114                 go wp.runMetrics()
115                 go wp.runProbes()
116                 go wp.runSync()
117         }()
118         return wp
119 }
120
121 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
122 // zero Pool should not be used. Call NewPool to create a new Pool.
123 type Pool struct {
124         // configuration
125         logger             logrus.FieldLogger
126         arvClient          *arvados.Client
127         instanceSet        *throttledInstanceSet
128         newExecutor        func(cloud.Instance) Executor
129         bootProbeCommand   string
130         imageID            cloud.ImageID
131         instanceTypes      map[string]arvados.InstanceType
132         syncInterval       time.Duration
133         probeInterval      time.Duration
134         maxProbesPerSecond int
135         timeoutIdle        time.Duration
136         timeoutBooting     time.Duration
137         timeoutProbe       time.Duration
138         timeoutShutdown    time.Duration
139         installPublicKey   ssh.PublicKey
140
141         // private state
142         subscribers  map[<-chan struct{}]chan<- struct{}
143         creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
144         workers      map[cloud.InstanceID]*worker
145         loaded       bool                 // loaded list of instances from InstanceSet at least once
146         exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
147         atQuotaUntil time.Time
148         atQuotaErr   cloud.QuotaError
149         stop         chan bool
150         mtx          sync.RWMutex
151         setupOnce    sync.Once
152
153         throttleCreate    throttle
154         throttleInstances throttle
155
156         mContainersRunning prometheus.Gauge
157         mInstances         *prometheus.GaugeVec
158         mInstancesPrice    *prometheus.GaugeVec
159         mVCPUs             *prometheus.GaugeVec
160         mMemory            *prometheus.GaugeVec
161 }
162
163 type createCall struct {
164         time         time.Time
165         instanceType arvados.InstanceType
166 }
167
168 // Subscribe returns a buffered channel that becomes ready after any
169 // change to the pool's state that could have scheduling implications:
170 // a worker's state changes, a new worker appears, the cloud
171 // provider's API rate limiting period ends, etc.
172 //
173 // Additional events that occur while the channel is already ready
174 // will be dropped, so it is OK if the caller services the channel
175 // slowly.
176 //
177 // Example:
178 //
179 //      ch := wp.Subscribe()
180 //      defer wp.Unsubscribe(ch)
181 //      for range ch {
182 //              tryScheduling(wp)
183 //              if done {
184 //                      break
185 //              }
186 //      }
187 func (wp *Pool) Subscribe() <-chan struct{} {
188         wp.setupOnce.Do(wp.setup)
189         wp.mtx.Lock()
190         defer wp.mtx.Unlock()
191         ch := make(chan struct{}, 1)
192         wp.subscribers[ch] = ch
193         return ch
194 }
195
196 // Unsubscribe stops sending updates to the given channel.
197 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
198         wp.setupOnce.Do(wp.setup)
199         wp.mtx.Lock()
200         defer wp.mtx.Unlock()
201         delete(wp.subscribers, ch)
202 }
203
204 // Unallocated returns the number of unallocated (creating + booting +
205 // idle + unknown) workers for each instance type.  Workers in
206 // hold/drain mode are not included.
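//
// A scheduler might use it like this sketch (needed is a hypothetical
// map of per-type demand):
//
//	for it, n := range wp.Unallocated() {
//	        if n < needed[it] {
//	                wp.Create(it)
//	        }
//	}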
207 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
208         wp.setupOnce.Do(wp.setup)
209         wp.mtx.RLock()
210         defer wp.mtx.RUnlock()
211         unalloc := map[arvados.InstanceType]int{}
212         creating := map[arvados.InstanceType]int{}
213         oldestCreate := map[arvados.InstanceType]time.Time{}
214         for _, cc := range wp.creating {
215                 it := cc.instanceType
216                 creating[it]++
217                 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
218                         oldestCreate[it] = cc.time
219                 }
220         }
221         for _, wkr := range wp.workers {
222                 // Skip workers that are not expected to become
223                 // available soon. Note len(wkr.running)>0 is not
224                 // redundant here: it can be true even in
225                 // StateUnknown.
226                 if wkr.state == StateShutdown ||
227                         wkr.state == StateRunning ||
228                         wkr.idleBehavior != IdleBehaviorRun ||
229                         len(wkr.running) > 0 {
230                         continue
231                 }
232                 it := wkr.instType
233                 unalloc[it]++
234                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
235                         // If up to N new workers appear in
236                         // Instances() while we are waiting for N
237                         // Create() calls to complete, we assume we're
238                         // just seeing a race between Instances() and
239                         // Create() responses.
240                         //
241                         // The other common reason why nodes have
242                         // state==Unknown is that they appeared at
243                         // startup, before any Create calls. They
244                         // don't match the above timing condition, so
245                         // we never mistakenly attribute them to
246                         // pending Create calls.
247                         creating[it]--
248                 }
249         }
250         for it, c := range creating {
251                 unalloc[it] += c
252         }
253         return unalloc
254 }
255
256 // Create a new instance with the given type, and add it to the worker
257 // pool. The worker is added immediately; instance creation runs in
258 // the background.
259 //
260 // Create returns false if a pre-existing error state prevents it from
261 // even attempting to create a new instance. Those errors are logged
262 // by the Pool, so the caller does not need to log anything in such
263 // cases.
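//
// Example (the surrounding scheduling logic is illustrative only):
//
//	if !wp.AtQuota() && !wp.Create(it) {
//	        // A recent error is still in effect; the pool has already
//	        // logged it, so just try again on the next scheduling pass.
//	}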
264 func (wp *Pool) Create(it arvados.InstanceType) bool {
265         logger := wp.logger.WithField("InstanceType", it.Name)
266         wp.setupOnce.Do(wp.setup)
267         wp.mtx.Lock()
268         defer wp.mtx.Unlock()
269         if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
270                 return false
271         }
272         now := time.Now()
273         secret := randomHex(instanceSecretLength)
274         wp.creating[secret] = createCall{time: now, instanceType: it}
275         go func() {
276                 defer wp.notify()
277                 tags := cloud.InstanceTags{
278                         tagKeyInstanceType:   it.Name,
279                         tagKeyIdleBehavior:   string(IdleBehaviorRun),
280                         tagKeyInstanceSecret: secret,
281                 }
282                 initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
283                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
284                 wp.mtx.Lock()
285                 defer wp.mtx.Unlock()
286                 // delete() is deferred so the updateWorker() call
287                 // below knows to use StateBooting when adding a new
288                 // worker.
289                 defer delete(wp.creating, secret)
290                 if err != nil {
291                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
292                                 wp.atQuotaErr = err
293                                 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
294                                 time.AfterFunc(quotaErrorTTL, wp.notify)
295                         }
296                         logger.WithError(err).Error("create failed")
297                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
298                         return
299                 }
300                 wp.updateWorker(inst, it)
301         }()
302         return true
303 }
304
305 // AtQuota returns true if Create is not expected to work at the
306 // moment.
307 func (wp *Pool) AtQuota() bool {
308         wp.mtx.Lock()
309         defer wp.mtx.Unlock()
310         return time.Now().Before(wp.atQuotaUntil)
311 }
312
313 // SetIdleBehavior determines how the indicated instance will behave
314 // when it has no containers running.
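//
// For example, a management API handler might put an instance on hold:
//
//	err := wp.SetIdleBehavior(instID, IdleBehaviorHold)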
315 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
316         wp.mtx.Lock()
317         defer wp.mtx.Unlock()
318         wkr, ok := wp.workers[id]
319         if !ok {
320                 return errors.New("requested instance does not exist")
321         }
322         wkr.idleBehavior = idleBehavior
323         wkr.saveTags()
324         wkr.shutdownIfIdle()
325         return nil
326 }
327
328 // Add or update worker attached to the given instance.
329 //
330 // The second return value is true if a new worker is created.
331 //
332 // A newly added instance has state=StateBooting if its tags match an
333 // entry in wp.creating, otherwise StateUnknown.
334 //
335 // Caller must have lock.
336 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
337         inst = tagVerifier{inst}
338         id := inst.ID()
339         if wkr := wp.workers[id]; wkr != nil {
340                 wkr.executor.SetTarget(inst)
341                 wkr.instance = inst
342                 wkr.updated = time.Now()
343                 wkr.saveTags()
344                 return wkr, false
345         }
346
347         state := StateUnknown
348         if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok {
349                 state = StateBooting
350         }
351
352         // If an instance has a valid IdleBehavior tag when it first
353         // appears, initialize the new worker accordingly (this is how
354         // we restore IdleBehavior that was set by a prior dispatch
355         // process); otherwise, default to "run". After this,
356         // wkr.idleBehavior is the source of truth, and will only be
357         // changed via SetIdleBehavior().
358         idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
359         if !validIdleBehavior[idleBehavior] {
360                 idleBehavior = IdleBehaviorRun
361         }
362
363         logger := wp.logger.WithFields(logrus.Fields{
364                 "InstanceType": it.Name,
365                 "Instance":     inst.ID(),
366                 "Address":      inst.Address(),
367         })
368         logger.WithFields(logrus.Fields{
369                 "State":        state,
370                 "IdleBehavior": idleBehavior,
371         }).Infof("instance appeared in cloud")
372         now := time.Now()
373         wkr := &worker{
374                 mtx:          &wp.mtx,
375                 wp:           wp,
376                 logger:       logger,
377                 executor:     wp.newExecutor(inst),
378                 state:        state,
379                 idleBehavior: idleBehavior,
380                 instance:     inst,
381                 instType:     it,
382                 appeared:     now,
383                 probed:       now,
384                 busy:         now,
385                 updated:      now,
386                 running:      make(map[string]struct{}),
387                 starting:     make(map[string]struct{}),
388                 probing:      make(chan struct{}, 1),
389         }
390         wp.workers[id] = wkr
391         return wkr, true
392 }
393
394 // Caller must have lock.
395 func (wp *Pool) notifyExited(uuid string, t time.Time) {
396         wp.exited[uuid] = t
397 }
398
399 // Shutdown shuts down a worker with the given type, or returns false
400 // if all workers with the given type are busy.
401 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
402         wp.setupOnce.Do(wp.setup)
403         wp.mtx.Lock()
404         defer wp.mtx.Unlock()
405         logger := wp.logger.WithField("InstanceType", it.Name)
406         logger.Info("shutdown requested")
407         for _, tryState := range []State{StateBooting, StateIdle} {
408                 // TODO: shut down the worker with the longest idle
409                 // time (Idle) or the earliest create time (Booting)
410                 for _, wkr := range wp.workers {
411                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
412                                 logger.WithField("Instance", wkr.instance).Info("shutting down")
413                                 wkr.shutdown()
414                                 return true
415                         }
416                 }
417         }
418         return false
419 }
420
421 // CountWorkers returns the current number of workers in each state.
422 func (wp *Pool) CountWorkers() map[State]int {
423         wp.setupOnce.Do(wp.setup)
424         wp.mtx.Lock()
425         defer wp.mtx.Unlock()
426         r := map[State]int{}
427         for _, w := range wp.workers {
428                 r[w.state]++
429         }
430         return r
431 }
432
433 // Running returns the container UUIDs being prepared/run on workers.
434 //
435 // In the returned map, the time value indicates when the Pool
436 // observed that the container process had exited. A container that
437 // has not yet exited has a zero time value. The caller should use
438 // KillContainer() to garbage-collect the entries for exited
439 // containers.
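//
// Example (garbage-collecting entries for exited containers):
//
//	for uuid, exited := range wp.Running() {
//	        if !exited.IsZero() {
//	                wp.KillContainer(uuid)
//	        }
//	}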
440 func (wp *Pool) Running() map[string]time.Time {
441         wp.setupOnce.Do(wp.setup)
442         wp.mtx.Lock()
443         defer wp.mtx.Unlock()
444         r := map[string]time.Time{}
445         for _, wkr := range wp.workers {
446                 for uuid := range wkr.running {
447                         r[uuid] = time.Time{}
448                 }
449                 for uuid := range wkr.starting {
450                         r[uuid] = time.Time{}
451                 }
452         }
453         for uuid, exited := range wp.exited {
454                 r[uuid] = exited
455         }
456         return r
457 }
458
459 // StartContainer starts a container on an idle worker immediately if
460 // possible, otherwise returns false.
461 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
462         wp.setupOnce.Do(wp.setup)
463         wp.mtx.Lock()
464         defer wp.mtx.Unlock()
465         var wkr *worker
466         for _, w := range wp.workers {
467                 if w.instType == it && w.state == StateIdle {
468                         if wkr == nil || w.busy.After(wkr.busy) {
469                                 wkr = w
470                         }
471                 }
472         }
473         if wkr == nil {
474                 return false
475         }
476         wkr.startContainer(ctr)
477         return true
478 }
479
480 // KillContainer kills the crunch-run process for the given container
481 // UUID, if it's running on any worker.
482 //
483 // KillContainer returns immediately; the act of killing the container
484 // takes some time, and runs in the background.
485 func (wp *Pool) KillContainer(uuid string) {
486         wp.mtx.Lock()
487         defer wp.mtx.Unlock()
488         if _, ok := wp.exited[uuid]; ok {
489                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
490                 delete(wp.exited, uuid)
491                 return
492         }
493         for _, wkr := range wp.workers {
494                 if _, ok := wkr.running[uuid]; ok {
495                         go wp.kill(wkr, uuid)
496                         return
497                 }
498         }
499         wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
500 }
501
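// kill asks crunch-run on wkr's instance to kill (SIGTERM) the process
// running the given container UUID, then removes the container from the
// worker's running set. It takes the pool lock itself, so callers invoke
// it in a separate goroutine.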
502 func (wp *Pool) kill(wkr *worker, uuid string) {
503         logger := wp.logger.WithFields(logrus.Fields{
504                 "ContainerUUID": uuid,
505                 "Instance":      wkr.instance.ID(),
506         })
507         logger.Debug("killing process")
508         cmd := "crunch-run --kill 15 " + uuid
509         if u := wkr.instance.RemoteUser(); u != "root" {
510                 cmd = "sudo " + cmd
511         }
512         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
513         if err != nil {
514                 logger.WithFields(logrus.Fields{
515                         "stderr": string(stderr),
516                         "stdout": string(stdout),
517                         "error":  err,
518                 }).Warn("kill failed")
519                 return
520         }
521         logger.Debug("killing process succeeded")
522         wp.mtx.Lock()
523         defer wp.mtx.Unlock()
524         if _, ok := wkr.running[uuid]; ok {
525                 delete(wkr.running, uuid)
526                 if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
527                         wkr.state = StateIdle
528                 }
529                 wkr.updated = time.Now()
530                 go wp.notify()
531         }
532 }
533
534 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
535         if reg == nil {
536                 reg = prometheus.NewRegistry()
537         }
538         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
539                 Namespace: "arvados",
540                 Subsystem: "dispatchcloud",
541                 Name:      "containers_running",
542                 Help:      "Number of containers reported running by cloud VMs.",
543         })
544         reg.MustRegister(wp.mContainersRunning)
545         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
546                 Namespace: "arvados",
547                 Subsystem: "dispatchcloud",
548                 Name:      "instances_total",
549                 Help:      "Number of cloud VMs.",
550         }, []string{"category"})
551         reg.MustRegister(wp.mInstances)
552         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
553                 Namespace: "arvados",
554                 Subsystem: "dispatchcloud",
555                 Name:      "instances_price",
556                 Help:      "Price of cloud VMs.",
557         }, []string{"category"})
558         reg.MustRegister(wp.mInstancesPrice)
559         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
560                 Namespace: "arvados",
561                 Subsystem: "dispatchcloud",
562                 Name:      "vcpus_total",
563                 Help:      "Total VCPUs on all cloud VMs.",
564         }, []string{"category"})
565         reg.MustRegister(wp.mVCPUs)
566         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
567                 Namespace: "arvados",
568                 Subsystem: "dispatchcloud",
569                 Name:      "memory_bytes_total",
570                 Help:      "Total memory on all cloud VMs.",
571         }, []string{"category"})
572         reg.MustRegister(wp.mMemory)
573 }
574
575 func (wp *Pool) runMetrics() {
576         ch := wp.Subscribe()
577         defer wp.Unsubscribe(ch)
578         wp.updateMetrics()
579         for range ch {
580                 wp.updateMetrics()
581         }
582 }
583
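// updateMetrics recomputes the instance, price, VCPU, and memory gauges,
// assigning each worker to exactly one category: "inuse", "hold",
// "booting", "unknown", or "idle".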
584 func (wp *Pool) updateMetrics() {
585         wp.mtx.RLock()
586         defer wp.mtx.RUnlock()
587
588         instances := map[string]int64{}
589         price := map[string]float64{}
590         cpu := map[string]int64{}
591         mem := map[string]int64{}
592         var running int64
593         for _, wkr := range wp.workers {
594                 var cat string
595                 switch {
596                 case len(wkr.running)+len(wkr.starting) > 0:
597                         cat = "inuse"
598                 case wkr.idleBehavior == IdleBehaviorHold:
599                         cat = "hold"
600                 case wkr.state == StateBooting:
601                         cat = "booting"
602                 case wkr.state == StateUnknown:
603                         cat = "unknown"
604                 default:
605                         cat = "idle"
606                 }
607                 instances[cat]++
608                 price[cat] += wkr.instType.Price
609                 cpu[cat] += int64(wkr.instType.VCPUs)
610                 mem[cat] += int64(wkr.instType.RAM)
611                 running += int64(len(wkr.running) + len(wkr.starting))
612         }
613         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
614                 wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
615                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
616                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
617                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
618         }
619         wp.mContainersRunning.Set(float64(running))
620 }
621
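// runProbes periodically probes all workers (except those that are
// shutting down), rate-limited to maxProbesPerSecond, until the pool is
// stopped.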
622 func (wp *Pool) runProbes() {
623         maxPPS := wp.maxProbesPerSecond
624         if maxPPS < 1 {
625                 maxPPS = defaultMaxProbesPerSecond
626         }
627         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
628         defer limitticker.Stop()
629
630         probeticker := time.NewTicker(wp.probeInterval)
631         defer probeticker.Stop()
632
633         workers := []cloud.InstanceID{}
634         for range probeticker.C {
635                 workers = workers[:0]
636                 wp.mtx.Lock()
637                 for id, wkr := range wp.workers {
638                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
639                                 continue
640                         }
641                         workers = append(workers, id)
642                 }
643                 wp.mtx.Unlock()
644
645                 for _, id := range workers {
646                         wp.mtx.Lock()
647                         wkr, ok := wp.workers[id]
648                         wp.mtx.Unlock()
649                         if !ok {
650                                 // Deleted while we were probing
651                                 // others
652                                 continue
653                         }
654                         go wkr.ProbeAndUpdate()
655                         select {
656                         case <-wp.stop:
657                                 return
658                         case <-limitticker.C:
659                         }
660                 }
661         }
662 }
663
664 func (wp *Pool) runSync() {
665         // sync once immediately, then wait syncInterval, sync again,
666         // etc.
667         timer := time.NewTimer(1)
668         for {
669                 select {
670                 case <-timer.C:
671                         err := wp.getInstancesAndSync()
672                         if err != nil {
673                                 wp.logger.WithError(err).Warn("sync failed")
674                         }
675                         timer.Reset(wp.syncInterval)
676                 case <-wp.stop:
677                         wp.logger.Debug("worker.Pool stopped")
678                         return
679                 }
680         }
681 }
682
683 // Stop synchronizing with the InstanceSet.
684 func (wp *Pool) Stop() {
685         wp.setupOnce.Do(wp.setup)
686         close(wp.stop)
687 }
688
689 // Instances returns an InstanceView for each worker in the pool,
690 // summarizing its current state and recent activity.
691 func (wp *Pool) Instances() []InstanceView {
692         var r []InstanceView
693         wp.setupOnce.Do(wp.setup)
694         wp.mtx.Lock()
695         for _, w := range wp.workers {
696                 r = append(r, InstanceView{
697                         Instance:             w.instance.ID(),
698                         Address:              w.instance.Address(),
699                         Price:                w.instType.Price,
700                         ArvadosInstanceType:  w.instType.Name,
701                         ProviderInstanceType: w.instType.ProviderType,
702                         LastContainerUUID:    w.lastUUID,
703                         LastBusy:             w.busy,
704                         WorkerState:          w.state.String(),
705                         IdleBehavior:         w.idleBehavior,
706                 })
707         }
708         wp.mtx.Unlock()
709         sort.Slice(r, func(i, j int) bool {
710                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
711         })
712         return r
713 }
714
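// setup initializes the pool's internal maps. It runs exactly once, via
// setupOnce, before any other pool state is used.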
715 func (wp *Pool) setup() {
716         wp.creating = map[string]createCall{}
717         wp.exited = map[string]time.Time{}
718         wp.workers = map[cloud.InstanceID]*worker{}
719         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
720 }
721
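// notify wakes up all subscribers. If a subscriber's channel already has
// a pending event, the new event is dropped rather than queued.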
722 func (wp *Pool) notify() {
723         wp.mtx.RLock()
724         defer wp.mtx.RUnlock()
725         for _, send := range wp.subscribers {
726                 select {
727                 case send <- struct{}{}:
728                 default:
729                 }
730         }
731 }
732
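// getInstancesAndSync fetches the current instance list from the
// InstanceSet (unless rate-limited) and reconciles the worker pool
// against it.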
733 func (wp *Pool) getInstancesAndSync() error {
734         wp.setupOnce.Do(wp.setup)
735         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
736                 return err
737         }
738         wp.logger.Debug("getting instance list")
739         threshold := time.Now()
740         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
741         if err != nil {
742                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
743                 return err
744         }
745         wp.sync(threshold, instances)
746         wp.logger.Debug("sync done")
747         return nil
748 }
749
750 // Add/remove/update workers based on instances, which was obtained
751 // from the instanceSet. However, don't clobber any other updates that
752 // already happened after threshold.
753 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
754         wp.mtx.Lock()
755         defer wp.mtx.Unlock()
756         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
757         notify := false
758
759         for _, inst := range instances {
760                 itTag := inst.Tags()[tagKeyInstanceType]
761                 it, ok := wp.instanceTypes[itTag]
762                 if !ok {
763                         wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
764                         continue
765                 }
766                 if wkr, isNew := wp.updateWorker(inst, it); isNew {
767                         notify = true
768                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
769                         wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
770                         wkr.shutdown()
771                 }
772         }
773
774         for id, wkr := range wp.workers {
775                 if wkr.updated.After(threshold) {
776                         continue
777                 }
778                 logger := wp.logger.WithFields(logrus.Fields{
779                         "Instance":    wkr.instance.ID(),
780                         "WorkerState": wkr.state,
781                 })
782                 logger.Info("instance disappeared in cloud")
783                 delete(wp.workers, id)
784                 go wkr.executor.Close()
785                 notify = true
786         }
787
788         if !wp.loaded {
789                 wp.loaded = true
790                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
791         }
792
793         if notify {
794                 go wp.notify()
795         }
796 }
797
798 // Return a random string of n hexadecimal digits (n*4 random bits). n
799 // must be even.
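// For example, randomHex(6) might return "a3f09b" (6 hex digits, 24
// random bits).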
800 func randomHex(n int) string {
801         buf := make([]byte, n/2)
802         _, err := rand.Read(buf)
803         if err != nil {
804                 panic(err)
805         }
806         return fmt.Sprintf("%x", buf)
807 }