// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"errors"
	"io"
	"sort"
	"strings"
	"sync"
	"time"

	"git.curoverse.com/arvados.git/lib/cloud"
	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
)

const (
	tagKeyInstanceType = "InstanceType"
	tagKeyIdleBehavior = "IdleBehavior"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)
}
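
// nopExecutor is an illustrative sketch of a minimal Executor
// implementation (e.g. a test double). It is not part of the real
// dispatcher, which uses an SSH-based executor; it assumes every
// command succeeds with empty output.
type nopExecutor struct {
	target cloud.ExecutorTarget
}

// Execute pretends to run cmd on the current target and reports
// success with no output.
func (e *nopExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
	return nil, nil, nil
}

// SetTarget records the most recent target; a real implementation
// would redial here if the target's address changed.
func (e *nopExecutor) SetTarget(t cloud.ExecutorTarget) {
	e.target = t
}
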
const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = time.Minute * 10
	defaultTimeoutProbe       = time.Minute * 10
	defaultTimeoutShutdown    = time.Second * 10

	// Time to wait after a quota error before trying again
	// anyway, even if no instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because of rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
)

// duration returns conf (a configured arvados.Duration) if it is
// nonzero, otherwise the given default.
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
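//
// A minimal usage sketch (illustrative; assumes the caller has already
// built a cluster config, a cloud.InstanceSet, and an Executor
// constructor such as an SSH-based one):
//
//	pool := NewPool(logger, arvClient, prometheus.NewRegistry(), instanceSet, newExecutor, cluster)
//	ch := pool.Subscribe()
//	defer pool.Unsubscribe(ch)
//	for range ch {
//		// react to changes in pool state
//	}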
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:             logger,
		arvClient:          arvClient,
		instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:        newExecutor,
		bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
		imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
		instanceTypes:      cluster.InstanceTypes,
		maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
		probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
		syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		stop:               make(chan bool),
	}
	wp.registerMetrics(reg)
	wp.setupOnce.Do(wp.setup)

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	logger             logrus.FieldLogger
	arvClient          *arvados.Client
	instanceSet        *throttledInstanceSet
	newExecutor        func(cloud.Instance) Executor
	bootProbeCommand   string
	imageID            cloud.ImageID
	instanceTypes      map[string]arvados.InstanceType
	syncInterval       time.Duration
	probeInterval      time.Duration
	maxProbesPerSecond int
	timeoutIdle        time.Duration
	timeoutBooting     time.Duration
	timeoutProbe       time.Duration
	timeoutShutdown    time.Duration

	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once

	throttleCreate    throttle
	throttleInstances throttle

	mInstances         prometheus.Gauge
	mInstancesPrice    prometheus.Gauge
	mContainersRunning prometheus.Gauge
	mVCPUs             prometheus.Gauge
	mVCPUsInuse        prometheus.Gauge
	mMemory            prometheus.Gauge
	mMemoryInuse       prometheus.Gauge
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
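//
// For example (an illustrative sketch; "pending" is a hypothetical
// count of queued containers needing instance type it):
//
//	if wp.Unallocated()[it] < pending {
//		wp.Create(it)
//	}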
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	for it, times := range wp.creating {
		creating[it] = len(times)
	}
	for _, wkr := range wp.workers {
		if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) || wkr.idleBehavior != IdleBehaviorRun {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
		return false
	}
	tags := cloud.InstanceTags{
		tagKeyInstanceType: it.Name,
		tagKeyIdleBehavior: string(IdleBehaviorRun),
	}
	now := time.Now()
	wp.creating[it] = append(wp.creating[it], now)
	go func() {
		defer wp.notify()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// Remove our timestamp marker from wp.creating
		for i, t := range wp.creating[it] {
			if t == now {
				copy(wp.creating[it][i:], wp.creating[it][i+1:])
				wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
				break
			}
		}
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it, StateBooting)
	}()
	return true
}

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
}
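
// Illustrative caller pattern (a sketch, not code from this package;
// the real scheduler lives elsewhere in the dispatcher):
//
//	if !wp.AtQuota() && wp.Create(it) {
//		// Instance creation has started; wait for a Subscribe()
//		// notification before checking Unallocated() again.
//	}
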
// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.idleBehavior = idleBehavior

// Add or update worker attached to the given instance. Use
// initialState if a new worker is created.
//
// The second return value is true if a new worker is created.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.updated = time.Now()
		if initialState == StateBooting && wkr.state == StateUnknown {
			wkr.state = StateBooting
		}
		return wkr, false
	}
	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst,
	})
	logger.WithFields(logrus.Fields{
		"State":        initialState,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")

	now := time.Now()
	wkr := &worker{
		executor:     wp.newExecutor(inst),
		state:        initialState,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		busy:         now,
		updated:      now,
		running:      make(map[string]struct{}),
		starting:     make(map[string]struct{}),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}

// caller must have lock.
func (wp *Pool) notifyExited(uuid string, t time.Time) {
	wp.exited[uuid] = t
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shut down the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance).Info("shutting down")
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}

// CountWorkers returns the current number of workers in each state.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}
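
// Illustrative caller pattern (a sketch, not code from this package):
//
//	if !wp.StartContainer(it, ctr) {
//		// No idle worker of this type right now; the caller can
//		// call wp.Create(it) and retry after the next Subscribe()
//		// notification.
//	}
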
// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
func (wp *Pool) KillContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
		return
	}
	for _, wkr := range wp.workers {
		if _, ok := wkr.running[uuid]; ok {
			go wp.kill(wkr, uuid)
			return
		}
	}
	wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
}

func (wp *Pool) kill(wkr *worker, uuid string) {
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Instance":      wkr.instance,
	})
	logger.Debug("killing process")
	stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
	if err != nil {
		logger.WithFields(logrus.Fields{
			"stderr": string(stderr),
			"stdout": string(stdout),
			"error":  err,
		}).Warn("kill failed")
		return
	}
	logger.Debug("killing process succeeded")
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wkr.running[uuid]; ok {
		delete(wkr.running, uuid)
		if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
			wkr.state = StateIdle
		}
		wkr.updated = time.Now()
		go wp.notify()
	}
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
	})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price_total",
		Help:      "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
	})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)

	wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	})
	reg.MustRegister(wp.mVCPUs)
	wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_inuse",
		Help:      "VCPUs on cloud VMs that are running containers.",
	})
	reg.MustRegister(wp.mVCPUsInuse)
	wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	})
	reg.MustRegister(wp.mMemory)
	wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_inuse",
		Help:      "Memory on cloud VMs that are running containers.",
	})
	reg.MustRegister(wp.mMemoryInuse)
}
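
// The prometheus.Registry passed to NewPool is typically exposed over
// HTTP by the calling service; a minimal sketch (assuming the standard
// promhttp package from the Prometheus Go client library):
//
//	mux := http.NewServeMux()
//	mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
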
func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	var price float64
	var alloc, cpu, cpuInuse, mem, memInuse int64
	for _, wkr := range wp.workers {
		price += wkr.instType.Price
		cpu += int64(wkr.instType.VCPUs)
		mem += int64(wkr.instType.RAM)
		if len(wkr.running)+len(wkr.starting) == 0 {
			continue
		}
		alloc += int64(len(wkr.running) + len(wkr.starting))
		cpuInuse += int64(wkr.instType.VCPUs)
		memInuse += int64(wkr.instType.RAM)
	}
	wp.mInstances.Set(float64(len(wp.workers)))
	wp.mInstancesPrice.Set(price)
	wp.mContainersRunning.Set(float64(alloc))
	wp.mVCPUs.Set(float64(cpu))
	wp.mMemory.Set(float64(mem))
	wp.mVCPUsInuse.Set(float64(cpuInuse))
	wp.mMemoryInuse.Set(float64(memInuse))
}

func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}

func (wp *Pool) setup() {
	wp.creating = map[arvados.InstanceType][]time.Time{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}

func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance,
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		delete(wp.workers, id)
		go wkr.executor.Close()
		notify = true
	}

	if !wp.loaded {
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}