// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0
15 "git.curoverse.com/arvados.git/lib/cloud"
16 "git.curoverse.com/arvados.git/sdk/go/arvados"
17 "github.com/prometheus/client_golang/prometheus"
18 "github.com/sirupsen/logrus"
const (
	tagKeyInstanceType = "InstanceType"
	tagKeyIdleBehavior = "IdleBehavior"
)
// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}
// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)
}
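// Illustrative sketch (not part of the original source): the Pool uses an
// Executor roughly as follows, where newExecutor is the constructor passed
// to NewPool and cmd stands in for a probe or crunch-run command line:
//
//	exec := newExecutor(inst)
//	exec.SetTarget(inst) // refreshed after every cloud sync
//	stdout, stderr, err := exec.Execute(nil, cmd, nil)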
const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = time.Minute * 10
	defaultTimeoutProbe       = time.Minute * 10
	defaultTimeoutShutdown    = time.Second * 10

	// Time after a quota error to try again anyway, even if no
	// instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
)
// duration returns conf as a time.Duration if it is set, otherwise def.
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}
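// For example, duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval)
// (as used in NewPool below) yields the configured sync interval, or one
// minute when the config value is unset.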
// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:             logger,
		arvClient:          arvClient,
		instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:        newExecutor,
		bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
		imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
		instanceTypes:      cluster.InstanceTypes,
		maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
		probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
		syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		stop:               make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	logger             logrus.FieldLogger
	arvClient          *arvados.Client
	instanceSet        *throttledInstanceSet
	newExecutor        func(cloud.Instance) Executor
	bootProbeCommand   string
	imageID            cloud.ImageID
	instanceTypes      map[string]arvados.InstanceType
	syncInterval       time.Duration
	probeInterval      time.Duration
	maxProbesPerSecond int
	timeoutIdle        time.Duration
	timeoutBooting     time.Duration
	timeoutProbe       time.Duration
	timeoutShutdown    time.Duration

	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once

	throttleCreate    throttle
	throttleInstances throttle

	mInstances         prometheus.Gauge
	mInstancesPrice    prometheus.Gauge
	mContainersRunning prometheus.Gauge
	mVCPUs             prometheus.Gauge
	mVCPUsInuse        prometheus.Gauge
	mMemory            prometheus.Gauge
	mMemoryInuse       prometheus.Gauge
}
// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		// ...try scheduling some work...
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}
// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}
// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	for it, times := range wp.creating {
		creating[it] = len(times)
	}
	for _, wkr := range wp.workers {
		if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) || wkr.idleBehavior != IdleBehaviorRun {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}
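// Illustrative example: with two booting workers of one instance type and
// one unfinished Create call for another, Unallocated reports 2 for the
// first type and 1 for the second; a worker in hold/drain mode is not
// counted at all.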
// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
		return false
	}
	tags := cloud.InstanceTags{
		tagKeyInstanceType: it.Name,
		tagKeyIdleBehavior: string(IdleBehaviorRun),
	}
	now := time.Now()
	wp.creating[it] = append(wp.creating[it], now)
	go func() {
		defer wp.notify()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// Remove our timestamp marker from wp.creating
		for i, t := range wp.creating[it] {
			if t == now {
				copy(wp.creating[it][i:], wp.creating[it][i+1:])
				wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
				break
			}
		}
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it, StateBooting)
	}()
	return true
}
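// Illustrative sketch (hypothetical caller, not part of this file): a
// scheduler might pair Unallocated with Create like this, relying on the
// Pool to log quota and rate-limit errors itself:
//
//	if wp.Unallocated()[it] < needed && !wp.AtQuota() {
//		wp.Create(it)
//	}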
// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
}
// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.idleBehavior = idleBehavior
	return nil
}
// Add or update worker attached to the given instance. Use
// initialState if a new worker is created.
//
// The second return value is true if a new worker is created.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.updated = time.Now()
		if initialState == StateBooting && wkr.state == StateUnknown {
			wkr.state = StateBooting
		}
		return wkr, false
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst,
	})
	logger.WithFields(logrus.Fields{
		"State":        initialState,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		executor:     wp.newExecutor(inst),
		state:        initialState,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		busy:         now,
		updated:      now,
		running:      make(map[string]struct{}),
		starting:     make(map[string]struct{}),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}
// caller must have lock.
func (wp *Pool) notifyExited(uuid string, t time.Time) {
	wp.exited[uuid] = t
}
// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance).Info("shutting down")
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}
// CountWorkers returns the current number of workers in each state.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}
// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// KillContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}
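// Illustrative sketch: a caller can garbage-collect exited containers by
// combining Running with KillContainer, using the zero/non-zero time
// convention described above:
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() {
//			wp.KillContainer(uuid) // clears the exited placeholder
//		}
//	}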
// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}
// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
func (wp *Pool) KillContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
		return
	}
	for _, wkr := range wp.workers {
		if _, ok := wkr.running[uuid]; ok {
			go wp.kill(wkr, uuid)
			return
		}
	}
	wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
}
func (wp *Pool) kill(wkr *worker, uuid string) {
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Instance":      wkr.instance,
	})
	logger.Debug("killing process")
	stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
	if err != nil {
		logger.WithFields(logrus.Fields{
			"stderr": string(stderr),
			"stdout": string(stdout),
			"error":  err,
		}).Warn("kill failed")
		return
	}
	logger.Debug("killing process succeeded")
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wkr.running[uuid]; ok {
		delete(wkr.running, uuid)
		if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
			wkr.state = StateIdle
		}
		wkr.updated = time.Now()
		go wp.notify()
	}
}
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
	})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price_total",
		Help:      "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
	})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)

	wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	})
	reg.MustRegister(wp.mVCPUs)
	wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_inuse",
		Help:      "VCPUs on cloud VMs that are running containers.",
	})
	reg.MustRegister(wp.mVCPUsInuse)
	wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	})
	reg.MustRegister(wp.mMemory)
	wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_inuse",
		Help:      "Memory on cloud VMs that are running containers.",
	})
	reg.MustRegister(wp.mMemoryInuse)
}
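// Note: with the "arvados" namespace and "dispatchcloud" subsystem above,
// these gauges are exported under names such as
// arvados_dispatchcloud_instances_total and
// arvados_dispatchcloud_containers_running.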
func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	var price float64
	var alloc, cpu, cpuInuse, mem, memInuse int64
	for _, wkr := range wp.workers {
		price += wkr.instType.Price
		cpu += int64(wkr.instType.VCPUs)
		mem += int64(wkr.instType.RAM)
		if len(wkr.running)+len(wkr.starting) == 0 {
			continue
		}
		alloc += int64(len(wkr.running) + len(wkr.starting))
		cpuInuse += int64(wkr.instType.VCPUs)
		memInuse += int64(wkr.instType.RAM)
	}
	wp.mInstances.Set(float64(len(wp.workers)))
	wp.mInstancesPrice.Set(price)
	wp.mContainersRunning.Set(float64(alloc))
	wp.mVCPUs.Set(float64(cpu))
	wp.mMemory.Set(float64(mem))
	wp.mVCPUsInuse.Set(float64(cpuInuse))
	wp.mMemoryInuse.Set(float64(memInuse))
}
func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing
				// others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}
func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}
// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}
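// Illustrative only: marshalled to JSON (per the InstanceView struct tags),
// each entry looks roughly like
//
//	{"instance": "inst-123", "price": 0.12, "arvados_instance_type": "small",
//	 "provider_instance_type": "t2.small", "last_container_uuid": "",
//	 "last_busy": "...", "worker_state": "idle", "idle_behavior": "run"}
//
// with hypothetical values.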
func (wp *Pool) setup() {
	wp.creating = map[arvados.InstanceType][]time.Time{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}
func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}
func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}
// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false
	for _, inst := range instances {
		itTag := inst.Tags()[tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance,
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		delete(wp.workers, id)
		go wkr.executor.Close()
		notify = true
	}

	if !wp.loaded {
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}
	if notify {
		go wp.notify()
	}
}