1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
15 "git.curoverse.com/arvados.git/lib/cloud"
16 "git.curoverse.com/arvados.git/sdk/go/arvados"
17 "github.com/prometheus/client_golang/prometheus"
18 "github.com/sirupsen/logrus"
// Tag keys used on cloud instances: Create sets both on new instances,
// sync reads the instance type tag, and updateWorker reads the idle
// behavior tag.
22 tagKeyInstanceType = "InstanceType"
23 tagKeyIdleBehavior = "IdleBehavior"
26 // An InstanceView shows a worker's current state and recent activity.
// Instances() builds one per worker from the worker's instance ID,
// instance type, last container UUID, state, and idle behavior, and
// returns them sorted by Instance.
27 type InstanceView struct {
28 Instance cloud.InstanceID `json:"instance"`
29 Price float64 `json:"price"`
30 ArvadosInstanceType string `json:"arvados_instance_type"`
31 ProviderInstanceType string `json:"provider_instance_type"`
32 LastContainerUUID string `json:"last_container_uuid"`
33 LastBusy time.Time `json:"last_busy"`
34 WorkerState string `json:"worker_state"`
35 IdleBehavior IdleBehavior `json:"idle_behavior"`
38 // An Executor executes shell commands on a remote host.
// The Pool obtains one per instance from its newExecutor callback
// (see updateWorker).
39 type Executor interface {
40 // Run cmd on the current target.
// In this file, kill() uses Execute to run "crunch-run --kill".
41 Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
43 // Use the given target for subsequent operations. The new
44 // target is the same host as the previous target, but it
45 // might return a different address and verify a different
48 // SetTarget is called frequently, and in most cases the new
49 // target will behave exactly the same as the old one. An
50 // implementation should optimize accordingly.
52 // SetTarget must not block on concurrent Execute calls.
// updateWorker calls SetTarget every time an already-known instance
// is seen again.
53 SetTarget(cloud.ExecutorTarget)
// Fallback values: NewPool passes the interval/timeout defaults to
// duration() alongside the corresponding cluster config value, and
// runProbes falls back to defaultMaxProbesPerSecond.
59 defaultSyncInterval = time.Minute
60 defaultProbeInterval = time.Second * 10
61 defaultMaxProbesPerSecond = 10
62 defaultTimeoutIdle = time.Minute
63 defaultTimeoutBooting = time.Minute * 10
64 defaultTimeoutProbe = time.Minute * 10
65 defaultTimeoutShutdown = time.Second * 10
67 // Time after a quota error to try again anyway, even if no
68 // instances have been shut down.
69 quotaErrorTTL = time.Minute
71 // Time between "X failed because rate limiting" messages
72 logRateLimitErrorInterval = time.Second * 10
// duration converts a configured arvados.Duration to a time.Duration.
// def is the fallback supplied by the caller (NewPool passes the
// default* constants).
// NOTE(review): confirm the exact condition under which def is
// returned instead of conf.
75 func duration(conf arvados.Duration, def time.Duration) time.Duration {
77 return time.Duration(conf)
83 // NewPool creates a Pool of workers backed by instanceSet.
85 // New instances are configured and set up according to the given
86 // cluster configuration.
87 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
// The caller's InstanceSet is wrapped in a throttledInstanceSet; Create
// and getInstancesAndSync consult its throttleCreate and
// throttleInstances fields.
91 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
92 newExecutor: newExecutor,
93 bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
94 imageID: cloud.ImageID(cluster.CloudVMs.ImageID),
95 instanceTypes: cluster.InstanceTypes,
// maxProbesPerSecond is stored as configured; runProbes has its own
// fallback to defaultMaxProbesPerSecond.
96 maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
// Each interval/timeout below is paired with its default* fallback.
97 probeInterval: duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
98 syncInterval: duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
99 timeoutIdle: duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
100 timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
101 timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
102 timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
103 stop: make(chan bool),
// Metrics are registered before the pool's maps are initialized by
// setup().
105 wp.registerMetrics(reg)
107 wp.setupOnce.Do(wp.setup)
115 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
116 // zero Pool should not be used. Call NewPool to create a new Pool.
119 logger logrus.FieldLogger
120 arvClient *arvados.Client
// instanceSet through timeoutShutdown are filled in by NewPool from
// its arguments and the cluster configuration.
121 instanceSet *throttledInstanceSet
122 newExecutor func(cloud.Instance) Executor
123 bootProbeCommand string
124 imageID cloud.ImageID
125 instanceTypes map[string]arvados.InstanceType
126 syncInterval time.Duration
127 probeInterval time.Duration
128 maxProbesPerSecond int
129 timeoutIdle time.Duration
130 timeoutBooting time.Duration
131 timeoutProbe time.Duration
132 timeoutShutdown time.Duration
// subscribers, creating, workers and exited are allocated by setup().
// subscribers maps each channel returned by Subscribe (receive side)
// to the same channel's send side.
135 subscribers map[<-chan struct{}]chan<- struct{}
136 creating map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
137 workers map[cloud.InstanceID]*worker
138 loaded bool // loaded list of instances from InstanceSet at least once
139 exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
// atQuotaUntil is set by Create to now+quotaErrorTTL after a quota
// error; AtQuota and Create compare it against the current time.
140 atQuotaUntil time.Time
141 atQuotaErr cloud.QuotaError
// NOTE(review): rate-limit errors in this file are recorded on
// instanceSet.throttleCreate / instanceSet.throttleInstances, not on
// these two fields -- confirm whether they are still needed.
146 throttleCreate throttle
147 throttleInstances throttle
// Gauges created and registered by registerMetrics and set by
// updateMetrics.
149 mInstances prometheus.Gauge
150 mInstancesPrice prometheus.Gauge
151 mContainersRunning prometheus.Gauge
152 mVCPUs prometheus.Gauge
153 mVCPUsInuse prometheus.Gauge
154 mMemory prometheus.Gauge
155 mMemoryInuse prometheus.Gauge
158 // Subscribe returns a buffered channel that becomes ready after any
159 // change to the pool's state that could have scheduling implications:
160 // a worker's state changes, a new worker appears, the cloud
161 // provider's API rate limiting period ends, etc.
163 // Additional events that occur while the channel is already ready
164 // will be dropped, so it is OK if the caller services the channel
169 // ch := wp.Subscribe()
170 // defer wp.Unsubscribe(ch)
177 func (wp *Pool) Subscribe() <-chan struct{} {
178 wp.setupOnce.Do(wp.setup)
180 defer wp.mtx.Unlock()
// Capacity 1: a single pending notification is enough to make the
// channel ready.
181 ch := make(chan struct{}, 1)
// The same channel is stored as key (receive side) and value (send
// side), so Unsubscribe can find it from the receive-only channel
// handed to the caller.
182 wp.subscribers[ch] = ch
186 // Unsubscribe stops sending updates to the given channel.
// It removes ch's entry from wp.subscribers, so notify no longer
// iterates over it.
187 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
188 wp.setupOnce.Do(wp.setup)
190 defer wp.mtx.Unlock()
191 delete(wp.subscribers, ch)
194 // Unallocated returns the number of unallocated (creating + booting +
195 // idle + unknown) workers for each instance type. Workers in
196 // hold/drain mode are not included.
197 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
198 wp.setupOnce.Do(wp.setup)
200 defer wp.mtx.RUnlock()
201 unalloc := map[arvados.InstanceType]int{}
// Local per-type count of Create calls still in flight, taken from
// the timestamps recorded in wp.creating.
202 creating := map[arvados.InstanceType]int{}
203 for it, times := range wp.creating {
204 creating[it] = len(times)
206 for _, wkr := range wp.workers {
207 // Skip workers that are not expected to become
208 // available soon. Note len(wkr.running)>0 is not
209 // redundant here: it can be true even in
211 if wkr.state == StateShutdown ||
212 wkr.state == StateRunning ||
213 wkr.idleBehavior != IdleBehaviorRun ||
214 len(wkr.running) > 0 {
// wp.creating[it][0] is the first start time recorded for this type;
// Create appends its timestamps in call order.
219 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
220 // If up to N new workers appear in
221 // Instances() while we are waiting for N
222 // Create() calls to complete, we assume we're
223 // just seeing a race between Instances() and
224 // Create() responses.
226 // The other common reason why nodes have
227 // state==Unknown is that they appeared at
228 // startup, before any Create calls. They
229 // don't match the above timing condition, so
230 // we never mistakenly attribute them to
231 // pending Create calls.
235 for it, c := range creating {
241 // Create a new instance with the given type, and add it to the worker
242 // pool. The worker is added immediately; instance creation runs in
245 // Create returns false if a pre-existing error state prevents it from
246 // even attempting to create a new instance. Those errors are logged
247 // by the Pool, so the caller does not need to log anything in such
249 func (wp *Pool) Create(it arvados.InstanceType) bool {
250 logger := wp.logger.WithField("InstanceType", it.Name)
251 wp.setupOnce.Do(wp.setup)
253 defer wp.mtx.Unlock()
// The pre-existing error states are an unexpired quota error and an
// active create throttle. The throttle consulted here must be the one
// owned by instanceSet: that is where CheckRateLimitError records
// rate-limit errors further down. Nothing in this file records errors
// on wp.throttleCreate, so checking that field would never hold back
// a create attempt.
254 if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
// New instances are tagged with their Arvados instance type and the
// default "run" idle behavior; sync and updateWorker read these tags
// back when the instance is listed.
257 tags := cloud.InstanceTags{
258 tagKeyInstanceType: it.Name,
259 tagKeyIdleBehavior: string(IdleBehaviorRun),
// Record the start time of this call so Unallocated can count it as a
// pending instance until the cloud call finishes.
262 wp.creating[it] = append(wp.creating[it], now)
265 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
267 defer wp.mtx.Unlock()
268 // Remove our timestamp marker from wp.creating
269 for i, t := range wp.creating[it] {
271 copy(wp.creating[it][i:], wp.creating[it][i+1:])
272 wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
// A quota error blocks further Create calls for quotaErrorTTL;
// subscribers are notified when that period ends so they can retry.
277 if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
279 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
280 time.AfterFunc(quotaErrorTTL, wp.notify)
282 logger.WithError(err).Error("create failed")
283 wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
// On success the new instance is registered (or an existing Unknown
// worker is promoted) with state Booting.
286 wp.updateWorker(inst, it, StateBooting)
291 // AtQuota returns true if Create is not expected to work at the
// Concretely, it reports whether the current time is still before
// wp.atQuotaUntil, which Create sets to now+quotaErrorTTL after a
// quota error.
293 func (wp *Pool) AtQuota() bool {
295 defer wp.mtx.Unlock()
296 return time.Now().Before(wp.atQuotaUntil)
299 // SetIdleBehavior determines how the indicated instance will behave
300 // when it has no containers running.
// The instance is looked up by id in wp.workers; an error is returned
// for an id that has no worker.
301 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
303 defer wp.mtx.Unlock()
304 wkr, ok := wp.workers[id]
306 return errors.New("requested instance does not exist")
308 wkr.idleBehavior = idleBehavior
314 // Add or update worker attached to the given instance. Use
315 // initialState if a new worker is created.
317 // The second return value is true if a new worker is created.
319 // Caller must have lock.
320 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
// Existing worker: refresh its executor target and update time.
322 if wkr := wp.workers[id]; wkr != nil {
323 wkr.executor.SetTarget(inst)
325 wkr.updated = time.Now()
// A worker first recorded with state Unknown is promoted to Booting
// when the caller passes StateBooting (as Create does on success).
326 if initialState == StateBooting && wkr.state == StateUnknown {
327 wkr.state = StateBooting
333 // If an instance has a valid IdleBehavior tag when it first
334 // appears, initialize the new worker accordingly (this is how
335 // we restore IdleBehavior that was set by a prior dispatch
336 // process); otherwise, default to "run". After this,
337 // wkr.idleBehavior is the source of truth, and will only be
338 // changed via SetIdleBehavior().
339 idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
340 if !validIdleBehavior[idleBehavior] {
341 idleBehavior = IdleBehaviorRun
344 logger := wp.logger.WithFields(logrus.Fields{
345 "InstanceType": it.Name,
348 logger.WithFields(logrus.Fields{
349 "State": initialState,
350 "IdleBehavior": idleBehavior,
351 }).Infof("instance appeared in cloud")
// The new worker gets its own executor from the newExecutor callback
// and empty running/starting sets.
357 executor: wp.newExecutor(inst),
359 idleBehavior: idleBehavior,
366 running: make(map[string]struct{}),
367 starting: make(map[string]struct{}),
368 probing: make(chan struct{}, 1),
374 // caller must have lock.
375 func (wp *Pool) notifyExited(uuid string, t time.Time) {
379 // Shutdown shuts down a worker with the given type, or returns false
380 // if all workers with the given type are busy.
381 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
382 wp.setupOnce.Do(wp.setup)
384 defer wp.mtx.Unlock()
385 logger := wp.logger.WithField("InstanceType", it.Name)
386 logger.Info("shutdown requested")
// Candidates in Booting state are tried before candidates in Idle
// state.
387 for _, tryState := range []State{StateBooting, StateIdle} {
388 // TODO: shutdown the worker with the longest idle
389 // time (Idle) or the earliest create time (Booting)
390 for _, wkr := range wp.workers {
// Workers whose idle behavior is "hold" are never selected.
391 if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
392 logger.WithField("Instance", wkr.instance).Info("shutting down")
401 // CountWorkers returns the current number of workers in each state.
402 func (wp *Pool) CountWorkers() map[State]int {
403 wp.setupOnce.Do(wp.setup)
405 defer wp.mtx.Unlock()
407 for _, w := range wp.workers {
413 // Running returns the container UUIDs being prepared/run on workers.
415 // In the returned map, the time value indicates when the Pool
416 // observed that the container process had exited. A container that
417 // has not yet exited has a zero time value. The caller should use
418 // KillContainer() to garbage-collect the entries for exited
420 func (wp *Pool) Running() map[string]time.Time {
421 wp.setupOnce.Do(wp.setup)
423 defer wp.mtx.Unlock()
424 r := map[string]time.Time{}
// Containers in either a worker's running set or its starting set are
// reported with the zero time.
425 for _, wkr := range wp.workers {
426 for uuid := range wkr.running {
427 r[uuid] = time.Time{}
429 for uuid := range wkr.starting {
430 r[uuid] = time.Time{}
// Entries recorded in wp.exited are visited last.
433 for uuid, exited := range wp.exited {
439 // StartContainer starts a container on an idle worker immediately if
440 // possible, otherwise returns false.
441 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
442 wp.setupOnce.Do(wp.setup)
444 defer wp.mtx.Unlock()
// Only workers of the requested type in StateIdle are considered;
// candidates are compared by their busy time, a later time winning
// the comparison.
446 for _, w := range wp.workers {
447 if w.instType == it && w.state == StateIdle {
448 if wkr == nil || w.busy.After(wkr.busy) {
456 wkr.startContainer(ctr)
460 // KillContainer kills the crunch-run process for the given container
461 // UUID, if it's running on any worker.
463 // KillContainer returns immediately; the act of killing the container
464 // takes some time, and runs in the background.
465 func (wp *Pool) KillContainer(uuid string) {
467 defer wp.mtx.Unlock()
// A uuid that only has a placeholder in wp.exited is cleared from
// that map.
468 if _, ok := wp.exited[uuid]; ok {
469 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
470 delete(wp.exited, uuid)
// Only each worker's running set is searched (not its starting set);
// the kill itself runs in a new goroutine.
473 for _, wkr := range wp.workers {
474 if _, ok := wkr.running[uuid]; ok {
475 go wp.kill(wkr, uuid)
479 wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
// kill runs "crunch-run --kill 15 <uuid>" on the worker through its
// executor, logging stdout/stderr with a warning if the kill fails.
// Afterwards, under the pool lock, uuid is removed from wkr.running
// and the worker goes back to Idle if nothing else is running or
// starting on it. KillContainer invokes kill in its own goroutine.
482 func (wp *Pool) kill(wkr *worker, uuid string) {
483 logger := wp.logger.WithFields(logrus.Fields{
484 "ContainerUUID": uuid,
485 "Instance": wkr.instance,
487 logger.Debug("killing process")
488 stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
490 logger.WithFields(logrus.Fields{
491 "stderr": string(stderr),
492 "stdout": string(stdout),
494 }).Warn("kill failed")
497 logger.Debug("killing process succeeded")
499 defer wp.mtx.Unlock()
// Re-check membership: the running set may have changed while the
// remote command was executing without the lock held.
500 if _, ok := wkr.running[uuid]; ok {
501 delete(wkr.running, uuid)
502 if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
503 wkr.state = StateIdle
505 wkr.updated = time.Now()
// registerMetrics creates the pool's seven gauges (namespace
// "arvados", subsystem "dispatchcloud") and registers each one with
// reg. NewPool calls it once during construction; updateMetrics sets
// the gauge values.
510 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
// A fresh registry is substituted here.
// NOTE(review): presumably guarded by a nil check on reg -- confirm.
512 reg = prometheus.NewRegistry()
514 wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
515 Namespace: "arvados",
516 Subsystem: "dispatchcloud",
517 Name: "instances_total",
518 Help: "Number of cloud VMs including pending, booting, running, held, and shutting down.",
520 reg.MustRegister(wp.mInstances)
521 wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
522 Namespace: "arvados",
523 Subsystem: "dispatchcloud",
524 Name: "instances_price_total",
525 Help: "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
527 reg.MustRegister(wp.mInstancesPrice)
528 wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
529 Namespace: "arvados",
530 Subsystem: "dispatchcloud",
531 Name: "containers_running",
532 Help: "Number of containers reported running by cloud VMs.",
534 reg.MustRegister(wp.mContainersRunning)
536 wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
537 Namespace: "arvados",
538 Subsystem: "dispatchcloud",
540 Help: "Total VCPUs on all cloud VMs.",
542 reg.MustRegister(wp.mVCPUs)
543 wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
544 Namespace: "arvados",
545 Subsystem: "dispatchcloud",
547 Help: "VCPUs on cloud VMs that are running containers.",
549 reg.MustRegister(wp.mVCPUsInuse)
550 wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
551 Namespace: "arvados",
552 Subsystem: "dispatchcloud",
553 Name: "memory_bytes_total",
554 Help: "Total memory on all cloud VMs.",
556 reg.MustRegister(wp.mMemory)
557 wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
558 Namespace: "arvados",
559 Subsystem: "dispatchcloud",
560 Name: "memory_bytes_inuse",
561 Help: "Memory on cloud VMs that are running containers.",
563 reg.MustRegister(wp.mMemoryInuse)
566 func (wp *Pool) runMetrics() {
568 defer wp.Unsubscribe(ch)
// updateMetrics recomputes every gauge from the current contents of
// wp.workers while holding the pool's read lock.
574 func (wp *Pool) updateMetrics() {
576 defer wp.mtx.RUnlock()
579 var alloc, cpu, cpuInuse, mem, memInuse int64
580 for _, wkr := range wp.workers {
// Totals accumulate over every worker regardless of state.
581 price += wkr.instType.Price
582 cpu += int64(wkr.instType.VCPUs)
583 mem += int64(wkr.instType.RAM)
// NOTE(review): workers with no running or starting containers are
// presumably skipped by this guard before the in-use sums -- confirm.
584 if len(wkr.running)+len(wkr.starting) == 0 {
// In-use sums count the worker's whole VCPUs/RAM, not a per-container
// share; alloc counts running plus starting containers.
587 alloc += int64(len(wkr.running) + len(wkr.starting))
588 cpuInuse += int64(wkr.instType.VCPUs)
589 memInuse += int64(wkr.instType.RAM)
591 wp.mInstances.Set(float64(len(wp.workers)))
592 wp.mInstancesPrice.Set(price)
593 wp.mContainersRunning.Set(float64(alloc))
594 wp.mVCPUs.Set(float64(cpu))
595 wp.mMemory.Set(float64(mem))
596 wp.mVCPUsInuse.Set(float64(cpuInuse))
597 wp.mMemoryInuse.Set(float64(memInuse))
// runProbes periodically launches a probe (ProbeAndUpdate) for the
// pool's workers, each in its own goroutine. A new pass starts on
// every tick of probeticker (wp.probeInterval); limitticker ticks
// maxPPS times per second and is used to pace the launches.
600 func (wp *Pool) runProbes() {
601 maxPPS := wp.maxProbesPerSecond
// Fallback when the configured rate is not usable.
603 maxPPS = defaultMaxProbesPerSecond
605 limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
606 defer limitticker.Stop()
608 probeticker := time.NewTicker(wp.probeInterval)
609 defer probeticker.Stop()
// The ID slice is reused across passes: it is reset to length zero
// at the start of each one.
611 workers := []cloud.InstanceID{}
612 for range probeticker.C {
613 workers = workers[:0]
615 for id, wkr := range wp.workers {
// Workers already in StateShutdown, or for which shutdownIfIdle()
// returns true, take this branch; the remaining workers are queued
// for probing.
616 if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
619 workers = append(workers, id)
// IDs are looked up again because a worker can be removed from
// wp.workers between collecting the IDs and probing.
623 for _, id := range workers {
625 wkr, ok := wp.workers[id]
628 // Deleted while we were probing
632 go wkr.ProbeAndUpdate()
636 case <-limitticker.C:
// runSync calls getInstancesAndSync repeatedly, logging a warning
// when a sync fails and re-arming the timer with wp.syncInterval
// after each attempt.
642 func (wp *Pool) runSync() {
643 // sync once immediately, then wait syncInterval, sync again,
// A duration of 1 is one nanosecond, so the timer fires essentially
// at once for the first sync.
645 timer := time.NewTimer(1)
649 err := wp.getInstancesAndSync()
651 wp.logger.WithError(err).Warn("sync failed")
653 timer.Reset(wp.syncInterval)
655 wp.logger.Debug("worker.Pool stopped")
661 // Stop synchronizing with the InstanceSet.
662 func (wp *Pool) Stop() {
663 wp.setupOnce.Do(wp.setup)
667 // Instances returns an InstanceView for each worker in the pool,
668 // summarizing its current state and recent activity.
// The result is sorted by instance ID (string comparison).
669 func (wp *Pool) Instances() []InstanceView {
671 wp.setupOnce.Do(wp.setup)
673 for _, w := range wp.workers {
674 r = append(r, InstanceView{
675 Instance: w.instance.ID(),
676 Price: w.instType.Price,
677 ArvadosInstanceType: w.instType.Name,
678 ProviderInstanceType: w.instType.ProviderType,
679 LastContainerUUID: w.lastUUID,
681 WorkerState: w.state.String(),
682 IdleBehavior: w.idleBehavior,
// Map iteration order is random, so sort for a stable listing.
686 sort.Slice(r, func(i, j int) bool {
687 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
// setup allocates the pool's creating, exited, workers and subscribers
// maps. It is invoked through wp.setupOnce.Do by NewPool and by the
// methods that touch these maps, so it runs at most once per Pool.
692 func (wp *Pool) setup() {
693 wp.creating = map[arvados.InstanceType][]time.Time{}
694 wp.exited = map[string]time.Time{}
695 wp.workers = map[cloud.InstanceID]*worker{}
696 wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
// notify signals every subscriber channel registered via Subscribe,
// holding the pool's read lock while it iterates. It is also passed
// as a callback to time.AfterFunc and CheckRateLimitError.
699 func (wp *Pool) notify() {
701 defer wp.mtx.RUnlock()
702 for _, send := range wp.subscribers {
// Subscriber channels have capacity 1 (see Subscribe).
704 case send <- struct{}{}:
// getInstancesAndSync lists all instances from the instanceSet and
// passes them to sync. The instanceSet's throttleInstances is checked
// before listing and is fed the listing error afterwards.
710 func (wp *Pool) getInstancesAndSync() error {
711 wp.setupOnce.Do(wp.setup)
712 if err := wp.instanceSet.throttleInstances.Error(); err != nil {
715 wp.logger.Debug("getting instance list")
// Captured before the listing starts so sync can tell which workers
// were updated after this point.
716 threshold := time.Now()
717 instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
719 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
722 wp.sync(threshold, instances)
723 wp.logger.Debug("sync done")
727 // Add/remove/update workers based on instances, which was obtained
728 // from the instanceSet. However, don't clobber any other updates that
729 // already happened after threshold.
730 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
732 defer wp.mtx.Unlock()
733 wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
736 for _, inst := range instances {
// The instance's Arvados type comes from its InstanceType tag and must
// be one of the configured wp.instanceTypes.
737 itTag := inst.Tags()[tagKeyInstanceType]
738 it, ok := wp.instanceTypes[itTag]
740 wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
// Instances not yet known are added with state Unknown; updateWorker
// also refreshes wkr.updated for the known ones.
743 if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
745 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
746 wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
// Second pass: workers touched after threshold hit the guard below;
// the others were not in the listing and are dropped from the pool.
751 for id, wkr := range wp.workers {
752 if wkr.updated.After(threshold) {
755 logger := wp.logger.WithFields(logrus.Fields{
756 "Instance": wkr.instance,
757 "WorkerState": wkr.state,
759 logger.Info("instance disappeared in cloud")
760 delete(wp.workers, id)
// The executor is closed in the background so the pool lock is not
// held while it shuts down.
761 go wkr.executor.Close()
767 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")