8616d6e9a5d9de42d3b36c966f44362d2b632fbd
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/rand"
9         "errors"
10         "fmt"
11         "io"
12         "sort"
13         "strings"
14         "sync"
15         "time"
16
17         "git.curoverse.com/arvados.git/lib/cloud"
18         "git.curoverse.com/arvados.git/sdk/go/arvados"
19         "github.com/prometheus/client_golang/prometheus"
20         "github.com/sirupsen/logrus"
21         "golang.org/x/crypto/ssh"
22 )
23
// Cloud instance tag keys written and read by the Pool. Every key is
// combined with the configured tag key prefix (wp.tagKeyPrefix)
// before it is used as an actual tag name.
const (
	tagKeyInstanceSetID  = "InstanceSetID"
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
)
30
// An InstanceView shows a worker's current state and recent activity.
//
// Pool.Instances returns one InstanceView per worker, populated as
// follows: Instance and Address come from the cloud instance; Price,
// ArvadosInstanceType and ProviderInstanceType come from the worker's
// configured instance type; LastContainerUUID and LastBusy are the
// worker's most recent container UUID and busy timestamp;
// WorkerState is the worker state's String() form; IdleBehavior is
// the worker's current idle behavior. The json tags define the
// serialized field names (and field order is the serialized order).
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}
43
// An Executor executes shell commands on a remote host.
//
// The Pool creates one Executor per worker (via the newExecutor
// function passed to NewPool) and retargets it with SetTarget each
// time the worker's cloud instance is refreshed.
type Executor interface {
	// Run cmd on the current target, with the given environment
	// variables and stdin, returning the captured stdout and
	// stderr.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	// Close is called by the worker when it is discarded
	// (presumably releasing any connections the Executor holds --
	// implementation-defined; confirm in worker.go).
	Close()
}
63
64 const (
65         defaultSyncInterval       = time.Minute
66         defaultProbeInterval      = time.Second * 10
67         defaultMaxProbesPerSecond = 10
68         defaultTimeoutIdle        = time.Minute
69         defaultTimeoutBooting     = time.Minute * 10
70         defaultTimeoutProbe       = time.Minute * 10
71         defaultTimeoutShutdown    = time.Second * 10
72         defaultTimeoutTERM        = time.Minute * 2
73         defaultTimeoutSignal      = time.Second * 5
74
75         // Time after a quota error to try again anyway, even if no
76         // instances have been shutdown.
77         quotaErrorTTL = time.Minute
78
79         // Time between "X failed because rate limiting" messages
80         logRateLimitErrorInterval = time.Second * 10
81 )
82
83 func duration(conf arvados.Duration, def time.Duration) time.Duration {
84         if conf > 0 {
85                 return time.Duration(conf)
86         } else {
87                 return def
88         }
89 }
90
91 // NewPool creates a Pool of workers backed by instanceSet.
92 //
93 // New instances are configured and set up according to the given
94 // cluster configuration.
95 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
96         wp := &Pool{
97                 logger:             logger,
98                 arvClient:          arvClient,
99                 instanceSetID:      instanceSetID,
100                 instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
101                 newExecutor:        newExecutor,
102                 bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
103                 imageID:            cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
104                 instanceTypes:      cluster.InstanceTypes,
105                 maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
106                 probeInterval:      duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
107                 syncInterval:       duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
108                 timeoutIdle:        duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
109                 timeoutBooting:     duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
110                 timeoutProbe:       duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
111                 timeoutShutdown:    duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
112                 timeoutTERM:        duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
113                 timeoutSignal:      duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
114                 installPublicKey:   installPublicKey,
115                 tagKeyPrefix:       cluster.Containers.CloudVMs.TagKeyPrefix,
116                 stop:               make(chan bool),
117         }
118         wp.registerMetrics(reg)
119         go func() {
120                 wp.setupOnce.Do(wp.setup)
121                 go wp.runMetrics()
122                 go wp.runProbes()
123                 go wp.runSync()
124         }()
125         return wp
126 }
127
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	//
	// These fields are populated by NewPool from its arguments and
	// the cluster configuration.
	logger             logrus.FieldLogger
	arvClient          *arvados.Client
	instanceSetID      cloud.InstanceSetID
	instanceSet        *throttledInstanceSet
	newExecutor        func(cloud.Instance) Executor
	bootProbeCommand   string
	imageID            cloud.ImageID
	instanceTypes      map[string]arvados.InstanceType
	syncInterval       time.Duration
	probeInterval      time.Duration
	maxProbesPerSecond int
	timeoutIdle        time.Duration
	timeoutBooting     time.Duration
	timeoutProbe       time.Duration
	timeoutShutdown    time.Duration
	timeoutTERM        time.Duration
	timeoutSignal      time.Duration
	installPublicKey   ssh.PublicKey
	tagKeyPrefix       string

	// private state
	//
	// The maps below are allocated by setup (run once via
	// setupOnce) and are guarded by mtx.
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool // closed by Stop
	mtx          sync.RWMutex
	setupOnce    sync.Once

	// NOTE(review): in this file, rate-limit state is recorded on and
	// read from wp.instanceSet.throttleCreate/throttleInstances; confirm
	// whether these Pool-level throttles are still needed elsewhere.
	throttleCreate    throttle
	throttleInstances throttle

	// Prometheus metrics, created by registerMetrics and updated by
	// updateMetrics (and sync, for mDisappearances).
	mContainersRunning prometheus.Gauge
	mInstances         *prometheus.GaugeVec
	mInstancesPrice    *prometheus.GaugeVec
	mVCPUs             *prometheus.GaugeVec
	mMemory            *prometheus.GaugeVec
	mDisappearances    *prometheus.CounterVec
}
174
// A createCall records an in-flight (cloud.InstanceSet)Create call
// started by Pool.Create. Entries live in Pool.creating, keyed by the
// new instance's secret, until the Create call returns.
type createCall struct {
	time         time.Time            // when Pool.Create issued the call
	instanceType arvados.InstanceType // instance type being created
}
179
180 // Subscribe returns a buffered channel that becomes ready after any
181 // change to the pool's state that could have scheduling implications:
182 // a worker's state changes, a new worker appears, the cloud
183 // provider's API rate limiting period ends, etc.
184 //
185 // Additional events that occur while the channel is already ready
186 // will be dropped, so it is OK if the caller services the channel
187 // slowly.
188 //
189 // Example:
190 //
191 //      ch := wp.Subscribe()
192 //      defer wp.Unsubscribe(ch)
193 //      for range ch {
194 //              tryScheduling(wp)
195 //              if done {
196 //                      break
197 //              }
198 //      }
199 func (wp *Pool) Subscribe() <-chan struct{} {
200         wp.setupOnce.Do(wp.setup)
201         wp.mtx.Lock()
202         defer wp.mtx.Unlock()
203         ch := make(chan struct{}, 1)
204         wp.subscribers[ch] = ch
205         return ch
206 }
207
208 // Unsubscribe stops sending updates to the given channel.
209 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
210         wp.setupOnce.Do(wp.setup)
211         wp.mtx.Lock()
212         defer wp.mtx.Unlock()
213         delete(wp.subscribers, ch)
214 }
215
// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.  Workers in
// hold/drain mode are not included.
//
// Pending Create calls (wp.creating) are included in the count. To
// avoid counting a new instance twice, an Unknown-state worker that
// appeared after the oldest pending Create of its type is assumed to
// be the result of one of those pending calls.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	// Per instance type: number of in-flight Create calls, and the
	// start time of the oldest one.
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	// Whatever remains in creating has no visible worker yet; count
	// it as unallocated too.
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}
267
268 // Create a new instance with the given type, and add it to the worker
269 // pool. The worker is added immediately; instance creation runs in
270 // the background.
271 //
272 // Create returns false if a pre-existing error state prevents it from
273 // even attempting to create a new instance. Those errors are logged
274 // by the Pool, so the caller does not need to log anything in such
275 // cases.
276 func (wp *Pool) Create(it arvados.InstanceType) bool {
277         logger := wp.logger.WithField("InstanceType", it.Name)
278         wp.setupOnce.Do(wp.setup)
279         wp.mtx.Lock()
280         defer wp.mtx.Unlock()
281         if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
282                 return false
283         }
284         now := time.Now()
285         secret := randomHex(instanceSecretLength)
286         wp.creating[secret] = createCall{time: now, instanceType: it}
287         go func() {
288                 defer wp.notify()
289                 tags := cloud.InstanceTags{
290                         wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
291                         wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
292                         wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
293                         wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
294                 }
295                 initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
296                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
297                 wp.mtx.Lock()
298                 defer wp.mtx.Unlock()
299                 // delete() is deferred so the updateWorker() call
300                 // below knows to use StateBooting when adding a new
301                 // worker.
302                 defer delete(wp.creating, secret)
303                 if err != nil {
304                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
305                                 wp.atQuotaErr = err
306                                 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
307                                 time.AfterFunc(quotaErrorTTL, wp.notify)
308                         }
309                         logger.WithError(err).Error("create failed")
310                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
311                         return
312                 }
313                 wp.updateWorker(inst, it)
314         }()
315         return true
316 }
317
318 // AtQuota returns true if Create is not expected to work at the
319 // moment.
320 func (wp *Pool) AtQuota() bool {
321         wp.mtx.Lock()
322         defer wp.mtx.Unlock()
323         return time.Now().Before(wp.atQuotaUntil)
324 }
325
326 // SetIdleBehavior determines how the indicated instance will behave
327 // when it has no containers running.
328 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
329         wp.mtx.Lock()
330         defer wp.mtx.Unlock()
331         wkr, ok := wp.workers[id]
332         if !ok {
333                 return errors.New("requested instance does not exist")
334         }
335         wkr.setIdleBehavior(idleBehavior)
336         return nil
337 }
338
// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	// Wrap the instance together with its secret tag; the wrapped
	// value is what the worker and its executor see from here on.
	inst = tagVerifier{inst, secret}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		// Existing worker: point its executor at the refreshed
		// instance and bump wkr.updated, which sync compares
		// against its threshold to decide which workers have
		// disappeared.
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	// A secret that matches a pending Create call means this is an
	// instance we created ourselves and are waiting for.
	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	// The worker shares the pool's mutex, so "caller must have
	// lock" also covers the worker's own state.
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}
405
406 // Shutdown shuts down a worker with the given type, or returns false
407 // if all workers with the given type are busy.
408 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
409         wp.setupOnce.Do(wp.setup)
410         wp.mtx.Lock()
411         defer wp.mtx.Unlock()
412         logger := wp.logger.WithField("InstanceType", it.Name)
413         logger.Info("shutdown requested")
414         for _, tryState := range []State{StateBooting, StateIdle} {
415                 // TODO: shutdown the worker with the longest idle
416                 // time (Idle) or the earliest create time (Booting)
417                 for _, wkr := range wp.workers {
418                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
419                                 logger.WithField("Instance", wkr.instance).Info("shutting down")
420                                 wkr.shutdown()
421                                 return true
422                         }
423                 }
424         }
425         return false
426 }
427
428 // CountWorkers returns the current number of workers in each state.
429 //
430 // CountWorkers blocks, if necessary, until the initial instance list
431 // has been loaded from the cloud provider.
432 func (wp *Pool) CountWorkers() map[State]int {
433         wp.setupOnce.Do(wp.setup)
434         wp.waitUntilLoaded()
435         wp.mtx.Lock()
436         defer wp.mtx.Unlock()
437         r := map[State]int{}
438         for _, w := range wp.workers {
439                 r[w.state]++
440         }
441         return r
442 }
443
444 // Running returns the container UUIDs being prepared/run on workers.
445 //
446 // In the returned map, the time value indicates when the Pool
447 // observed that the container process had exited. A container that
448 // has not yet exited has a zero time value. The caller should use
449 // KillContainer() to garbage-collect the entries for exited
450 // containers.
451 func (wp *Pool) Running() map[string]time.Time {
452         wp.setupOnce.Do(wp.setup)
453         wp.mtx.Lock()
454         defer wp.mtx.Unlock()
455         r := map[string]time.Time{}
456         for _, wkr := range wp.workers {
457                 for uuid := range wkr.running {
458                         r[uuid] = time.Time{}
459                 }
460                 for uuid := range wkr.starting {
461                         r[uuid] = time.Time{}
462                 }
463         }
464         for uuid, exited := range wp.exited {
465                 r[uuid] = exited
466         }
467         return r
468 }
469
470 // StartContainer starts a container on an idle worker immediately if
471 // possible, otherwise returns false.
472 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
473         wp.setupOnce.Do(wp.setup)
474         wp.mtx.Lock()
475         defer wp.mtx.Unlock()
476         var wkr *worker
477         for _, w := range wp.workers {
478                 if w.instType == it && w.state == StateIdle {
479                         if wkr == nil || w.busy.After(wkr.busy) {
480                                 wkr = w
481                         }
482                 }
483         }
484         if wkr == nil {
485                 return false
486         }
487         wkr.startContainer(ctr)
488         return true
489 }
490
491 // KillContainer kills the crunch-run process for the given container
492 // UUID, if it's running on any worker.
493 //
494 // KillContainer returns immediately; the act of killing the container
495 // takes some time, and runs in the background.
496 func (wp *Pool) KillContainer(uuid string, reason string) {
497         wp.mtx.Lock()
498         defer wp.mtx.Unlock()
499         logger := wp.logger.WithFields(logrus.Fields{
500                 "ContainerUUID": uuid,
501                 "Reason":        reason,
502         })
503         if _, ok := wp.exited[uuid]; ok {
504                 logger.Debug("clearing placeholder for exited crunch-run process")
505                 delete(wp.exited, uuid)
506                 return
507         }
508         for _, wkr := range wp.workers {
509                 rr := wkr.running[uuid]
510                 if rr == nil {
511                         rr = wkr.starting[uuid]
512                 }
513                 if rr != nil {
514                         rr.Kill(reason)
515                         return
516                 }
517         }
518         logger.Debug("cannot kill: already disappeared")
519 }
520
521 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
522         if reg == nil {
523                 reg = prometheus.NewRegistry()
524         }
525         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
526                 Namespace: "arvados",
527                 Subsystem: "dispatchcloud",
528                 Name:      "containers_running",
529                 Help:      "Number of containers reported running by cloud VMs.",
530         })
531         reg.MustRegister(wp.mContainersRunning)
532         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
533                 Namespace: "arvados",
534                 Subsystem: "dispatchcloud",
535                 Name:      "instances_total",
536                 Help:      "Number of cloud VMs.",
537         }, []string{"category"})
538         reg.MustRegister(wp.mInstances)
539         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
540                 Namespace: "arvados",
541                 Subsystem: "dispatchcloud",
542                 Name:      "instances_price",
543                 Help:      "Price of cloud VMs.",
544         }, []string{"category"})
545         reg.MustRegister(wp.mInstancesPrice)
546         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
547                 Namespace: "arvados",
548                 Subsystem: "dispatchcloud",
549                 Name:      "vcpus_total",
550                 Help:      "Total VCPUs on all cloud VMs.",
551         }, []string{"category"})
552         reg.MustRegister(wp.mVCPUs)
553         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
554                 Namespace: "arvados",
555                 Subsystem: "dispatchcloud",
556                 Name:      "memory_bytes_total",
557                 Help:      "Total memory on all cloud VMs.",
558         }, []string{"category"})
559         reg.MustRegister(wp.mMemory)
560         wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
561                 Namespace: "arvados",
562                 Subsystem: "dispatchcloud",
563                 Name:      "instances_disappeared",
564                 Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
565         }, []string{"state"})
566         for _, v := range stateString {
567                 wp.mDisappearances.WithLabelValues(v).Add(0)
568         }
569         reg.MustRegister(wp.mDisappearances)
570 }
571
572 func (wp *Pool) runMetrics() {
573         ch := wp.Subscribe()
574         defer wp.Unsubscribe(ch)
575         wp.updateMetrics()
576         for range ch {
577                 wp.updateMetrics()
578         }
579 }
580
581 func (wp *Pool) updateMetrics() {
582         wp.mtx.RLock()
583         defer wp.mtx.RUnlock()
584
585         instances := map[string]int64{}
586         price := map[string]float64{}
587         cpu := map[string]int64{}
588         mem := map[string]int64{}
589         var running int64
590         for _, wkr := range wp.workers {
591                 var cat string
592                 switch {
593                 case len(wkr.running)+len(wkr.starting) > 0:
594                         cat = "inuse"
595                 case wkr.idleBehavior == IdleBehaviorHold:
596                         cat = "hold"
597                 case wkr.state == StateBooting:
598                         cat = "booting"
599                 case wkr.state == StateUnknown:
600                         cat = "unknown"
601                 default:
602                         cat = "idle"
603                 }
604                 instances[cat]++
605                 price[cat] += wkr.instType.Price
606                 cpu[cat] += int64(wkr.instType.VCPUs)
607                 mem[cat] += int64(wkr.instType.RAM)
608                 running += int64(len(wkr.running) + len(wkr.starting))
609         }
610         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
611                 wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
612                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
613                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
614                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
615         }
616         wp.mContainersRunning.Set(float64(running))
617 }
618
619 func (wp *Pool) runProbes() {
620         maxPPS := wp.maxProbesPerSecond
621         if maxPPS < 1 {
622                 maxPPS = defaultMaxProbesPerSecond
623         }
624         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
625         defer limitticker.Stop()
626
627         probeticker := time.NewTicker(wp.probeInterval)
628         defer probeticker.Stop()
629
630         workers := []cloud.InstanceID{}
631         for range probeticker.C {
632                 workers = workers[:0]
633                 wp.mtx.Lock()
634                 for id, wkr := range wp.workers {
635                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
636                                 continue
637                         }
638                         workers = append(workers, id)
639                 }
640                 wp.mtx.Unlock()
641
642                 for _, id := range workers {
643                         wp.mtx.Lock()
644                         wkr, ok := wp.workers[id]
645                         wp.mtx.Unlock()
646                         if !ok {
647                                 // Deleted while we were probing
648                                 // others
649                                 continue
650                         }
651                         go wkr.ProbeAndUpdate()
652                         select {
653                         case <-wp.stop:
654                                 return
655                         case <-limitticker.C:
656                         }
657                 }
658         }
659 }
660
661 func (wp *Pool) runSync() {
662         // sync once immediately, then wait syncInterval, sync again,
663         // etc.
664         timer := time.NewTimer(1)
665         for {
666                 select {
667                 case <-timer.C:
668                         err := wp.getInstancesAndSync()
669                         if err != nil {
670                                 wp.logger.WithError(err).Warn("sync failed")
671                         }
672                         timer.Reset(wp.syncInterval)
673                 case <-wp.stop:
674                         wp.logger.Debug("worker.Pool stopped")
675                         return
676                 }
677         }
678 }
679
680 // Stop synchronizing with the InstanceSet.
681 func (wp *Pool) Stop() {
682         wp.setupOnce.Do(wp.setup)
683         close(wp.stop)
684 }
685
686 // Instances returns an InstanceView for each worker in the pool,
687 // summarizing its current state and recent activity.
688 func (wp *Pool) Instances() []InstanceView {
689         var r []InstanceView
690         wp.setupOnce.Do(wp.setup)
691         wp.mtx.Lock()
692         for _, w := range wp.workers {
693                 r = append(r, InstanceView{
694                         Instance:             w.instance.ID(),
695                         Address:              w.instance.Address(),
696                         Price:                w.instType.Price,
697                         ArvadosInstanceType:  w.instType.Name,
698                         ProviderInstanceType: w.instType.ProviderType,
699                         LastContainerUUID:    w.lastUUID,
700                         LastBusy:             w.busy,
701                         WorkerState:          w.state.String(),
702                         IdleBehavior:         w.idleBehavior,
703                 })
704         }
705         wp.mtx.Unlock()
706         sort.Slice(r, func(i, j int) bool {
707                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
708         })
709         return r
710 }
711
712 // KillInstance destroys a cloud VM instance. It returns an error if
713 // the given instance does not exist.
714 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
715         wkr, ok := wp.workers[id]
716         if !ok {
717                 return errors.New("instance not found")
718         }
719         wkr.logger.WithField("Reason", reason).Info("shutting down")
720         wkr.shutdown()
721         return nil
722 }
723
724 func (wp *Pool) setup() {
725         wp.creating = map[string]createCall{}
726         wp.exited = map[string]time.Time{}
727         wp.workers = map[cloud.InstanceID]*worker{}
728         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
729 }
730
731 func (wp *Pool) notify() {
732         wp.mtx.RLock()
733         defer wp.mtx.RUnlock()
734         for _, send := range wp.subscribers {
735                 select {
736                 case send <- struct{}{}:
737                 default:
738                 }
739         }
740 }
741
742 func (wp *Pool) getInstancesAndSync() error {
743         wp.setupOnce.Do(wp.setup)
744         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
745                 return err
746         }
747         wp.logger.Debug("getting instance list")
748         threshold := time.Now()
749         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
750         if err != nil {
751                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
752                 return err
753         }
754         wp.sync(threshold, instances)
755         wp.logger.Debug("sync done")
756         return nil
757 }
758
759 // Add/remove/update workers based on instances, which was obtained
760 // from the instanceSet. However, don't clobber any other updates that
761 // already happened after threshold.
762 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
763         wp.mtx.Lock()
764         defer wp.mtx.Unlock()
765         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
766         notify := false
767
768         for _, inst := range instances {
769                 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
770                 it, ok := wp.instanceTypes[itTag]
771                 if !ok {
772                         wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
773                         continue
774                 }
775                 if wkr, isNew := wp.updateWorker(inst, it); isNew {
776                         notify = true
777                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
778                         wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
779                         wkr.shutdown()
780                 }
781         }
782
783         for id, wkr := range wp.workers {
784                 if wkr.updated.After(threshold) {
785                         continue
786                 }
787                 logger := wp.logger.WithFields(logrus.Fields{
788                         "Instance":    wkr.instance.ID(),
789                         "WorkerState": wkr.state,
790                 })
791                 logger.Info("instance disappeared in cloud")
792                 wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
793                 delete(wp.workers, id)
794                 go wkr.Close()
795                 notify = true
796         }
797
798         if !wp.loaded {
799                 notify = true
800                 wp.loaded = true
801                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
802         }
803
804         if notify {
805                 go wp.notify()
806         }
807 }
808
809 func (wp *Pool) waitUntilLoaded() {
810         ch := wp.Subscribe()
811         wp.mtx.RLock()
812         defer wp.mtx.RUnlock()
813         for !wp.loaded {
814                 wp.mtx.RUnlock()
815                 <-ch
816                 wp.mtx.RLock()
817         }
818 }
819
820 // Return a random string of n hexadecimal digits (n*4 random bits). n
821 // must be even.
822 func randomHex(n int) string {
823         buf := make([]byte, n/2)
824         _, err := rand.Read(buf)
825         if err != nil {
826                 panic(err)
827         }
828         return fmt.Sprintf("%x", buf)
829 }