// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0
15 "git.curoverse.com/arvados.git/lib/cloud"
16 "git.curoverse.com/arvados.git/sdk/go/arvados"
17 "github.com/Sirupsen/logrus"
18 "github.com/prometheus/client_golang/prometheus"
const (
	tagKeyInstanceType = "InstanceType"
	tagKeyHold         = "Hold"
)
// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             string
	Price                float64
	ArvadosInstanceType  string
	ProviderInstanceType string
	LastContainerUUID    string
	Unallocated          time.Time
	WorkerState          string
}
// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	// Release any resources (e.g., connections) associated with
	// the current target.
	Close()
}
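
// For illustration only: a minimal Executor sketch (the real
// implementations, e.g. an SSH-based executor, live elsewhere). It
// accepts targets and pretends every command succeeds:
//
//	type nullExecutor struct {
//		mtx    sync.Mutex
//		target cloud.ExecutorTarget
//	}
//
//	func (e *nullExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
//		return nil, nil, nil // no output, no error
//	}
//
//	func (e *nullExecutor) SetTarget(t cloud.ExecutorTarget) {
//		// Record the new target under a short-lived lock so
//		// concurrent Execute calls are never blocked for long.
//		e.mtx.Lock()
//		defer e.mtx.Unlock()
//		e.target = t
//	}
//
//	func (e *nullExecutor) Close() {}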

const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = time.Minute * 10
	defaultTimeoutProbe       = time.Minute * 10
)
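
// duration returns conf (an operator-configured arvados.Duration) if
// it is positive, otherwise the compiled-in default def. For example,
// duration(0, defaultProbeInterval) returns 10s, and
// duration(arvados.Duration(time.Minute), defaultProbeInterval)
// returns 1m.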
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:             logger,
		instanceSet:        instanceSet,
		newExecutor:        newExecutor,
		bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
		imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
		instanceTypes:      cluster.InstanceTypes,
		maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
		probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
		syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}
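
// A typical caller wires the pool up roughly like this (illustrative
// sketch; instanceSet, makeExecutor, and cluster come from the
// dispatcher's own setup code and are not defined here):
//
//	pool := NewPool(logger, prometheus.NewRegistry(), instanceSet, makeExecutor, cluster)
//	ch := pool.Subscribe()
//	defer pool.Unsubscribe(ch)
//	for range ch {
//		// worker state changed; try scheduling again
//	}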

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger             logrus.FieldLogger
	instanceSet        cloud.InstanceSet
	newExecutor        func(cloud.Instance) Executor
	bootProbeCommand   string
	imageID            cloud.ImageID
	instanceTypes      map[string]arvados.InstanceType
	syncInterval       time.Duration
	probeInterval      time.Duration
	maxProbesPerSecond int
	timeoutIdle        time.Duration
	timeoutBooting     time.Duration
	timeoutProbe       time.Duration

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
	atQuotaUntil time.Time
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once

	mInstances         prometheus.Gauge
	mContainersRunning prometheus.Gauge
	mVCPUs             prometheus.Gauge
	mVCPUsInuse        prometheus.Gauge
	mMemory            prometheus.Gauge
	mMemoryInuse       prometheus.Gauge
}

type worker struct {
	state       State
	instance    cloud.Instance
	executor    Executor
	instType    arvados.InstanceType
	booted      bool
	probed      time.Time
	updated     time.Time
	busy        time.Time
	unallocated time.Time
	lastUUID    string
	running     map[string]struct{}
	starting    map[string]struct{}
	probing     chan struct{}
}

// Subscribe returns a channel that becomes ready whenever a worker's
// state changes.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		// ...try scheduling some work...
//		if done {
//			break
//		}
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	u := map[arvados.InstanceType]int{}
	for it, c := range wp.creating {
		u[it] = c
	}
	for _, wkr := range wp.workers {
		if len(wkr.running)+len(wkr.starting) == 0 && (wkr.state == StateRunning || wkr.state == StateBooting || wkr.state == StateUnknown) {
			u[wkr.instType]++
		}
	}
	return u
}

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
func (wp *Pool) Create(it arvados.InstanceType) error {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
	wp.creating[it]++
	go func() {
		defer wp.notify()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		wp.creating[it]--
		if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
			wp.atQuotaUntil = time.Now().Add(time.Minute)
		}
		if err != nil {
			logger.WithError(err).Error("create failed")
			return
		}
		wp.updateWorker(inst, it, StateBooting)
	}()
	return nil
}

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
}

// Add or update worker attached to the given instance. Use
// initialState if a new worker is created. Caller must have lock.
//
// Returns true when a new worker is created.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) bool {
	id := inst.ID()
	if wp.workers[id] != nil {
		wp.workers[id].executor.SetTarget(inst)
		wp.workers[id].instance = inst
		wp.workers[id].updated = time.Now()
		if initialState == StateBooting && wp.workers[id].state == StateUnknown {
			wp.workers[id].state = StateBooting
		}
		return false
	}
	if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
		initialState = StateHold
	}
	wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst,
		"State":        initialState,
	}).Info("instance appeared in cloud")
	now := time.Now()
	wp.workers[id] = &worker{
		executor:    wp.newExecutor(inst),
		state:       initialState,
		instance:    inst,
		instType:    it,
		probed:      now,
		busy:        now,
		updated:     now,
		unallocated: now,
		running:     make(map[string]struct{}),
		starting:    make(map[string]struct{}),
		probing:     make(chan struct{}, 1),
	}
	return true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateRunning} {
		// TODO: shutdown the worker with the longest idle
		// time (Running) or the earliest create time
		// (Booting)
		for _, wkr := range wp.workers {
			if wkr.state != tryState || len(wkr.running)+len(wkr.starting) > 0 {
				continue
			}
			if wkr.instType != it {
				continue
			}
			logger = logger.WithField("Instance", wkr.instance)
			logger.Info("shutting down")
			wp.shutdown(wkr, logger)
			return true
		}
	}
	return false
}

// caller must have lock
func (wp *Pool) shutdown(wkr *worker, logger logrus.FieldLogger) {
	wkr.updated = time.Now()
	wkr.state = StateShutdown
	go func() {
		err := wkr.instance.Destroy()
		if err != nil {
			logger.WithError(err).WithField("Instance", wkr.instance).Warn("shutdown failed")
			return
		}
		wp.mtx.Lock()
		// An instance was destroyed, so quota should no
		// longer be an obstacle: clear the at-quota state.
		wp.atQuotaUntil = time.Now()
		wp.mtx.Unlock()
	}()
}

// Workers returns the current number of workers in each state.
func (wp *Pool) Workers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
// Each UUID maps to a zero time.Time while the container is running,
// or to the time its crunch-run process was noticed to have exited.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType":  it.Name,
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	})
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	// Choose the idle worker of the right type that was busy
	// most recently.
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateRunning && len(w.running)+len(w.starting) == 0 {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	logger = logger.WithField("Instance", wkr.instance)
	logger.Debug("starting container")
	wkr.starting[ctr.UUID] = struct{}{}
	go func() {
		stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		wkr.updated = time.Now()
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = struct{}{}
		wkr.lastUUID = ctr.UUID
		if err != nil {
			logger.WithField("stdout", string(stdout)).
				WithField("stderr", string(stderr)).
				WithError(err).
				Error("error starting crunch-run process")
			// Leave uuid in wkr.running, though: it's
			// possible the error was just a communication
			// failure and the process was in fact
			// started. Wait for next probe to find out.
			return
		}
		logger.Info("crunch-run process started")
	}()
	return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
func (wp *Pool) KillContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
		return
	}
	for _, wkr := range wp.workers {
		if _, ok := wkr.running[uuid]; ok {
			go wp.kill(wkr, uuid)
			return
		}
	}
	wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
}

func (wp *Pool) kill(wkr *worker, uuid string) {
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Instance":      wkr.instance,
	})
	logger.Debug("killing process")
	stdout, stderr, err := wkr.executor.Execute("crunch-run --kill "+uuid, nil)
	if err != nil {
		logger.WithFields(logrus.Fields{
			"stderr": string(stderr),
			"stdout": string(stdout),
			"error":  err,
		}).Warn("kill failed")
		return
	}
	logger.Debug("killing process succeeded")
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wkr.running[uuid]; ok {
		delete(wkr.running, uuid)
		wkr.updated = time.Now()
	}
	go wp.notify()
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
	})
	reg.MustRegister(wp.mInstances)
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	})
	reg.MustRegister(wp.mVCPUs)
	wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_inuse",
		Help:      "VCPUs on cloud VMs that are running containers.",
	})
	reg.MustRegister(wp.mVCPUsInuse)
	wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	})
	reg.MustRegister(wp.mMemory)
	wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_inuse",
		Help:      "Memory on cloud VMs that are running containers.",
	})
	reg.MustRegister(wp.mMemoryInuse)
}
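
// The registry populated in registerMetrics can be exposed in the
// usual Prometheus fashion; for example, a caller that imports
// net/http and github.com/prometheus/client_golang/prometheus/promhttp
// might serve it like this:
//
//	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))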

// runMetrics updates the gauges each time the worker pool changes.
func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	var alloc, cpu, cpuInuse, mem, memInuse int64
	for _, wkr := range wp.workers {
		cpu += int64(wkr.instType.VCPUs)
		mem += int64(wkr.instType.RAM)
		if len(wkr.running)+len(wkr.starting) == 0 {
			continue
		}
		alloc += int64(len(wkr.running) + len(wkr.starting))
		cpuInuse += int64(wkr.instType.VCPUs)
		memInuse += int64(wkr.instType.RAM)
	}
	wp.mInstances.Set(float64(len(wp.workers)))
	wp.mContainersRunning.Set(float64(alloc))
	wp.mVCPUs.Set(float64(cpu))
	wp.mMemory.Set(float64(mem))
	wp.mVCPUsInuse.Set(float64(cpuInuse))
	wp.mMemoryInuse.Set(float64(memInuse))
}

// runProbes periodically probes all workers, pacing the probes so no
// more than maxProbesPerSecond are started per second.
func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wp.shutdownIfIdle(wkr) {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok || wkr.state == StateShutdown {
				// Deleted/shutdown while we
				// were probing others
				continue
			}
			// wkr.probing is a 1-buffered channel used as
			// a non-blocking mutex: if a probe is already
			// in flight, skip this worker for now.
			select {
			case wkr.probing <- struct{}{}:
				go func() {
					wp.probeAndUpdate(wkr)
					<-wkr.probing
				}()
			default:
				wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
			}
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

// runSync reconciles the worker list with the cloud provider's
// instance list, once per syncInterval, until Stop is called.
func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// caller must have lock.
func (wp *Pool) shutdownIfBroken(wkr *worker, dur time.Duration) {
	if wkr.state == StateHold {
		return
	}
	label, threshold := "", wp.timeoutProbe
	if wkr.state == StateBooting {
		label, threshold = "new ", wp.timeoutBooting
	}
	if dur < threshold {
		return
	}
	wp.logger.WithFields(logrus.Fields{
		"Instance": wkr.instance,
		"Duration": dur,
		"Since":    wkr.probed,
		"State":    wkr.state,
	}).Warnf("%sinstance unresponsive, shutting down", label)
	wp.shutdown(wkr, wp.logger)
}

// caller must have lock.
func (wp *Pool) shutdownIfIdle(wkr *worker) bool {
	if len(wkr.running)+len(wkr.starting) > 0 || wkr.state != StateRunning {
		return false
	}
	age := time.Since(wkr.unallocated)
	if age < wp.timeoutIdle {
		return false
	}
	logger := wp.logger.WithFields(logrus.Fields{
		"Age":      age,
		"Instance": wkr.instance,
	})
	logger.Info("shutdown idle worker")
	wp.shutdown(wkr, logger)
	return true
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.String(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			Unallocated:          w.unallocated,
			WorkerState:          w.state.String(),
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(r[i].Instance, r[j].Instance) < 0
	})
	return r
}

func (wp *Pool) setup() {
	wp.creating = map[arvados.InstanceType]int{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
	wp.stop = make(chan bool)
}

// notify sends a non-blocking wakeup to all subscribers.
func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
	if err != nil {
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wp.updateWorker(inst, it, StateUnknown) {
			notify = true
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance,
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		delete(wp.workers, id)
		go wkr.executor.Close()
		notify = true
	}

	if !wp.loaded {
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}

// should be called in a new goroutine
func (wp *Pool) probeAndUpdate(wkr *worker) {
	logger := wp.logger.WithField("Instance", wkr.instance)
	wp.mtx.Lock()
	updated := wkr.updated
	booted := wkr.booted
	wp.mtx.Unlock()

	var (
		ctrUUIDs []string
		ok       bool
		stderr   []byte
	)
	if !booted {
		booted, stderr = wp.probeBooted(wkr)
		wp.mtx.Lock()
		if booted && !wkr.booted {
			wkr.booted = booted
			logger.Info("instance booted")
		} else {
			booted = wkr.booted
		}
		wp.mtx.Unlock()
	}
	if booted {
		ctrUUIDs, ok, stderr = wp.probeRunning(wkr)
	}
	logger = logger.WithField("stderr", string(stderr))
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if !ok {
		if wkr.state == StateShutdown {
			return
		}
		dur := time.Since(wkr.probed)
		logger := logger.WithFields(logrus.Fields{
			"Duration": dur,
			"State":    wkr.state,
		})
		if wkr.state == StateBooting {
			logger.Debug("new instance not responding")
		} else {
			logger.Info("instance not responding")
		}
		wp.shutdownIfBroken(wkr, dur)
		return
	}

	updateTime := time.Now()
	wkr.probed = updateTime
	if wkr.state == StateShutdown || wkr.state == StateHold {
		// Leave state alone.
	} else if booted {
		if wkr.state != StateRunning {
			wkr.state = StateRunning
			go wp.notify()
		}
	} else {
		wkr.state = StateBooting
	}

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		return
	}

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Worker just became unallocated: start the idle
		// timeout clock.
		wkr.unallocated = updateTime
	}
	running := map[string]struct{}{}
	changed := false
	for _, uuid := range ctrUUIDs {
		running[uuid] = struct{}{}
		if _, ok := wkr.running[uuid]; !ok {
			changed = true
		}
	}
	for uuid := range wkr.running {
		if _, ok := running[uuid]; !ok {
			logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
			wp.exited[uuid] = updateTime
			changed = true
		}
	}
	if changed {
		wkr.running = running
		wkr.updated = updateTime
		go wp.notify()
	}
}
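
// probeRunning assumes "crunch-run --list" prints one UUID per line
// on stdout for each crunch-run process supervising a container; for
// example (illustrative UUIDs only):
//
//	zzzzz-dz642-xxxxxxxxxxxxxxx
//	zzzzz-dz642-yyyyyyyyyyyyyyy
//
// Empty output means no containers are running; any execution error
// is reported as ok=false.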
func (wp *Pool) probeRunning(wkr *worker) (running []string, ok bool, stderr []byte) {
	cmd := "crunch-run --list"
	stdout, stderr, err := wkr.executor.Execute(cmd, nil)
	if err != nil {
		wp.logger.WithFields(logrus.Fields{
			"Instance": wkr.instance,
			"Command":  cmd,
			"stdout":   string(stdout),
			"stderr":   string(stderr),
		}).WithError(err).Warn("probe failed")
		return nil, false, stderr
	}
	stdout = bytes.TrimRight(stdout, "\n")
	if len(stdout) == 0 {
		return nil, true, stderr
	}
	return strings.Split(string(stdout), "\n"), true, stderr
}

func (wp *Pool) probeBooted(wkr *worker) (ok bool, stderr []byte) {
	cmd := wp.bootProbeCommand
	if cmd == "" {
		cmd = "true"
	}
	stdout, stderr, err := wkr.executor.Execute(cmd, nil)
	logger := wp.logger.WithFields(logrus.Fields{
		"Instance": wkr.instance,
		"Command":  cmd,
		"stdout":   string(stdout),
		"stderr":   string(stderr),
	})
	if err != nil {
		logger.WithError(err).Debug("boot probe failed")
		return false, stderr
	}
	logger.Info("boot probe succeeded")
	return true, stderr
}