arvados.git: lib/dispatchcloud/worker/pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/rand"
9         "errors"
10         "fmt"
11         "io"
12         "sort"
13         "strings"
14         "sync"
15         "time"
16
17         "git.curoverse.com/arvados.git/lib/cloud"
18         "git.curoverse.com/arvados.git/sdk/go/arvados"
19         "github.com/prometheus/client_golang/prometheus"
20         "github.com/sirupsen/logrus"
21         "golang.org/x/crypto/ssh"
22 )
23
24 const (
25         tagKeyInstanceType   = "InstanceType"
26         tagKeyIdleBehavior   = "IdleBehavior"
27         tagKeyInstanceSecret = "InstanceSecret"
28 )
29
30 // An InstanceView shows a worker's current state and recent activity.
31 type InstanceView struct {
32         Instance             cloud.InstanceID `json:"instance"`
33         Address              string           `json:"address"`
34         Price                float64          `json:"price"`
35         ArvadosInstanceType  string           `json:"arvados_instance_type"`
36         ProviderInstanceType string           `json:"provider_instance_type"`
37         LastContainerUUID    string           `json:"last_container_uuid"`
38         LastBusy             time.Time        `json:"last_busy"`
39         WorkerState          string           `json:"worker_state"`
40         IdleBehavior         IdleBehavior     `json:"idle_behavior"`
41 }
42
43 // An Executor executes shell commands on a remote host.
44 type Executor interface {
45         // Run cmd on the current target.
46         Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
47
48         // Use the given target for subsequent operations. The new
49         // target is the same host as the previous target, but it
50         // might return a different address and verify a different
51         // host key.
52         //
53         // SetTarget is called frequently, and in most cases the new
54         // target will behave exactly the same as the old one. An
55         // implementation should optimize accordingly.
56         //
57         // SetTarget must not block on concurrent Execute calls.
58         SetTarget(cloud.ExecutorTarget)
59
60         Close()
61 }
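
// Illustrative sketch, not part of the original file: a minimal stub that
// satisfies the Executor interface, e.g. for tests that do not need a real
// SSH connection. All names below (stubExecutor etc.) are hypothetical; the
// dispatcher supplies its real implementation via NewPool's newExecutor
// argument.
//
//      type stubExecutor struct {
//              mtx    sync.Mutex
//              target cloud.ExecutorTarget
//      }
//
//      func (e *stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
//              return []byte("ok\n"), nil, nil
//      }
//
//      // SetTarget never blocks on Execute: it only swaps the target under
//      // a mutex that Execute does not hold.
//      func (e *stubExecutor) SetTarget(t cloud.ExecutorTarget) {
//              e.mtx.Lock()
//              defer e.mtx.Unlock()
//              e.target = t
//      }
//
//      func (e *stubExecutor) Close() {}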
62
63 const (
64         defaultSyncInterval       = time.Minute
65         defaultProbeInterval      = time.Second * 10
66         defaultMaxProbesPerSecond = 10
67         defaultTimeoutIdle        = time.Minute
68         defaultTimeoutBooting     = time.Minute * 10
69         defaultTimeoutProbe       = time.Minute * 10
70         defaultTimeoutShutdown    = time.Second * 10
71
72         // Time to wait after a quota error before trying again anyway,
73         // even if no instances have been shut down.
74         quotaErrorTTL = time.Minute
75
76         // Time between "X failed because of rate limiting" messages
77         logRateLimitErrorInterval = time.Second * 10
78 )
79
80 func duration(conf arvados.Duration, def time.Duration) time.Duration {
81         if conf > 0 {
82                 return time.Duration(conf)
83         } else {
84                 return def
85         }
86 }
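
// For example, a zero (unset) config value falls back to the given default,
// while a nonzero value is used as-is:
//
//      duration(0, defaultProbeInterval)                    // => 10 * time.Second
//      duration(arvados.Duration(5*time.Second), time.Hour) // => 5 * time.Second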
87
88 // NewPool creates a Pool of workers backed by instanceSet.
89 //
90 // New instances are configured and set up according to the given
91 // cluster configuration.
92 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
93         wp := &Pool{
94                 logger:             logger,
95                 arvClient:          arvClient,
96                 instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
97                 newExecutor:        newExecutor,
98                 bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
99                 imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
100                 instanceTypes:      cluster.InstanceTypes,
101                 maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
102                 probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
103                 syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
104                 timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
105                 timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
106                 timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
107                 timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
108                 installPublicKey:   installPublicKey,
109                 stop:               make(chan bool),
110         }
111         wp.registerMetrics(reg)
112         go func() {
113                 wp.setupOnce.Do(wp.setup)
114                 go wp.runMetrics()
115                 go wp.runProbes()
116                 go wp.runSync()
117         }()
118         return wp
119 }
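
// Illustrative sketch, not part of the original file: wiring a Pool into a
// dispatcher. logger, arvClient, instanceSet, newSSHExecutor, installKey and
// cluster are hypothetical values assumed to come from the caller's own
// setup code.
//
//      pool := NewPool(logger, arvClient, prometheus.NewRegistry(), instanceSet,
//              newSSHExecutor, installKey, cluster)
//      defer pool.Stop()
//      ch := pool.Subscribe()
//      defer pool.Unsubscribe(ch)
//      for range ch {
//              // react to pool state changes, e.g. run a scheduling pass
//      }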
120
121 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
122 // zero Pool should not be used. Call NewPool to create a new Pool.
123 type Pool struct {
124         // configuration
125         logger             logrus.FieldLogger
126         arvClient          *arvados.Client
127         instanceSet        *throttledInstanceSet
128         newExecutor        func(cloud.Instance) Executor
129         bootProbeCommand   string
130         imageID            cloud.ImageID
131         instanceTypes      map[string]arvados.InstanceType
132         syncInterval       time.Duration
133         probeInterval      time.Duration
134         maxProbesPerSecond int
135         timeoutIdle        time.Duration
136         timeoutBooting     time.Duration
137         timeoutProbe       time.Duration
138         timeoutShutdown    time.Duration
139         installPublicKey   ssh.PublicKey
140
141         // private state
142         subscribers  map[<-chan struct{}]chan<- struct{}
143         creating     map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
144         workers      map[cloud.InstanceID]*worker
145         loaded       bool                 // loaded list of instances from InstanceSet at least once
146         exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
147         atQuotaUntil time.Time
148         atQuotaErr   cloud.QuotaError
149         stop         chan bool
150         mtx          sync.RWMutex
151         setupOnce    sync.Once
152
153         throttleCreate    throttle
154         throttleInstances throttle
155
156         mContainersRunning prometheus.Gauge
157         mInstances         *prometheus.GaugeVec
158         mInstancesPrice    *prometheus.GaugeVec
159         mVCPUs             *prometheus.GaugeVec
160         mMemory            *prometheus.GaugeVec
161 }
162
163 // Subscribe returns a buffered channel that becomes ready after any
164 // change to the pool's state that could have scheduling implications:
165 // a worker's state changes, a new worker appears, the cloud
166 // provider's API rate limiting period ends, etc.
167 //
168 // Additional events that occur while the channel is already ready
169 // will be dropped, so it is OK if the caller services the channel
170 // slowly.
171 //
172 // Example:
173 //
174 //      ch := wp.Subscribe()
175 //      defer wp.Unsubscribe(ch)
176 //      for range ch {
177 //              tryScheduling(wp)
178 //              if done {
179 //                      break
180 //              }
181 //      }
182 func (wp *Pool) Subscribe() <-chan struct{} {
183         wp.setupOnce.Do(wp.setup)
184         wp.mtx.Lock()
185         defer wp.mtx.Unlock()
186         ch := make(chan struct{}, 1)
187         wp.subscribers[ch] = ch
188         return ch
189 }
190
191 // Unsubscribe stops sending updates to the given channel.
192 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
193         wp.setupOnce.Do(wp.setup)
194         wp.mtx.Lock()
195         defer wp.mtx.Unlock()
196         delete(wp.subscribers, ch)
197 }
198
199 // Unallocated returns the number of unallocated (creating + booting +
200 // idle + unknown) workers for each instance type.  Workers in
201 // hold/drain mode are not included.
202 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
203         wp.setupOnce.Do(wp.setup)
204         wp.mtx.RLock()
205         defer wp.mtx.RUnlock()
206         unalloc := map[arvados.InstanceType]int{}
207         creating := map[arvados.InstanceType]int{}
208         for it, times := range wp.creating {
209                 creating[it] = len(times)
210         }
211         for _, wkr := range wp.workers {
212                 // Skip workers that are not expected to become
213                 // available soon. Note len(wkr.running)>0 is not
214                 // redundant here: it can be true even in
215                 // StateUnknown.
216                 if wkr.state == StateShutdown ||
217                         wkr.state == StateRunning ||
218                         wkr.idleBehavior != IdleBehaviorRun ||
219                         len(wkr.running) > 0 {
220                         continue
221                 }
222                 it := wkr.instType
223                 unalloc[it]++
224                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
225                         // If up to N new workers appear in
226                         // Instances() while we are waiting for N
227                         // Create() calls to complete, we assume we're
228                         // just seeing a race between Instances() and
229                         // Create() responses.
230                         //
231                         // The other common reason why nodes have
232                         // state==Unknown is that they appeared at
233                         // startup, before any Create calls. They
234                         // don't match the above timing condition, so
235                         // we never mistakenly attribute them to
236                         // pending Create calls.
237                         creating[it]--
238                 }
239         }
240         for it, c := range creating {
241                 unalloc[it] += c
242         }
243         return unalloc
244 }
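
// Worked example (illustrative): suppose two Create calls for instance type
// "std" are still pending, and three "std" workers are unallocated (one
// Idle, plus two in StateUnknown that appeared after the earliest pending
// Create started). The loop above counts all three workers and decrements
// the pending-create count once for each qualifying Unknown worker, so
// Unallocated() reports 3 for "std" rather than 5; instances that are both
// "still being created" and "already visible" are not double-counted.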
245
246 // Create a new instance with the given type, and add it to the worker
247 // pool. The worker is added immediately; instance creation runs in
248 // the background.
249 //
250 // Create returns false if a pre-existing error state prevents it from
251 // even attempting to create a new instance. Those errors are logged
252 // by the Pool, so the caller does not need to log anything in such
253 // cases.
254 func (wp *Pool) Create(it arvados.InstanceType) bool {
255         logger := wp.logger.WithField("InstanceType", it.Name)
256         wp.setupOnce.Do(wp.setup)
257         wp.mtx.Lock()
258         defer wp.mtx.Unlock()
259         if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
260                 return false
261         }
262         now := time.Now()
263         wp.creating[it] = append(wp.creating[it], now)
264         go func() {
265                 defer wp.notify()
266                 secret := randomHex(instanceSecretLength)
267                 tags := cloud.InstanceTags{
268                         tagKeyInstanceType:   it.Name,
269                         tagKeyIdleBehavior:   string(IdleBehaviorRun),
270                         tagKeyInstanceSecret: secret,
271                 }
272                 initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
273                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
274                 wp.mtx.Lock()
275                 defer wp.mtx.Unlock()
276                 // Remove our timestamp marker from wp.creating
277                 for i, t := range wp.creating[it] {
278                         if t == now {
279                                 copy(wp.creating[it][i:], wp.creating[it][i+1:])
280                                 wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
281                                 break
282                         }
283                 }
284                 if err != nil {
285                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
286                                 wp.atQuotaErr = err
287                                 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
288                                 time.AfterFunc(quotaErrorTTL, wp.notify)
289                         }
290                         logger.WithError(err).Error("create failed")
291                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
292                         return
293                 }
294                 wp.updateWorker(inst, it, StateBooting)
295         }()
296         return true
297 }
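
// Illustrative caller sketch, not part of the original file: requesting a
// new instance and backing off while the pool is at quota. chooseType is a
// hypothetical helper that maps a container to an arvados.InstanceType.
//
//      it := chooseType(ctr)
//      if !wp.Create(it) {
//              if wp.AtQuota() {
//                      // Wait for a Subscribe() notification; the pool
//                      // notifies again when quotaErrorTTL expires.
//              }
//              // Other create errors (e.g. rate limiting) are already
//              // logged by the pool; just retry on a later pass.
//      }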
298
299 // AtQuota returns true if Create is not expected to work at the
300 // moment.
301 func (wp *Pool) AtQuota() bool {
302         wp.mtx.Lock()
303         defer wp.mtx.Unlock()
304         return time.Now().Before(wp.atQuotaUntil)
305 }
306
307 // SetIdleBehavior determines how the indicated instance will behave
308 // when it has no containers running.
309 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
310         wp.mtx.Lock()
311         defer wp.mtx.Unlock()
312         wkr, ok := wp.workers[id]
313         if !ok {
314                 return errors.New("requested instance does not exist")
315         }
316         wkr.idleBehavior = idleBehavior
317         wkr.saveTags()
318         wkr.shutdownIfIdle()
319         return nil
320 }
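
// Illustrative usage, not part of the original file: drain a specific
// instance so it is shut down as soon as it becomes idle. This assumes an
// IdleBehaviorDrain value is defined alongside IdleBehaviorRun and
// IdleBehaviorHold elsewhere in this package.
//
//      if err := wp.SetIdleBehavior(instID, IdleBehaviorDrain); err != nil {
//              logger.WithError(err).Warn("cannot drain: instance not in pool")
//      }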
321
322 // Add or update the worker attached to the given instance. Use
323 // initialState if a new worker is created.
324 //
325 // The second return value is true if a new worker is created.
326 //
327 // Caller must have lock.
328 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
329         inst = tagVerifier{inst}
330         id := inst.ID()
331         if wkr := wp.workers[id]; wkr != nil {
332                 wkr.executor.SetTarget(inst)
333                 wkr.instance = inst
334                 wkr.updated = time.Now()
335                 if initialState == StateBooting && wkr.state == StateUnknown {
336                         wkr.state = StateBooting
337                 }
338                 wkr.saveTags()
339                 return wkr, false
340         }
341
342         // If an instance has a valid IdleBehavior tag when it first
343         // appears, initialize the new worker accordingly (this is how
344         // we restore IdleBehavior that was set by a prior dispatch
345         // process); otherwise, default to "run". After this,
346         // wkr.idleBehavior is the source of truth, and will only be
347         // changed via SetIdleBehavior().
348         idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
349         if !validIdleBehavior[idleBehavior] {
350                 idleBehavior = IdleBehaviorRun
351         }
352
353         logger := wp.logger.WithFields(logrus.Fields{
354                 "InstanceType": it.Name,
355                 "Instance":     inst.ID(),
356                 "Address":      inst.Address(),
357         })
358         logger.WithFields(logrus.Fields{
359                 "State":        initialState,
360                 "IdleBehavior": idleBehavior,
361         }).Infof("instance appeared in cloud")
362         now := time.Now()
363         wkr := &worker{
364                 mtx:          &wp.mtx,
365                 wp:           wp,
366                 logger:       logger,
367                 executor:     wp.newExecutor(inst),
368                 state:        initialState,
369                 idleBehavior: idleBehavior,
370                 instance:     inst,
371                 instType:     it,
372                 appeared:     now,
373                 probed:       now,
374                 busy:         now,
375                 updated:      now,
376                 running:      make(map[string]struct{}),
377                 starting:     make(map[string]struct{}),
378                 probing:      make(chan struct{}, 1),
379         }
380         wp.workers[id] = wkr
381         return wkr, true
382 }
383
384 // Caller must have lock.
385 func (wp *Pool) notifyExited(uuid string, t time.Time) {
386         wp.exited[uuid] = t
387 }
388
389 // Shutdown shuts down an idle or booting (non-held) worker with the
390 // given type, or returns false if no such worker is available.
391 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
392         wp.setupOnce.Do(wp.setup)
393         wp.mtx.Lock()
394         defer wp.mtx.Unlock()
395         logger := wp.logger.WithField("InstanceType", it.Name)
396         logger.Info("shutdown requested")
397         for _, tryState := range []State{StateBooting, StateIdle} {
398                 // TODO: shut down the worker with the longest idle
399                 // time (Idle) or the earliest create time (Booting)
400                 for _, wkr := range wp.workers {
401                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
402                                 logger.WithField("Instance", wkr.instance).Info("shutting down")
403                                 wkr.shutdown()
404                                 return true
405                         }
406                 }
407         }
408         return false
409 }
410
411 // CountWorkers returns the current number of workers in each state.
412 func (wp *Pool) CountWorkers() map[State]int {
413         wp.setupOnce.Do(wp.setup)
414         wp.mtx.Lock()
415         defer wp.mtx.Unlock()
416         r := map[State]int{}
417         for _, w := range wp.workers {
418                 r[w.state]++
419         }
420         return r
421 }
422
423 // Running returns the container UUIDs being prepared/run on workers.
424 //
425 // In the returned map, the time value indicates when the Pool
426 // observed that the container process had exited. A container that
427 // has not yet exited has a zero time value. The caller should use
428 // KillContainer() to garbage-collect the entries for exited
429 // containers.
430 func (wp *Pool) Running() map[string]time.Time {
431         wp.setupOnce.Do(wp.setup)
432         wp.mtx.Lock()
433         defer wp.mtx.Unlock()
434         r := map[string]time.Time{}
435         for _, wkr := range wp.workers {
436                 for uuid := range wkr.running {
437                         r[uuid] = time.Time{}
438                 }
439                 for uuid := range wkr.starting {
440                         r[uuid] = time.Time{}
441                 }
442         }
443         for uuid, exited := range wp.exited {
444                 r[uuid] = exited
445         }
446         return r
447 }
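
// Illustrative sketch, not part of the original file: the garbage collection
// the comment above asks callers to do, clearing entries for containers
// whose crunch-run process has exited.
//
//      for uuid, exited := range wp.Running() {
//              if !exited.IsZero() {
//                      wp.KillContainer(uuid)
//              }
//      }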
448
449 // StartContainer starts a container on an idle worker immediately if
450 // possible, otherwise returns false.
451 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
452         wp.setupOnce.Do(wp.setup)
453         wp.mtx.Lock()
454         defer wp.mtx.Unlock()
455         var wkr *worker
456         for _, w := range wp.workers {
457                 if w.instType == it && w.state == StateIdle {
458                         if wkr == nil || w.busy.After(wkr.busy) {
459                                 wkr = w
460                         }
461                 }
462         }
463         if wkr == nil {
464                 return false
465         }
466         wkr.startContainer(ctr)
467         return true
468 }
469
470 // KillContainer kills the crunch-run process for the given container
471 // UUID, if it's running on any worker.
472 //
473 // KillContainer returns immediately; the act of killing the container
474 // takes some time, and runs in the background.
475 func (wp *Pool) KillContainer(uuid string) {
476         wp.mtx.Lock()
477         defer wp.mtx.Unlock()
478         if _, ok := wp.exited[uuid]; ok {
479                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
480                 delete(wp.exited, uuid)
481                 return
482         }
483         for _, wkr := range wp.workers {
484                 if _, ok := wkr.running[uuid]; ok {
485                         go wp.kill(wkr, uuid)
486                         return
487                 }
488         }
489         wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
490 }
491
492 func (wp *Pool) kill(wkr *worker, uuid string) {
493         logger := wp.logger.WithFields(logrus.Fields{
494                 "ContainerUUID": uuid,
495                 "Instance":      wkr.instance.ID(),
496         })
497         logger.Debug("killing process")
498         cmd := "crunch-run --kill 15 " + uuid
499         if u := wkr.instance.RemoteUser(); u != "root" {
500                 cmd = "sudo " + cmd
501         }
502         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
503         if err != nil {
504                 logger.WithFields(logrus.Fields{
505                         "stderr": string(stderr),
506                         "stdout": string(stdout),
507                         "error":  err,
508                 }).Warn("kill failed")
509                 return
510         }
511         logger.Debug("killing process succeeded")
512         wp.mtx.Lock()
513         defer wp.mtx.Unlock()
514         if _, ok := wkr.running[uuid]; ok {
515                 delete(wkr.running, uuid)
516                 if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
517                         wkr.state = StateIdle
518                 }
519                 wkr.updated = time.Now()
520                 go wp.notify()
521         }
522 }
523
524 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
525         if reg == nil {
526                 reg = prometheus.NewRegistry()
527         }
528         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
529                 Namespace: "arvados",
530                 Subsystem: "dispatchcloud",
531                 Name:      "containers_running",
532                 Help:      "Number of containers reported running by cloud VMs.",
533         })
534         reg.MustRegister(wp.mContainersRunning)
535         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
536                 Namespace: "arvados",
537                 Subsystem: "dispatchcloud",
538                 Name:      "instances_total",
539                 Help:      "Number of cloud VMs.",
540         }, []string{"category"})
541         reg.MustRegister(wp.mInstances)
542         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
543                 Namespace: "arvados",
544                 Subsystem: "dispatchcloud",
545                 Name:      "instances_price",
546                 Help:      "Price of cloud VMs.",
547         }, []string{"category"})
548         reg.MustRegister(wp.mInstancesPrice)
549         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
550                 Namespace: "arvados",
551                 Subsystem: "dispatchcloud",
552                 Name:      "vcpus_total",
553                 Help:      "Total VCPUs on all cloud VMs.",
554         }, []string{"category"})
555         reg.MustRegister(wp.mVCPUs)
556         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
557                 Namespace: "arvados",
558                 Subsystem: "dispatchcloud",
559                 Name:      "memory_bytes_total",
560                 Help:      "Total memory on all cloud VMs.",
561         }, []string{"category"})
562         reg.MustRegister(wp.mMemory)
563 }
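
// Illustrative sketch, not part of the original file: exposing the same
// registry over HTTP. This assumes the standard
// github.com/prometheus/client_golang/prometheus/promhttp and net/http
// packages; logger, arvClient, instanceSet, newExecutor, key and cluster
// are hypothetical.
//
//      reg := prometheus.NewRegistry()
//      wp := NewPool(logger, arvClient, reg, instanceSet, newExecutor, key, cluster)
//      http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))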
564
565 func (wp *Pool) runMetrics() {
566         ch := wp.Subscribe()
567         defer wp.Unsubscribe(ch)
568         for range ch {
569                 wp.updateMetrics()
570         }
571 }
572
573 func (wp *Pool) updateMetrics() {
574         wp.mtx.RLock()
575         defer wp.mtx.RUnlock()
576
577         instances := map[string]int64{}
578         price := map[string]float64{}
579         cpu := map[string]int64{}
580         mem := map[string]int64{}
581         var running int64
582         for _, wkr := range wp.workers {
583                 var cat string
584                 switch {
585                 case len(wkr.running)+len(wkr.starting) > 0:
586                         cat = "inuse"
587                 case wkr.idleBehavior == IdleBehaviorHold:
588                         cat = "hold"
589                 case wkr.state == StateBooting:
590                         cat = "booting"
591                 case wkr.state == StateUnknown:
592                         cat = "unknown"
593                 default:
594                         cat = "idle"
595                 }
596                 instances[cat]++
597                 price[cat] += wkr.instType.Price
598                 cpu[cat] += int64(wkr.instType.VCPUs)
599                 mem[cat] += int64(wkr.instType.RAM)
600                 running += int64(len(wkr.running) + len(wkr.starting))
601         }
602         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
603                 wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
604                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
605                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
606                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
607         }
608         wp.mContainersRunning.Set(float64(running))
609 }
610
611 func (wp *Pool) runProbes() {
612         maxPPS := wp.maxProbesPerSecond
613         if maxPPS < 1 {
614                 maxPPS = defaultMaxProbesPerSecond
615         }
616         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
617         defer limitticker.Stop()
618
619         probeticker := time.NewTicker(wp.probeInterval)
620         defer probeticker.Stop()
621
622         workers := []cloud.InstanceID{}
623         for range probeticker.C {
624                 workers = workers[:0]
625                 wp.mtx.Lock()
626                 for id, wkr := range wp.workers {
627                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
628                                 continue
629                         }
630                         workers = append(workers, id)
631                 }
632                 wp.mtx.Unlock()
633
634                 for _, id := range workers {
635                         wp.mtx.Lock()
636                         wkr, ok := wp.workers[id]
637                         wp.mtx.Unlock()
638                         if !ok {
639                                 // Deleted while we were probing
640                                 // others
641                                 continue
642                         }
643                         go wkr.ProbeAndUpdate()
644                         select {
645                         case <-wp.stop:
646                                 return
647                         case <-limitticker.C:
648                         }
649                 }
650         }
651 }
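
// Illustrative sketch, not part of the original file: the rate-limiting
// pattern used by the loop above, in isolation. probeticker decides when a
// probe round starts; limitticker caps how fast individual probes are
// launched within a round. probe, workers and maxPPS are placeholders.
//
//      limit := time.NewTicker(time.Second / time.Duration(maxPPS))
//      defer limit.Stop()
//      for _, id := range workers {
//              go probe(id)
//              <-limit.C // at most maxPPS probe launches per second
//      }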
652
653 func (wp *Pool) runSync() {
654         // sync once immediately, then wait syncInterval, sync again,
655         // etc.
656         timer := time.NewTimer(1)
657         for {
658                 select {
659                 case <-timer.C:
660                         err := wp.getInstancesAndSync()
661                         if err != nil {
662                                 wp.logger.WithError(err).Warn("sync failed")
663                         }
664                         timer.Reset(wp.syncInterval)
665                 case <-wp.stop:
666                         wp.logger.Debug("worker.Pool stopped")
667                         return
668                 }
669         }
670 }
671
672 // Stop synchronizing with the InstanceSet.
673 func (wp *Pool) Stop() {
674         wp.setupOnce.Do(wp.setup)
675         close(wp.stop)
676 }
677
678 // Instances returns an InstanceView for each worker in the pool,
679 // summarizing its current state and recent activity.
680 func (wp *Pool) Instances() []InstanceView {
681         var r []InstanceView
682         wp.setupOnce.Do(wp.setup)
683         wp.mtx.Lock()
684         for _, w := range wp.workers {
685                 r = append(r, InstanceView{
686                         Instance:             w.instance.ID(),
687                         Address:              w.instance.Address(),
688                         Price:                w.instType.Price,
689                         ArvadosInstanceType:  w.instType.Name,
690                         ProviderInstanceType: w.instType.ProviderType,
691                         LastContainerUUID:    w.lastUUID,
692                         LastBusy:             w.busy,
693                         WorkerState:          w.state.String(),
694                         IdleBehavior:         w.idleBehavior,
695                 })
696         }
697         wp.mtx.Unlock()
698         sort.Slice(r, func(i, j int) bool {
699                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
700         })
701         return r
702 }
703
704 func (wp *Pool) setup() {
705         wp.creating = map[arvados.InstanceType][]time.Time{}
706         wp.exited = map[string]time.Time{}
707         wp.workers = map[cloud.InstanceID]*worker{}
708         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
709 }
710
711 func (wp *Pool) notify() {
712         wp.mtx.RLock()
713         defer wp.mtx.RUnlock()
714         for _, send := range wp.subscribers {
715                 select {
716                 case send <- struct{}{}:
717                 default:
718                 }
719         }
720 }
721
722 func (wp *Pool) getInstancesAndSync() error {
723         wp.setupOnce.Do(wp.setup)
724         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
725                 return err
726         }
727         wp.logger.Debug("getting instance list")
728         threshold := time.Now()
729         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
730         if err != nil {
731                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
732                 return err
733         }
734         wp.sync(threshold, instances)
735         wp.logger.Debug("sync done")
736         return nil
737 }
738
739 // Add/remove/update workers based on instances, which was obtained
740 // from the instanceSet. However, don't clobber any other updates that
741 // already happened after threshold.
742 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
743         wp.mtx.Lock()
744         defer wp.mtx.Unlock()
745         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
746         notify := false
747
748         for _, inst := range instances {
749                 itTag := inst.Tags()[tagKeyInstanceType]
750                 it, ok := wp.instanceTypes[itTag]
751                 if !ok {
752                         wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
753                         continue
754                 }
755                 if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
756                         notify = true
757                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
758                         wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
759                         wkr.shutdown()
760                 }
761         }
762
763         for id, wkr := range wp.workers {
764                 if wkr.updated.After(threshold) {
765                         continue
766                 }
767                 logger := wp.logger.WithFields(logrus.Fields{
768                         "Instance":    wkr.instance.ID(),
769                         "WorkerState": wkr.state,
770                 })
771                 logger.Info("instance disappeared in cloud")
772                 delete(wp.workers, id)
773                 go wkr.executor.Close()
774                 notify = true
775         }
776
777         if !wp.loaded {
778                 wp.loaded = true
779                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
780         }
781
782         if notify {
783                 go wp.notify()
784         }
785 }
786
787 // Return a random string of n hexadecimal digits (n*4 random bits). n
788 // must be even.
789 func randomHex(n int) string {
790         buf := make([]byte, n/2)
791         _, err := rand.Read(buf)
792         if err != nil {
793                 panic(err)
794         }
795         return fmt.Sprintf("%x", buf)
796 }