1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
15 "git.curoverse.com/arvados.git/lib/cloud"
16 "git.curoverse.com/arvados.git/sdk/go/arvados"
17 "github.com/prometheus/client_golang/prometheus"
18 "github.com/sirupsen/logrus"
19 "golang.org/x/crypto/ssh"
// Cloud instance tag keys used by the pool. Create writes both tags on
// new instances; sync reads the InstanceType tag to look up the
// instance type, and updateWorker reads the IdleBehavior tag when a
// worker first appears.
23 tagKeyInstanceType = "InstanceType"
24 tagKeyIdleBehavior = "IdleBehavior"
27 // An InstanceView shows a worker's current state and recent activity.
// Pool.Instances builds one InstanceView per worker. Each field is
// serialized under the snake_case JSON key given in its struct tag.
28 type InstanceView struct {
29 Instance cloud.InstanceID `json:"instance"`
30 Address string `json:"address"`
31 Price float64 `json:"price"`
32 ArvadosInstanceType string `json:"arvados_instance_type"`
33 ProviderInstanceType string `json:"provider_instance_type"`
34 LastContainerUUID string `json:"last_container_uuid"`
35 LastBusy time.Time `json:"last_busy"`
36 WorkerState string `json:"worker_state"`
37 IdleBehavior IdleBehavior `json:"idle_behavior"`
40 // An Executor executes shell commands on a remote host.
// The pool obtains one Executor per worker from its newExecutor
// function, calls SetTarget each time an already-known instance is
// seen again, and calls Execute to run the crunch-run kill command.
41 type Executor interface {
42 // Run cmd on the current target.
// Execute returns the command's captured stdout and stderr along
// with any error.
43 Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
45 // Use the given target for subsequent operations. The new
46 // target is the same host as the previous target, but it
47 // might return a different address and verify a different
50 // SetTarget is called frequently, and in most cases the new
51 // target will behave exactly the same as the old one. An
52 // implementation should optimize accordingly.
54 // SetTarget must not block on concurrent Execute calls.
55 SetTarget(cloud.ExecutorTarget)
// Default intervals and timeouts. NewPool passes each of the duration
// defaults to duration() alongside the corresponding cluster
// configuration value; runProbes falls back to
// defaultMaxProbesPerSecond.
61 defaultSyncInterval = time.Minute
62 defaultProbeInterval = time.Second * 10
63 defaultMaxProbesPerSecond = 10
64 defaultTimeoutIdle = time.Minute
65 defaultTimeoutBooting = time.Minute * 10
66 defaultTimeoutProbe = time.Minute * 10
67 defaultTimeoutShutdown = time.Second * 10
69 // Time after a quota error to try again anyway, even if no
70 // instances have been shutdown.
71 quotaErrorTTL = time.Minute
73 // Time between "X failed because rate limiting" messages
74 logRateLimitErrorInterval = time.Second * 10
// duration converts a configured arvados.Duration to a time.Duration.
// NOTE(review): def is presumably returned instead when conf is unset
// or non-positive; that branch is not visible here -- confirm.
77 func duration(conf arvados.Duration, def time.Duration) time.Duration {
79 return time.Duration(conf)
85 // NewPool creates a Pool of workers backed by instanceSet.
87 // New instances are configured and set up according to the given
88 // cluster configuration.
//
// instanceSet is wrapped in a throttledInstanceSet. Interval and
// timeout settings come from cluster.CloudVMs and cluster.Dispatch and
// are passed through duration() with the package defaults. Metrics
// are registered with reg, and the pool's maps are initialized via
// setupOnce before the pool is returned.
89 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
93 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
94 newExecutor: newExecutor,
95 bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
96 imageID: cloud.ImageID(cluster.CloudVMs.ImageID),
97 instanceTypes: cluster.InstanceTypes,
// maxProbesPerSecond is copied as-is; runProbes substitutes its own
// default when starting the rate-limit ticker.
98 maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
99 probeInterval: duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
100 syncInterval: duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
101 timeoutIdle: duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
102 timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
103 timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
104 timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
105 installPublicKey: installPublicKey,
106 stop: make(chan bool),
108 wp.registerMetrics(reg)
110 wp.setupOnce.Do(wp.setup)
118 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
119 // zero Pool should not be used. Call NewPool to create a new Pool.
// Configuration fields, assigned by NewPool.
122 logger logrus.FieldLogger
123 arvClient *arvados.Client
124 instanceSet *throttledInstanceSet
125 newExecutor func(cloud.Instance) Executor
126 bootProbeCommand string
127 imageID cloud.ImageID
128 instanceTypes map[string]arvados.InstanceType
129 syncInterval time.Duration
130 probeInterval time.Duration
131 maxProbesPerSecond int
132 timeoutIdle time.Duration
133 timeoutBooting time.Duration
134 timeoutProbe time.Duration
135 timeoutShutdown time.Duration
136 installPublicKey ssh.PublicKey
// Mutable state. The methods in this file access these fields while
// holding wp.mtx; the maps are allocated by setup().
139 subscribers map[<-chan struct{}]chan<- struct{}
140 creating map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
141 workers map[cloud.InstanceID]*worker
142 loaded bool // loaded list of instances from InstanceSet at least once
143 exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
144 atQuotaUntil time.Time
145 atQuotaErr cloud.QuotaError
// NOTE(review): within this file, rate-limit errors are recorded on
// wp.instanceSet.throttleCreate / throttleInstances, not on these two
// fields; they look redundant -- confirm no other file sets them.
150 throttleCreate throttle
151 throttleInstances throttle
// Prometheus gauges, created in registerMetrics and refreshed by
// updateMetrics.
153 mInstances prometheus.Gauge
154 mInstancesPrice prometheus.Gauge
155 mContainersRunning prometheus.Gauge
156 mVCPUs prometheus.Gauge
157 mVCPUsInuse prometheus.Gauge
158 mMemory prometheus.Gauge
159 mMemoryInuse prometheus.Gauge
162 // Subscribe returns a buffered channel that becomes ready after any
163 // change to the pool's state that could have scheduling implications:
164 // a worker's state changes, a new worker appears, the cloud
165 // provider's API rate limiting period ends, etc.
167 // Additional events that occur while the channel is already ready
168 // will be dropped, so it is OK if the caller services the channel
173 // ch := wp.Subscribe()
174 // defer wp.Unsubscribe(ch)
181 func (wp *Pool) Subscribe() <-chan struct{} {
182 wp.setupOnce.Do(wp.setup)
184 defer wp.mtx.Unlock()
// Capacity 1: at most one pending notification is held per subscriber.
185 ch := make(chan struct{}, 1)
// The same channel is stored as both key (receive-only view, used by
// Unsubscribe to find it) and value (send-only view, used by notify).
186 wp.subscribers[ch] = ch
190 // Unsubscribe stops sending updates to the given channel.
// ch must be a channel previously returned by Subscribe; deleting a
// key that is not in the subscribers map is a no-op.
191 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
192 wp.setupOnce.Do(wp.setup)
194 defer wp.mtx.Unlock()
195 delete(wp.subscribers, ch)
198 // Unallocated returns the number of unallocated (creating + booting +
199 // idle + unknown) workers for each instance type. Workers in
200 // hold/drain mode are not included.
201 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
202 wp.setupOnce.Do(wp.setup)
204 defer wp.mtx.RUnlock()
205 unalloc := map[arvados.InstanceType]int{}
// Snapshot the number of pending Create calls per instance type so the
// counts can be adjusted below without touching wp.creating.
206 creating := map[arvados.InstanceType]int{}
207 for it, times := range wp.creating {
208 creating[it] = len(times)
210 for _, wkr := range wp.workers {
211 // Skip workers that are not expected to become
212 // available soon. Note len(wkr.running)>0 is not
213 // redundant here: it can be true even in
215 if wkr.state == StateShutdown ||
216 wkr.state == StateRunning ||
217 wkr.idleBehavior != IdleBehaviorRun ||
218 len(wkr.running) > 0 {
// A worker in unknown state that appeared after the oldest pending
// Create call of its type started is matched against that call.
223 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
224 // If up to N new workers appear in
225 // Instances() while we are waiting for N
226 // Create() calls to complete, we assume we're
227 // just seeing a race between Instances() and
228 // Create() responses.
230 // The other common reason why nodes have
231 // state==Unknown is that they appeared at
232 // startup, before any Create calls. They
233 // don't match the above timing condition, so
234 // we never mistakenly attribute them to
235 // pending Create calls.
// NOTE(review): the remaining pending-create counts are presumably
// added to unalloc in this loop (body not visible) -- confirm.
239 for it, c := range creating {
245 // Create a new instance with the given type, and add it to the worker
246 // pool. The worker is added immediately; instance creation runs in
249 // Create returns false if a pre-existing error state prevents it from
250 // even attempting to create a new instance. Those errors are logged
251 // by the Pool, so the caller does not need to log anything in such
253 func (wp *Pool) Create(it arvados.InstanceType) bool {
254 logger := wp.logger.WithField("InstanceType", it.Name)
255 wp.setupOnce.Do(wp.setup)
257 defer wp.mtx.Unlock()
258 if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
261 tags := cloud.InstanceTags{
262 tagKeyInstanceType: it.Name,
263 tagKeyIdleBehavior: string(IdleBehaviorRun),
266 wp.creating[it] = append(wp.creating[it], now)
269 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, wp.installPublicKey)
271 defer wp.mtx.Unlock()
272 // Remove our timestamp marker from wp.creating
273 for i, t := range wp.creating[it] {
275 copy(wp.creating[it][i:], wp.creating[it][i+1:])
276 wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
281 if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
283 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
284 time.AfterFunc(quotaErrorTTL, wp.notify)
286 logger.WithError(err).Error("create failed")
287 wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
290 wp.updateWorker(inst, it, StateBooting)
295 // AtQuota returns true if Create is not expected to work at the
// It reports whether the current time is still before atQuotaUntil,
// which Create sets to quotaErrorTTL after a quota error.
297 func (wp *Pool) AtQuota() bool {
299 defer wp.mtx.Unlock()
300 return time.Now().Before(wp.atQuotaUntil)
303 // SetIdleBehavior determines how the indicated instance will behave
304 // when it has no containers running.
// An error is returned when id is not a key in the pool's worker map.
305 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
307 defer wp.mtx.Unlock()
308 wkr, ok := wp.workers[id]
310 return errors.New("requested instance does not exist")
// Only the in-memory worker record is updated on this line.
// NOTE(review): any persisting of the new value to the instance's
// tags would be in lines not visible here -- confirm.
312 wkr.idleBehavior = idleBehavior
318 // Add or update worker attached to the given instance. Use
319 // initialState if a new worker is created.
321 // The second return value is true if a new worker is created.
323 // Caller must have lock.
324 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
// Existing worker: refresh the executor target and the updated
// timestamp (sync uses the latter to decide whether a worker has
// disappeared).
326 if wkr := wp.workers[id]; wkr != nil {
327 wkr.executor.SetTarget(inst)
329 wkr.updated = time.Now()
// A worker first seen via the instance list (state unknown) is
// promoted to booting once the Create call that produced it reports
// in with initialState == StateBooting.
330 if initialState == StateBooting && wkr.state == StateUnknown {
331 wkr.state = StateBooting
337 // If an instance has a valid IdleBehavior tag when it first
338 // appears, initialize the new worker accordingly (this is how
339 // we restore IdleBehavior that was set by a prior dispatch
340 // process); otherwise, default to "run". After this,
341 // wkr.idleBehavior is the source of truth, and will only be
342 // changed via SetIdleBehavior().
343 idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
344 if !validIdleBehavior[idleBehavior] {
345 idleBehavior = IdleBehaviorRun
348 logger := wp.logger.WithFields(logrus.Fields{
349 "InstanceType": it.Name,
350 "Instance": inst.ID(),
351 "Address": inst.Address(),
353 logger.WithFields(logrus.Fields{
354 "State": initialState,
355 "IdleBehavior": idleBehavior,
356 }).Infof("instance appeared in cloud")
// New worker record: gets its own executor, empty running/starting
// sets, and a probing channel of capacity 1.
362 executor: wp.newExecutor(inst),
364 idleBehavior: idleBehavior,
371 running: make(map[string]struct{}),
372 starting: make(map[string]struct{}),
373 probing: make(chan struct{}, 1),
// notifyExited takes a container UUID and a timestamp t.
// NOTE(review): the body is not visible here; it presumably records t
// in wp.exited for uuid -- confirm.
379 // caller must have lock.
380 func (wp *Pool) notifyExited(uuid string, t time.Time) {
384 // Shutdown shuts down a worker with the given type, or returns false
385 // if all workers with the given type are busy.
// Candidates are considered in state order: booting workers first,
// then idle ones. Workers whose idle behavior is "hold" are never
// chosen.
386 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
387 wp.setupOnce.Do(wp.setup)
389 defer wp.mtx.Unlock()
390 logger := wp.logger.WithField("InstanceType", it.Name)
391 logger.Info("shutdown requested")
392 for _, tryState := range []State{StateBooting, StateIdle} {
393 // TODO: shutdown the worker with the longest idle
394 // time (Idle) or the earliest create time (Booting)
395 for _, wkr := range wp.workers {
396 if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
// NOTE(review): this logs the instance value itself, whereas
// updateWorker and kill log inst.ID() -- confirm which is intended.
397 logger.WithField("Instance", wkr.instance).Info("shutting down")
406 // CountWorkers returns the current number of workers in each state.
// The count is taken in one pass over wp.workers while holding the
// pool lock.
407 func (wp *Pool) CountWorkers() map[State]int {
408 wp.setupOnce.Do(wp.setup)
410 defer wp.mtx.Unlock()
412 for _, w := range wp.workers {
418 // Running returns the container UUIDs being prepared/run on workers.
420 // In the returned map, the time value indicates when the Pool
421 // observed that the container process had exited. A container that
422 // has not yet exited has a zero time value. The caller should use
423 // KillContainer() to garbage-collect the entries for exited
425 func (wp *Pool) Running() map[string]time.Time {
426 wp.setupOnce.Do(wp.setup)
428 defer wp.mtx.Unlock()
429 r := map[string]time.Time{}
// Containers that are running or still starting on any worker are
// reported with the zero time.
430 for _, wkr := range wp.workers {
431 for uuid := range wkr.running {
432 r[uuid] = time.Time{}
434 for uuid := range wkr.starting {
435 r[uuid] = time.Time{}
// Entries from wp.exited are visited last.
// NOTE(review): the loop body is not visible; it presumably stores the
// exit time under uuid -- confirm.
438 for uuid, exited := range wp.exited {
444 // StartContainer starts a container on an idle worker immediately if
445 // possible, otherwise returns false.
446 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
447 wp.setupOnce.Do(wp.setup)
449 defer wp.mtx.Unlock()
// Among idle workers of the requested type, prefer the one whose busy
// timestamp is the most recent.
451 for _, w := range wp.workers {
452 if w.instType == it && w.state == StateIdle {
453 if wkr == nil || w.busy.After(wkr.busy) {
461 wkr.startContainer(ctr)
465 // KillContainer kills the crunch-run process for the given container
466 // UUID, if it's running on any worker.
468 // KillContainer returns immediately; the act of killing the container
469 // takes some time, and runs in the background.
470 func (wp *Pool) KillContainer(uuid string) {
472 defer wp.mtx.Unlock()
// If the container's process is already known to have exited, the
// placeholder in wp.exited is removed.
473 if _, ok := wp.exited[uuid]; ok {
474 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
475 delete(wp.exited, uuid)
// Otherwise look for a worker that lists the container as running and
// kill it from a separate goroutine; kill() takes the pool lock itself
// when updating the worker afterwards.
478 for _, wkr := range wp.workers {
479 if _, ok := wkr.running[uuid]; ok {
480 go wp.kill(wkr, uuid)
484 wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
// kill runs "crunch-run --kill 15 <uuid>" on the worker's instance via
// its executor, logs the outcome, and then removes uuid from the
// worker's running set. KillContainer invokes it in its own goroutine.
487 func (wp *Pool) kill(wkr *worker, uuid string) {
488 logger := wp.logger.WithFields(logrus.Fields{
489 "ContainerUUID": uuid,
490 "Instance": wkr.instance.ID(),
492 logger.Debug("killing process")
493 cmd := "crunch-run --kill 15 " + uuid
// NOTE(review): the branch body is not visible; the command is
// presumably adjusted (e.g. prefixed with sudo) when the remote user
// is not root -- confirm.
494 if u := wkr.instance.RemoteUser(); u != "root" {
497 stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
499 logger.WithFields(logrus.Fields{
500 "stderr": string(stderr),
501 "stdout": string(stdout),
503 }).Warn("kill failed")
506 logger.Debug("killing process succeeded")
508 defer wp.mtx.Unlock()
// Drop the container from the worker's running set; if nothing else
// is running or starting there, the worker goes back to idle.
509 if _, ok := wkr.running[uuid]; ok {
510 delete(wkr.running, uuid)
511 if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
512 wkr.state = StateIdle
514 wkr.updated = time.Now()
// registerMetrics creates the pool's Prometheus gauges (all under
// namespace "arvados", subsystem "dispatchcloud") and registers each
// one with reg via MustRegister.
519 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
// NOTE(review): the guard for this assignment is not visible; a fresh
// registry is presumably substituted only when reg is nil -- confirm.
521 reg = prometheus.NewRegistry()
523 wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
524 Namespace: "arvados",
525 Subsystem: "dispatchcloud",
526 Name: "instances_total",
527 Help: "Number of cloud VMs including pending, booting, running, held, and shutting down.",
529 reg.MustRegister(wp.mInstances)
530 wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
531 Namespace: "arvados",
532 Subsystem: "dispatchcloud",
533 Name: "instances_price_total",
534 Help: "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
536 reg.MustRegister(wp.mInstancesPrice)
537 wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
538 Namespace: "arvados",
539 Subsystem: "dispatchcloud",
540 Name: "containers_running",
541 Help: "Number of containers reported running by cloud VMs.",
543 reg.MustRegister(wp.mContainersRunning)
// The Name fields of the two VCPU gauges are not visible in this view.
545 wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
546 Namespace: "arvados",
547 Subsystem: "dispatchcloud",
549 Help: "Total VCPUs on all cloud VMs.",
551 reg.MustRegister(wp.mVCPUs)
552 wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
553 Namespace: "arvados",
554 Subsystem: "dispatchcloud",
556 Help: "VCPUs on cloud VMs that are running containers.",
558 reg.MustRegister(wp.mVCPUsInuse)
559 wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
560 Namespace: "arvados",
561 Subsystem: "dispatchcloud",
562 Name: "memory_bytes_total",
563 Help: "Total memory on all cloud VMs.",
565 reg.MustRegister(wp.mMemory)
566 wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
567 Namespace: "arvados",
568 Subsystem: "dispatchcloud",
569 Name: "memory_bytes_inuse",
570 Help: "Memory on cloud VMs that are running containers.",
572 reg.MustRegister(wp.mMemoryInuse)
// runMetrics unsubscribes ch from the pool when it returns.
// NOTE(review): ch presumably comes from wp.Subscribe(), with each
// received event triggering updateMetrics; those lines are not visible
// here -- confirm.
575 func (wp *Pool) runMetrics() {
577 defer wp.Unsubscribe(ch)
// updateMetrics recomputes every gauge from the current worker map
// while holding the pool's read lock.
583 func (wp *Pool) updateMetrics() {
585 defer wp.mtx.RUnlock()
588 var alloc, cpu, cpuInuse, mem, memInuse int64
589 for _, wkr := range wp.workers {
// Totals count every worker regardless of state.
590 price += wkr.instType.Price
591 cpu += int64(wkr.instType.VCPUs)
592 mem += int64(wkr.instType.RAM)
// NOTE(review): workers with no running or starting containers are
// presumably skipped here so they do not contribute to the in-use
// figures below -- branch body not visible, confirm.
593 if len(wkr.running)+len(wkr.starting) == 0 {
// In-use figures charge the worker's whole instance type (all VCPUs
// and RAM), not a per-container share.
596 alloc += int64(len(wkr.running) + len(wkr.starting))
597 cpuInuse += int64(wkr.instType.VCPUs)
598 memInuse += int64(wkr.instType.RAM)
600 wp.mInstances.Set(float64(len(wp.workers)))
601 wp.mInstancesPrice.Set(price)
602 wp.mContainersRunning.Set(float64(alloc))
603 wp.mVCPUs.Set(float64(cpu))
604 wp.mMemory.Set(float64(mem))
605 wp.mVCPUsInuse.Set(float64(cpuInuse))
606 wp.mMemoryInuse.Set(float64(memInuse))
// runProbes wakes up once per probeInterval, collects the IDs of the
// workers to probe, and starts ProbeAndUpdate for each in its own
// goroutine, paced by a ticker that fires maxPPS times per second.
609 func (wp *Pool) runProbes() {
610 maxPPS := wp.maxProbesPerSecond
// NOTE(review): the guard for this fallback is not visible; it must
// exclude zero, since maxPPS is used as a divisor below -- confirm.
612 maxPPS = defaultMaxProbesPerSecond
614 limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
615 defer limitticker.Stop()
617 probeticker := time.NewTicker(wp.probeInterval)
618 defer probeticker.Stop()
// The ID slice is reused across rounds to avoid reallocating it.
620 workers := []cloud.InstanceID{}
621 for range probeticker.C {
622 workers = workers[:0]
624 for id, wkr := range wp.workers {
// shutdownIfIdle() is called for every worker that is not already
// shutting down, so this pass is also where it is evaluated.
625 if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
628 workers = append(workers, id)
// Workers are looked up again by ID because the map may have changed
// since the IDs were collected.
632 for _, id := range workers {
634 wkr, ok := wp.workers[id]
637 // Deleted while we were probing
641 go wkr.ProbeAndUpdate()
645 case <-limitticker.C:
// runSync repeatedly calls getInstancesAndSync, logging a warning
// when a sync fails, and re-arms its timer with syncInterval after
// each attempt.
651 func (wp *Pool) runSync() {
652 // sync once immediately, then wait syncInterval, sync again,
// A 1ns timer makes the first sync fire essentially immediately.
654 timer := time.NewTimer(1)
658 err := wp.getInstancesAndSync()
660 wp.logger.WithError(err).Warn("sync failed")
662 timer.Reset(wp.syncInterval)
// NOTE(review): presumably reached when wp.stop is signalled; the
// select on that channel is not visible here -- confirm.
664 wp.logger.Debug("worker.Pool stopped")
670 // Stop synchronizing with the InstanceSet.
// NOTE(review): the line that signals or closes wp.stop is not visible
// here -- confirm how the background loops are told to exit.
671 func (wp *Pool) Stop() {
672 wp.setupOnce.Do(wp.setup)
676 // Instances returns an InstanceView for each worker in the pool,
677 // summarizing its current state and recent activity.
// The result is sorted by instance ID (as a string) so the output
// order does not depend on map iteration order.
678 func (wp *Pool) Instances() []InstanceView {
680 wp.setupOnce.Do(wp.setup)
682 for _, w := range wp.workers {
683 r = append(r, InstanceView{
684 Instance: w.instance.ID(),
685 Address: w.instance.Address(),
686 Price: w.instType.Price,
687 ArvadosInstanceType: w.instType.Name,
688 ProviderInstanceType: w.instType.ProviderType,
689 LastContainerUUID: w.lastUUID,
691 WorkerState: w.state.String(),
692 IdleBehavior: w.idleBehavior,
696 sort.Slice(r, func(i, j int) bool {
697 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
// setup allocates the pool's maps. It is invoked through
// wp.setupOnce.Do, so the maps are created once no matter which
// method is called first.
702 func (wp *Pool) setup() {
703 wp.creating = map[arvados.InstanceType][]time.Time{}
704 wp.exited = map[string]time.Time{}
705 wp.workers = map[cloud.InstanceID]*worker{}
706 wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
// notify sends an empty struct to each subscriber channel while
// holding the pool's read lock.
// NOTE(review): the send is a select case; a default branch (not
// visible here) presumably makes it non-blocking, so a subscriber
// whose one-slot buffer is full is skipped -- confirm.
709 func (wp *Pool) notify() {
711 defer wp.mtx.RUnlock()
712 for _, send := range wp.subscribers {
714 case send <- struct{}{}:
// getInstancesAndSync fetches the instance list from the instance set
// and passes it to sync. The instance-list throttle is consulted
// first, and any error from the listing call is handed to that same
// throttle's CheckRateLimitError.
720 func (wp *Pool) getInstancesAndSync() error {
721 wp.setupOnce.Do(wp.setup)
722 if err := wp.instanceSet.throttleInstances.Error(); err != nil {
725 wp.logger.Debug("getting instance list")
// Taken before the API call: sync() leaves alone any worker updated
// after this time.
726 threshold := time.Now()
727 instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
729 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
732 wp.sync(threshold, instances)
733 wp.logger.Debug("sync done")
737 // Add/remove/update workers based on instances, which was obtained
738 // from the instanceSet. However, don't clobber any other updates that
739 // already happened after threshold.
740 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
742 defer wp.mtx.Unlock()
743 wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
// Pass 1: add or refresh a worker for every listed instance whose
// InstanceType tag names a configured instance type.
746 for _, inst := range instances {
747 itTag := inst.Tags()[tagKeyInstanceType]
748 it, ok := wp.instanceTypes[itTag]
// NOTE(review): this and the "retrying" message below log the
// instance value itself rather than inst.ID() as updateWorker does --
// confirm which is intended.
750 wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
753 if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
// A worker that was asked to shut down more than timeoutShutdown ago
// but is still listed gets another shutdown attempt.
755 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
756 wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
// Pass 2: workers not refreshed since threshold were absent from the
// list; drop them and close their executors in the background.
761 for id, wkr := range wp.workers {
762 if wkr.updated.After(threshold) {
765 logger := wp.logger.WithFields(logrus.Fields{
766 "Instance": wkr.instance.ID(),
767 "WorkerState": wkr.state,
769 logger.Info("instance disappeared in cloud")
770 delete(wp.workers, id)
771 go wkr.executor.Close()
777 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")