1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.curoverse.com/arvados.git/lib/cloud"
18 "git.curoverse.com/arvados.git/sdk/go/arvados"
19 "github.com/prometheus/client_golang/prometheus"
20 "github.com/sirupsen/logrus"
21 "golang.org/x/crypto/ssh"
// Instance tag keys. These tags are written to each cloud instance
// (see Create) so a restarted dispatcher can recover the instance's
// arvados type, idle behavior, and boot secret from Instances().
tagKeyInstanceType = "InstanceType"
tagKeyIdleBehavior = "IdleBehavior"
tagKeyInstanceSecret = "InstanceSecret"
// An InstanceView shows a worker's current state and recent activity.
// It is a read-only snapshot, built by (*Pool)Instances, suitable for
// JSON serialization in management/status APIs.
type InstanceView struct {
Instance cloud.InstanceID `json:"instance"`
Address string `json:"address"`
Price float64 `json:"price"` // price of the worker's arvados InstanceType
ArvadosInstanceType string `json:"arvados_instance_type"`
ProviderInstanceType string `json:"provider_instance_type"`
LastContainerUUID string `json:"last_container_uuid"`
LastBusy time.Time `json:"last_busy"`
WorkerState string `json:"worker_state"` // string form of the worker's State
IdleBehavior IdleBehavior `json:"idle_behavior"`
// An Executor executes shell commands on a remote host.
type Executor interface {
// Run cmd on the current target, with the given environment and
// stdin. Returns the captured stdout/stderr and any execution error.
Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

// Use the given target for subsequent operations. The new
// target is the same host as the previous target, but it
// might return a different address and verify a different
// host key.

// SetTarget is called frequently, and in most cases the new
// target will behave exactly the same as the old one. An
// implementation should optimize accordingly.

// SetTarget must not block on concurrent Execute calls.
SetTarget(cloud.ExecutorTarget)
// Default tuning values, used (via duration()) when the cluster
// configuration leaves the corresponding setting unset.
defaultSyncInterval = time.Minute
defaultProbeInterval = time.Second * 10
defaultMaxProbesPerSecond = 10
defaultTimeoutIdle = time.Minute
defaultTimeoutBooting = time.Minute * 10
defaultTimeoutProbe = time.Minute * 10
defaultTimeoutShutdown = time.Second * 10

// Time after a quota error to try again anyway, even if no
// instances have been shut down.
quotaErrorTTL = time.Minute

// Time between "X failed because rate limiting" messages
logRateLimitErrorInterval = time.Second * 10
// duration converts a configured arvados.Duration to a time.Duration.
// NOTE(review): the branch returning the default def when conf is
// zero/unset is not visible in this view — confirm upstream.
func duration(conf arvados.Duration, def time.Duration) time.Duration {
return time.Duration(conf)
// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
// Wrap the raw InstanceSet so API rate-limit errors can be
// throttled (see throttleCreate/throttleInstances usage below).
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
newExecutor: newExecutor,
bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
imageID: cloud.ImageID(cluster.CloudVMs.ImageID),
instanceTypes: cluster.InstanceTypes,
maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
probeInterval: duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
syncInterval: duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
timeoutIdle: duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
installPublicKey: installPublicKey,
stop: make(chan bool), // presumably closed by Stop() to end background loops — confirm
wp.registerMetrics(reg)
// Initialize internal maps before any other method can run.
wp.setupOnce.Do(wp.setup)
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
//
// Configuration (set once by NewPool, then read-only):
logger logrus.FieldLogger
arvClient *arvados.Client
instanceSet *throttledInstanceSet
newExecutor func(cloud.Instance) Executor
bootProbeCommand string
imageID cloud.ImageID
instanceTypes map[string]arvados.InstanceType
syncInterval time.Duration
probeInterval time.Duration
maxProbesPerSecond int
timeoutIdle time.Duration
timeoutBooting time.Duration
timeoutProbe time.Duration
timeoutShutdown time.Duration
installPublicKey ssh.PublicKey

// Mutable state — presumably guarded by wp.mtx (the mutex field is
// not visible in this view; the Lock/RLock calls throughout this
// file indicate it).
subscribers map[<-chan struct{}]chan<- struct{}
creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
workers map[cloud.InstanceID]*worker
loaded bool // loaded list of instances from InstanceSet at least once
exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
atQuotaUntil time.Time
atQuotaErr cloud.QuotaError

throttleCreate throttle
throttleInstances throttle

// Prometheus metrics, registered by registerMetrics.
mContainersRunning prometheus.Gauge
mInstances *prometheus.GaugeVec
mInstancesPrice *prometheus.GaugeVec
mVCPUs *prometheus.GaugeVec
mMemory *prometheus.GaugeVec
// createCall records an unfinished (cloud.InstanceSet)Create call, so
// Unallocated() can count instances that have been requested but do
// not yet appear in the provider's instance list.
type createCall struct {
instanceType arvados.InstanceType
// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
//
// ch := wp.Subscribe()
// defer wp.Unsubscribe(ch)
//
func (wp *Pool) Subscribe() <-chan struct{} {
wp.setupOnce.Do(wp.setup)
defer wp.mtx.Unlock()
// Buffer of 1 lets notify() mark the channel ready without
// blocking when the subscriber hasn't drained it yet.
ch := make(chan struct{}, 1)
wp.subscribers[ch] = ch
// Unsubscribe stops sending updates to the given channel, which must
// have been returned by a previous Subscribe call.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
wp.setupOnce.Do(wp.setup)
defer wp.mtx.Unlock()
delete(wp.subscribers, ch)
// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
wp.setupOnce.Do(wp.setup)
defer wp.mtx.RUnlock()
unalloc := map[arvados.InstanceType]int{}
creating := map[arvados.InstanceType]int{}
oldestCreate := map[arvados.InstanceType]time.Time{}
// Count pending Create calls per type, remembering the oldest
// request time for the race-detection heuristic below.
for _, cc := range wp.creating {
it := cc.instanceType
if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
oldestCreate[it] = cc.time
for _, wkr := range wp.workers {
// Skip workers that are not expected to become
// available soon. Note len(wkr.running)>0 is not
// redundant here: it can be true even in
if wkr.state == StateShutdown ||
wkr.state == StateRunning ||
wkr.idleBehavior != IdleBehaviorRun ||
len(wkr.running) > 0 {
if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
// If up to N new workers appear in
// Instances() while we are waiting for N
// Create() calls to complete, we assume we're
// just seeing a race between Instances() and
// Create() responses.
//
// The other common reason why nodes have
// state==Unknown is that they appeared at
// startup, before any Create calls. They
// don't match the above timing condition, so
// we never mistakenly attribute them to
// pending Create calls.
// Add any remaining pending-create counts not already matched
// to an appeared instance.
for it, c := range creating {
// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
func (wp *Pool) Create(it arvados.InstanceType) bool {
logger := wp.logger.WithField("InstanceType", it.Name)
wp.setupOnce.Do(wp.setup)
defer wp.mtx.Unlock()
// Refuse while a quota error is still in effect or the cloud API
// is rate-limiting create calls.
if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
// The secret is written to the instance at boot and attached as a
// tag, so updateWorker() can match the appearing instance back to
// this pending createCall.
secret := randomHex(instanceSecretLength)
wp.creating[secret] = createCall{time: now, instanceType: it}
tags := cloud.InstanceTags{
tagKeyInstanceType: it.Name,
tagKeyIdleBehavior: string(IdleBehaviorRun),
tagKeyInstanceSecret: secret,
initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
defer wp.mtx.Unlock()
// delete() is deferred so the updateWorker() call
// below knows to use StateBooting when adding a new
defer delete(wp.creating, secret)
if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
// Remember the quota error and retry after quotaErrorTTL;
// notify() wakes subscribers when the TTL expires.
wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
time.AfterFunc(quotaErrorTTL, wp.notify)
logger.WithError(err).Error("create failed")
wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
wp.updateWorker(inst, it)
// AtQuota returns true if Create is not expected to work at the
// moment (i.e., a recent quota error is still within its TTL).
func (wp *Pool) AtQuota() bool {
defer wp.mtx.Unlock()
return time.Now().Before(wp.atQuotaUntil)
// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running. Returns an error if no worker
// with the given instance ID exists.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
defer wp.mtx.Unlock()
wkr, ok := wp.workers[id]
return errors.New("requested instance does not exist")
wkr.idleBehavior = idleBehavior
// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
inst = tagVerifier{inst}
// Already known: refresh the executor's target (address/host key
// may have changed) and bump the update timestamp.
if wkr := wp.workers[id]; wkr != nil {
wkr.executor.SetTarget(inst)
wkr.updated = time.Now()
state := StateUnknown
// Match against a pending Create call via the instance-secret tag.
if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok {
// If an instance has a valid IdleBehavior tag when it first
// appears, initialize the new worker accordingly (this is how
// we restore IdleBehavior that was set by a prior dispatch
// process); otherwise, default to "run". After this,
// wkr.idleBehavior is the source of truth, and will only be
// changed via SetIdleBehavior().
idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
if !validIdleBehavior[idleBehavior] {
idleBehavior = IdleBehaviorRun
logger := wp.logger.WithFields(logrus.Fields{
"InstanceType": it.Name,
"Instance": inst.ID(),
"Address": inst.Address(),
logger.WithFields(logrus.Fields{
"IdleBehavior": idleBehavior,
}).Infof("instance appeared in cloud")
executor: wp.newExecutor(inst),
idleBehavior: idleBehavior,
running: make(map[string]struct{}),
starting: make(map[string]struct{}),
probing: make(chan struct{}, 1),
// notifyExited records that the crunch-run process for the given
// container UUID exited at time t (body not visible in this view;
// presumably records into wp.exited — confirm upstream).
//
// caller must have lock.
func (wp *Pool) notifyExited(uuid string, t time.Time) {
// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
wp.setupOnce.Do(wp.setup)
defer wp.mtx.Unlock()
logger := wp.logger.WithField("InstanceType", it.Name)
logger.Info("shutdown requested")
// Prefer shutting down a still-booting worker over an idle one.
for _, tryState := range []State{StateBooting, StateIdle} {
// TODO: shutdown the worker with the longest idle
// time (Idle) or the earliest create time (Booting)
for _, wkr := range wp.workers {
// Never shut down a worker held via IdleBehaviorHold.
if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
logger.WithField("Instance", wkr.instance).Info("shutting down")
// CountWorkers returns the current number of workers in each state.
func (wp *Pool) CountWorkers() map[State]int {
wp.setupOnce.Do(wp.setup)
defer wp.mtx.Unlock()
for _, w := range wp.workers {
// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// KillContainer() to garbage-collect the entries for exited
func (wp *Pool) Running() map[string]time.Time {
wp.setupOnce.Do(wp.setup)
defer wp.mtx.Unlock()
r := map[string]time.Time{}
for _, wkr := range wp.workers {
// Still running or starting: zero time value.
for uuid := range wkr.running {
r[uuid] = time.Time{}
for uuid := range wkr.starting {
r[uuid] = time.Time{}
// Exited containers carry the observed exit time.
for uuid, exited := range wp.exited {
// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
wp.setupOnce.Do(wp.setup)
defer wp.mtx.Unlock()
// Choose the most recently busy idle worker of the right type,
// so long-idle workers stay idle and can be shut down.
for _, w := range wp.workers {
if w.instType == it && w.state == StateIdle {
if wkr == nil || w.busy.After(wkr.busy) {
wkr.startContainer(ctr)
// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
func (wp *Pool) KillContainer(uuid string) {
defer wp.mtx.Unlock()
// If the process already exited, just clear the bookkeeping entry.
if _, ok := wp.exited[uuid]; ok {
wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
delete(wp.exited, uuid)
for _, wkr := range wp.workers {
if _, ok := wkr.running[uuid]; ok {
// Kill in the background; see (*Pool)kill.
go wp.kill(wkr, uuid)
wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
// kill sends SIGTERM (via "crunch-run --kill 15") to the crunch-run
// process for uuid on the given worker, then updates the worker's
// running set and state. Runs in its own goroutine (see KillContainer).
func (wp *Pool) kill(wkr *worker, uuid string) {
logger := wp.logger.WithFields(logrus.Fields{
"ContainerUUID": uuid,
"Instance": wkr.instance.ID(),
logger.Debug("killing process")
cmd := "crunch-run --kill 15 " + uuid
// Non-root remote users need privilege escalation (the sudo
// prefix is added on a line not visible here — confirm upstream).
if u := wkr.instance.RemoteUser(); u != "root" {
stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
logger.WithFields(logrus.Fields{
"stderr": string(stderr),
"stdout": string(stdout),
}).Warn("kill failed")
logger.Debug("killing process succeeded")
defer wp.mtx.Unlock()
if _, ok := wkr.running[uuid]; ok {
delete(wkr.running, uuid)
// Last container gone: the worker becomes idle again.
if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
wkr.state = StateIdle
wkr.updated = time.Now()
// registerMetrics creates the pool's prometheus collectors and
// registers them with reg.
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
// Fall back to a throwaway registry (presumably when reg is nil —
// the guard condition is not visible in this view; confirm).
reg = prometheus.NewRegistry()
wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "containers_running",
Help: "Number of containers reported running by cloud VMs.",
reg.MustRegister(wp.mContainersRunning)
wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "instances_total",
Help: "Number of cloud VMs.",
}, []string{"category"})
reg.MustRegister(wp.mInstances)
wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "instances_price",
Help: "Price of cloud VMs.",
}, []string{"category"})
reg.MustRegister(wp.mInstancesPrice)
wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Help: "Total VCPUs on all cloud VMs.",
}, []string{"category"})
reg.MustRegister(wp.mVCPUs)
wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "memory_bytes_total",
Help: "Total memory on all cloud VMs.",
}, []string{"category"})
reg.MustRegister(wp.mMemory)
// runMetrics refreshes the prometheus gauges whenever the pool's
// state changes, by subscribing to pool notifications (loop body not
// fully visible; presumably calls updateMetrics per event — confirm).
func (wp *Pool) runMetrics() {
defer wp.Unsubscribe(ch)
// updateMetrics recomputes all gauges from the current worker list.
// Workers are bucketed into one of the categories "inuse", "hold",
// "booting", "unknown", or "idle".
func (wp *Pool) updateMetrics() {
defer wp.mtx.RUnlock()
instances := map[string]int64{}
price := map[string]float64{}
cpu := map[string]int64{}
mem := map[string]int64{}
for _, wkr := range wp.workers {
// Category priority: busy > held > booting > unknown > idle.
case len(wkr.running)+len(wkr.starting) > 0:
case wkr.idleBehavior == IdleBehaviorHold:
case wkr.state == StateBooting:
case wkr.state == StateUnknown:
price[cat] += wkr.instType.Price
cpu[cat] += int64(wkr.instType.VCPUs)
mem[cat] += int64(wkr.instType.RAM)
running += int64(len(wkr.running) + len(wkr.starting))
// Set every category explicitly so gauges drop back to zero when
// a category empties out.
for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
wp.mContainersRunning.Set(float64(running))
// runProbes periodically probes all workers, pacing the probes with
// limitticker so no more than maxPPS probes start per second.
func (wp *Pool) runProbes() {
maxPPS := wp.maxProbesPerSecond
maxPPS = defaultMaxProbesPerSecond
// Rate-limits individual probe launches.
limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
defer limitticker.Stop()
// Triggers a new probe sweep over all workers.
probeticker := time.NewTicker(wp.probeInterval)
defer probeticker.Stop()
workers := []cloud.InstanceID{}
for range probeticker.C {
// Reuse the slice's backing array across sweeps.
workers = workers[:0]
for id, wkr := range wp.workers {
// Skip workers that are shutting down (or were just told
// to shut down because they idled too long).
if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
workers = append(workers, id)
for _, id := range workers {
wkr, ok := wp.workers[id]
// Deleted while we were probing
go wkr.ProbeAndUpdate()
case <-limitticker.C:
// runSync keeps the worker list in sync with the cloud provider's
// instance list until the pool is stopped.
func (wp *Pool) runSync() {
// sync once immediately, then wait syncInterval, sync again,
// (1ns initial delay makes the first timer fire right away)
timer := time.NewTimer(1)
err := wp.getInstancesAndSync()
wp.logger.WithError(err).Warn("sync failed")
timer.Reset(wp.syncInterval)
wp.logger.Debug("worker.Pool stopped")
// Stop synchronizing with the InstanceSet (presumably by closing
// wp.stop; the statement is not visible in this view — confirm).
func (wp *Pool) Stop() {
wp.setupOnce.Do(wp.setup)
// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity. The result is
// sorted by instance ID.
func (wp *Pool) Instances() []InstanceView {
wp.setupOnce.Do(wp.setup)
for _, w := range wp.workers {
r = append(r, InstanceView{
Instance: w.instance.ID(),
Address: w.instance.Address(),
Price: w.instType.Price,
ArvadosInstanceType: w.instType.Name,
ProviderInstanceType: w.instType.ProviderType,
LastContainerUUID: w.lastUUID,
WorkerState: w.state.String(),
IdleBehavior: w.idleBehavior,
// Deterministic ordering for display/API stability.
sort.Slice(r, func(i, j int) bool {
return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
// setup initializes the pool's internal maps. It is run exactly once
// via wp.setupOnce before any other state is touched.
func (wp *Pool) setup() {
wp.creating = map[string]createCall{}
wp.exited = map[string]time.Time{}
wp.workers = map[cloud.InstanceID]*worker{}
wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
// notify wakes all subscribers. The non-blocking send relies on each
// subscriber channel's 1-slot buffer (see Subscribe): if a channel is
// already ready, the extra event is dropped.
func (wp *Pool) notify() {
defer wp.mtx.RUnlock()
for _, send := range wp.subscribers {
case send <- struct{}{}:
// getInstancesAndSync fetches the current instance list from the
// cloud provider and reconciles the worker pool against it.
func (wp *Pool) getInstancesAndSync() error {
wp.setupOnce.Do(wp.setup)
// Skip entirely while the provider is rate-limiting list calls.
if err := wp.instanceSet.throttleInstances.Error(); err != nil {
wp.logger.Debug("getting instance list")
// Updates made after this timestamp must not be clobbered by the
// (older) instance list we are about to fetch.
threshold := time.Now()
instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
wp.sync(threshold, instances)
wp.logger.Debug("sync done")
// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
defer wp.mtx.Unlock()
wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
for _, inst := range instances {
// Instances whose type tag doesn't match a configured
// InstanceType are logged and skipped.
itTag := inst.Tags()[tagKeyInstanceType]
it, ok := wp.instanceTypes[itTag]
wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
if wkr, isNew := wp.updateWorker(inst, it); isNew {
} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
// Remove workers whose instances no longer appear in the cloud
// listing — unless they were updated after threshold (the listing
// may simply predate them).
for id, wkr := range wp.workers {
if wkr.updated.After(threshold) {
logger := wp.logger.WithFields(logrus.Fields{
"Instance": wkr.instance.ID(),
"WorkerState": wkr.state,
logger.Info("instance disappeared in cloud")
delete(wp.workers, id)
// Close the executor asynchronously; it may block on network.
go wkr.executor.Close()
wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even (buf holds n/2 bytes; each byte formats as 2 hex
// digits). NOTE(review): the handling of a rand.Read error is not
// visible in this view — confirm upstream (presumably a panic).
func randomHex(n int) string {
buf := make([]byte, n/2)
_, err := rand.Read(buf)
return fmt.Sprintf("%x", buf)