1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
import (
	"errors"
	"io"
	"sort"
	"strings"
	"sync"
	"time"

	"git.curoverse.com/arvados.git/lib/cloud"
	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
)
// Instance tags used to persist dispatcher state on the cloud
// provider's side, so it survives a dispatcher restart.
const (
	tagKeyInstanceType = "InstanceType"
	tagKeyIdleBehavior = "IdleBehavior"
)
26 // An InstanceView shows a worker's current state and recent activity.
27 type InstanceView struct {
28 Instance cloud.InstanceID `json:"instance"`
29 Price float64 `json:"price"`
30 ArvadosInstanceType string `json:"arvados_instance_type"`
31 ProviderInstanceType string `json:"provider_instance_type"`
32 LastContainerUUID string `json:"last_container_uuid"`
33 LastBusy time.Time `json:"last_busy"`
34 WorkerState string `json:"worker_state"`
35 IdleBehavior IdleBehavior `json:"idle_behavior"`
38 // An Executor executes shell commands on a remote host.
39 type Executor interface {
40 // Run cmd on the current target.
41 Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
43 // Use the given target for subsequent operations. The new
44 // target is the same host as the previous target, but it
45 // might return a different address and verify a different
48 // SetTarget is called frequently, and in most cases the new
49 // target will behave exactly the same as the old one. An
50 // implementation should optimize accordingly.
52 // SetTarget must not block on concurrent Execute calls.
53 SetTarget(cloud.ExecutorTarget)
// Fallbacks used by NewPool (via duration()) when the corresponding
// cluster configuration value is unset/zero.
const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = time.Minute * 10
	defaultTimeoutProbe       = time.Minute * 10
	defaultTimeoutShutdown    = time.Second * 10
)
68 func duration(conf arvados.Duration, def time.Duration) time.Duration {
70 return time.Duration(conf)
// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
//
// NOTE(review): several lines appear to be elided from this chunk:
// the `wp := &Pool{` literal header (and its logger/arvClient
// fields), the literal's closing brace, whatever starts the
// background sync/probe/metrics loops, and the final `return wp`.
// Confirm against the full file.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
	instanceSet:        instanceSet,
	newExecutor:        newExecutor,
	bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
	imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
	instanceTypes:      cluster.InstanceTypes,
	maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
	// Each timing knob falls back to its package-level default when
	// unset in the cluster config (see duration()).
	probeInterval:   duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
	syncInterval:    duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
	timeoutIdle:     duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
	timeoutBooting:  duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
	timeoutProbe:    duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
	timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
	stop:            make(chan bool),
	wp.registerMetrics(reg)
	wp.setupOnce.Do(wp.setup)
108 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
109 // zero Pool should not be used. Call NewPool to create a new Pool.
112 logger logrus.FieldLogger
113 arvClient *arvados.Client
114 instanceSet cloud.InstanceSet
115 newExecutor func(cloud.Instance) Executor
116 bootProbeCommand string
117 imageID cloud.ImageID
118 instanceTypes map[string]arvados.InstanceType
119 syncInterval time.Duration
120 probeInterval time.Duration
121 maxProbesPerSecond int
122 timeoutIdle time.Duration
123 timeoutBooting time.Duration
124 timeoutProbe time.Duration
125 timeoutShutdown time.Duration
128 subscribers map[<-chan struct{}]chan<- struct{}
129 creating map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
130 workers map[cloud.InstanceID]*worker
131 loaded bool // loaded list of instances from InstanceSet at least once
132 exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
133 atQuotaUntil time.Time
134 atQuotaErr cloud.QuotaError
139 mInstances prometheus.Gauge
140 mInstancesPrice prometheus.Gauge
141 mContainersRunning prometheus.Gauge
142 mVCPUs prometheus.Gauge
143 mVCPUsInuse prometheus.Gauge
144 mMemory prometheus.Gauge
145 mMemoryInuse prometheus.Gauge
148 // Subscribe returns a channel that becomes ready whenever a worker's
153 // ch := wp.Subscribe()
154 // defer wp.Unsubscribe(ch)
156 // // ...try scheduling some work...
161 func (wp *Pool) Subscribe() <-chan struct{} {
162 wp.setupOnce.Do(wp.setup)
164 defer wp.mtx.Unlock()
165 ch := make(chan struct{}, 1)
166 wp.subscribers[ch] = ch
170 // Unsubscribe stops sending updates to the given channel.
171 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
172 wp.setupOnce.Do(wp.setup)
174 defer wp.mtx.Unlock()
175 delete(wp.subscribers, ch)
// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
//
// NOTE(review): this chunk is missing the wp.mtx.RLock() matching the
// deferred RUnlock, the bodies/closing braces of the loops below, the
// lines that increment unalloc, and the final merge of `creating`
// into `unalloc` plus `return unalloc`. Confirm against the full
// file.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	for it, times := range wp.creating {
		creating[it] = len(times)
	for _, wkr := range wp.workers {
		// Skip workers that are allocated or in hold/drain mode.
		if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) || wkr.idleBehavior != IdleBehaviorRun {
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
	for it, c := range creating {
// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// NOTE(review): elided from this chunk: wp.mtx.Lock(), the early
// return for the at-quota case, `now := time.Now()`, the goroutine
// wrapper around the InstanceSet.Create call (evidenced by the second
// `defer wp.mtx.Unlock()` below), the timestamp-match test inside the
// removal loop, error/return plumbing, and closing braces. Confirm
// against the full file.
func (wp *Pool) Create(it arvados.InstanceType) error {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.Unlock()
	// Refuse to create while a recent quota error is still in effect.
	if time.Now().Before(wp.atQuotaUntil) {
	// Tags persist dispatcher state on the instance itself.
	tags := cloud.InstanceTags{
		tagKeyInstanceType: it.Name,
		tagKeyIdleBehavior: string(IdleBehaviorRun),
	wp.creating[it] = append(wp.creating[it], now)
	inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
	defer wp.mtx.Unlock()
	// Remove our timestamp marker from wp.creating
	for i, t := range wp.creating[it] {
		copy(wp.creating[it][i:], wp.creating[it][i+1:])
		wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
	// Quota errors suppress further Create attempts for a minute.
	if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
		wp.atQuotaUntil = time.Now().Add(time.Minute)
	logger.WithError(err).Error("create failed")
	wp.updateWorker(inst, it, StateBooting)
261 // AtQuota returns true if Create is not expected to work at the
263 func (wp *Pool) AtQuota() bool {
265 defer wp.mtx.Unlock()
266 return time.Now().Before(wp.atQuotaUntil)
// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
//
// NOTE(review): elided from this chunk: wp.mtx.Lock(), the
// `if !ok { ... }` guard around the error return below, any
// tag-persisting/notify calls, and the final `return nil`. Confirm
// against the full file.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	return errors.New("requested instance does not exist")
	wkr.idleBehavior = idleBehavior
// Add or update worker attached to the given instance. Use
// initialState if a new worker is created.
//
// The second return value is true if a new worker is created.
//
// Caller must have lock.
//
// NOTE(review): elided from this chunk: the `id := inst.ID()`
// binding (id is otherwise undefined below), the early `return wkr,
// false` for the existing-worker branch, the "Instance" logger
// field, the `wkr := &worker{` literal header with its remaining
// fields, the map insert, and the final return. Confirm against the
// full file.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
	// Existing worker: just point its executor at the (possibly
	// refreshed) instance handle.
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.updated = time.Now()
		if initialState == StateBooting && wkr.state == StateUnknown {
			wkr.state = StateBooting
	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
	logger.WithFields(logrus.Fields{
		"State":        initialState,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
		executor:     wp.newExecutor(inst),
		idleBehavior: idleBehavior,
		running:      make(map[string]struct{}),
		starting:     make(map[string]struct{}),
		probing:      make(chan struct{}, 1),
// notifyExited records that the crunch-run process for the given
// container UUID exited at time t. NOTE(review): the body is elided
// in this chunk -- presumably it stores t in wp.exited[uuid] (the
// map KillContainer later clears); confirm against the full file.
//
// caller must have lock.
func (wp *Pool) notifyExited(uuid string, t time.Time) {
// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
//
// NOTE(review): elided from this chunk: wp.mtx.Lock(), the call that
// actually initiates shutdown on the matched worker, the `return
// true`, loop closing braces, and the final `return false`. Confirm
// against the full file.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	// Prefer to shut down a booting worker before an idle one.
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			// Never shut down a worker held by the operator.
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance).Info("shutting down")
// CountWorkers returns the current number of workers in each state.
//
// NOTE(review): elided from this chunk: wp.mtx.Lock(), the result
// map's declaration, the per-worker state tally inside the loop, and
// the return. Confirm against the full file.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.Unlock()
	for _, w := range wp.workers {
// Running returns the container UUIDs being prepared/run on workers.
//
// NOTE(review): elided from this chunk: wp.mtx.Lock(), loop closing
// braces, the body of the wp.exited loop (presumably r[uuid] =
// exited), and `return r`. Confirm against the full file.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		// Zero time means "still running" (no exit recorded).
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
	for uuid, exited := range wp.exited {
// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
//
// NOTE(review): elided from this chunk: wp.mtx.Lock(), the `var wkr
// *worker` declaration and the `wkr = w` assignment inside the loop
// (the loop selects the most-recently-busy idle worker of the right
// type), the nil-check/early `return false`, and the final `return
// true`. Confirm against the full file.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	defer wp.mtx.Unlock()
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle {
			// Prefer the candidate that was busy most recently.
			if wkr == nil || w.busy.After(wkr.busy) {
	wkr.startContainer(ctr)
// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// NOTE(review): elided from this chunk: wp.mtx.Lock(), the `return`
// statements ending the exited-placeholder and found-on-worker
// branches, and closing braces. Confirm against the full file.
func (wp *Pool) KillContainer(uuid string) {
	defer wp.mtx.Unlock()
	// A placeholder in wp.exited means crunch-run already exited;
	// nothing to kill, just clear the record.
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	for _, wkr := range wp.workers {
		if _, ok := wkr.running[uuid]; ok {
			// The actual kill runs in the background.
			go wp.kill(wkr, uuid)
	wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
// kill runs "crunch-run --kill" on the worker and, on success,
// removes uuid from the worker's running set (possibly returning the
// worker to the Idle state). Runs in its own goroutine (started by
// KillContainer).
//
// NOTE(review): elided from this chunk: the `})` closers for the two
// WithFields literals, the `if err != nil {` guard and `return`
// around the kill-failed branch, wp.mtx.Lock() before the deferred
// Unlock, closing braces, and (presumably) a notify call. Confirm
// against the full file.
func (wp *Pool) kill(wkr *worker, uuid string) {
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Instance":      wkr.instance,
	logger.Debug("killing process")
	// SIGTERM (15) asks crunch-run to shut the container down.
	stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
	logger.WithFields(logrus.Fields{
		"stderr": string(stderr),
		"stdout": string(stdout),
	}).Warn("kill failed")
	logger.Debug("killing process succeeded")
	defer wp.mtx.Unlock()
	if _, ok := wkr.running[uuid]; ok {
		delete(wkr.running, uuid)
		// Last container gone: worker becomes schedulable again.
		if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
			wkr.state = StateIdle
		wkr.updated = time.Now()
474 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
476 reg = prometheus.NewRegistry()
478 wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
479 Namespace: "arvados",
480 Subsystem: "dispatchcloud",
481 Name: "instances_total",
482 Help: "Number of cloud VMs including pending, booting, running, held, and shutting down.",
484 reg.MustRegister(wp.mInstances)
485 wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
486 Namespace: "arvados",
487 Subsystem: "dispatchcloud",
488 Name: "instances_price_total",
489 Help: "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
491 reg.MustRegister(wp.mInstancesPrice)
492 wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
493 Namespace: "arvados",
494 Subsystem: "dispatchcloud",
495 Name: "containers_running",
496 Help: "Number of containers reported running by cloud VMs.",
498 reg.MustRegister(wp.mContainersRunning)
500 wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
501 Namespace: "arvados",
502 Subsystem: "dispatchcloud",
504 Help: "Total VCPUs on all cloud VMs.",
506 reg.MustRegister(wp.mVCPUs)
507 wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
508 Namespace: "arvados",
509 Subsystem: "dispatchcloud",
511 Help: "VCPUs on cloud VMs that are running containers.",
513 reg.MustRegister(wp.mVCPUsInuse)
514 wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
515 Namespace: "arvados",
516 Subsystem: "dispatchcloud",
517 Name: "memory_bytes_total",
518 Help: "Total memory on all cloud VMs.",
520 reg.MustRegister(wp.mMemory)
521 wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
522 Namespace: "arvados",
523 Subsystem: "dispatchcloud",
524 Name: "memory_bytes_inuse",
525 Help: "Memory on cloud VMs that are running containers.",
527 reg.MustRegister(wp.mMemoryInuse)
// runMetrics refreshes the pool's prometheus gauges whenever pool
// state changes. NOTE(review): the body is mostly elided in this
// chunk -- `ch` is undefined here; presumably `ch := wp.Subscribe()`
// precedes the deferred Unsubscribe, followed by a loop over ch
// calling wp.updateMetrics(). Confirm against the full file.
func (wp *Pool) runMetrics() {
	defer wp.Unsubscribe(ch)
538 func (wp *Pool) updateMetrics() {
540 defer wp.mtx.RUnlock()
543 var alloc, cpu, cpuInuse, mem, memInuse int64
544 for _, wkr := range wp.workers {
545 price += wkr.instType.Price
546 cpu += int64(wkr.instType.VCPUs)
547 mem += int64(wkr.instType.RAM)
548 if len(wkr.running)+len(wkr.starting) == 0 {
551 alloc += int64(len(wkr.running) + len(wkr.starting))
552 cpuInuse += int64(wkr.instType.VCPUs)
553 memInuse += int64(wkr.instType.RAM)
555 wp.mInstances.Set(float64(len(wp.workers)))
556 wp.mInstancesPrice.Set(price)
557 wp.mContainersRunning.Set(float64(alloc))
558 wp.mVCPUs.Set(float64(cpu))
559 wp.mMemory.Set(float64(mem))
560 wp.mVCPUsInuse.Set(float64(cpuInuse))
561 wp.mMemoryInuse.Set(float64(memInuse))
// runProbes periodically probes every worker, rate-limited to
// maxProbesPerSecond across the whole pool.
//
// NOTE(review): elided from this chunk: the `if maxPPS < 1 {` guard
// around the default assignment below, mutex acquisition/release
// around the worker-map reads, the `continue` for workers being shut
// down, the `if !ok { continue }` after the second lookup, the select
// arms (including, presumably, one observing wp.stop), and closing
// braces. Confirm against the full file.
func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	maxPPS = defaultMaxProbesPerSecond
	// limitticker spaces individual probes; probeticker starts each
	// full round.
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		// Reuse the slice's backing array across rounds.
		workers = workers[:0]
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
			workers = append(workers, id)
		for _, id := range workers {
			wkr, ok := wp.workers[id]
			// Deleted while we were probing
			go wkr.ProbeAndUpdate()
			case <-limitticker.C:
// runSync periodically reconciles the worker map with the
// InstanceSet's instance list.
//
// NOTE(review): elided from this chunk: the loop/select draining
// timer.C and (presumably) observing wp.stop to exit, and the
// `if err != nil {` guard around the warning below. Confirm against
// the full file.
func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// [rest of sentence elided in this chunk]
	timer := time.NewTimer(1)
	err := wp.getInstancesAndSync()
	wp.logger.WithError(err).Warn("sync failed")
	timer.Reset(wp.syncInterval)
	wp.logger.Debug("worker.Pool stopped")
625 // Stop synchronizing with the InstanceSet.
626 func (wp *Pool) Stop() {
627 wp.setupOnce.Do(wp.setup)
631 // Instances returns an InstanceView for each worker in the pool,
632 // summarizing its current state and recent activity.
633 func (wp *Pool) Instances() []InstanceView {
635 wp.setupOnce.Do(wp.setup)
637 for _, w := range wp.workers {
638 r = append(r, InstanceView{
639 Instance: w.instance.ID(),
640 Price: w.instType.Price,
641 ArvadosInstanceType: w.instType.Name,
642 ProviderInstanceType: w.instType.ProviderType,
643 LastContainerUUID: w.lastUUID,
645 WorkerState: w.state.String(),
646 IdleBehavior: w.idleBehavior,
650 sort.Slice(r, func(i, j int) bool {
651 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
656 func (wp *Pool) setup() {
657 wp.creating = map[arvados.InstanceType][]time.Time{}
658 wp.exited = map[string]time.Time{}
659 wp.workers = map[cloud.InstanceID]*worker{}
660 wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
663 func (wp *Pool) notify() {
665 defer wp.mtx.RUnlock()
666 for _, send := range wp.subscribers {
668 case send <- struct{}{}:
674 func (wp *Pool) getInstancesAndSync() error {
675 wp.setupOnce.Do(wp.setup)
676 wp.logger.Debug("getting instance list")
677 threshold := time.Now()
678 instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
682 wp.sync(threshold, instances)
683 wp.logger.Debug("sync done")
// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
//
// NOTE(review): elided from this chunk: wp.mtx.Lock(), the
// `if !ok { ... continue }` structure around the unknown-tag branch,
// the `continue` bodies, shutdown-retry action, a notify step, the
// `if wkr.updated.After(threshold) { continue }` body, the `})`
// closer for the WithFields literal, loop closing braces, and the
// `if !wp.loaded {` guard presumably around the final log line.
// Confirm against the full file.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	for _, inst := range instances {
		// Instance type is recovered from the tag written at Create
		// time; instances with unrecognized tags are skipped.
		itTag := inst.Tags()[tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
		if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
	// Workers not seen in this instance list (and not updated since
	// threshold) have disappeared from the cloud.
	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance,
			"WorkerState": wkr.state,
		logger.Info("instance disappeared in cloud")
		delete(wp.workers, id)
		go wkr.executor.Close()
	wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")