1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
15 "git.curoverse.com/arvados.git/lib/cloud"
16 "git.curoverse.com/arvados.git/sdk/go/arvados"
17 "github.com/Sirupsen/logrus"
18 "github.com/prometheus/client_golang/prometheus"
22 tagKeyInstanceType = "InstanceType"
26 // An InstanceView shows a worker's current state and recent activity.
27 type InstanceView struct {
30 ArvadosInstanceType string
31 ProviderInstanceType string
32 LastContainerUUID string
37 // An Executor executes shell commands on a remote host.
38 type Executor interface {
39 // Run cmd on the current target.
40 Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
42 // Use the given target for subsequent operations. The new
43 // target is the same host as the previous target, but it
44 // might return a different address and verify a different
47 // SetTarget is called frequently, and in most cases the new
48 // target will behave exactly the same as the old one. An
49 // implementation should optimize accordingly.
51 // SetTarget must not block on concurrent Execute calls.
52 SetTarget(cloud.ExecutorTarget)
58 defaultSyncInterval = time.Minute
59 defaultProbeInterval = time.Second * 10
60 defaultMaxProbesPerSecond = 10
61 defaultTimeoutIdle = time.Minute
62 defaultTimeoutBooting = time.Minute * 10
63 defaultTimeoutProbe = time.Minute * 10
66 func duration(conf arvados.Duration, def time.Duration) time.Duration {
68 return time.Duration(conf)
74 // NewPool creates a Pool of workers backed by instanceSet.
76 // New instances are configured and set up according to the given
77 // cluster configuration.
78 func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
81 instanceSet: instanceSet,
82 newExecutor: newExecutor,
83 bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
84 imageID: cloud.ImageID(cluster.CloudVMs.ImageID),
85 instanceTypes: cluster.InstanceTypes,
86 maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
87 probeInterval: duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
88 syncInterval: duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
89 timeoutIdle: duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
90 timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
91 timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
93 wp.registerMetrics(reg)
95 wp.setupOnce.Do(wp.setup)
103 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
104 // zero Pool should not be used. Call NewPool to create a new Pool.
107 logger logrus.FieldLogger
108 instanceSet cloud.InstanceSet
109 newExecutor func(cloud.Instance) Executor
110 bootProbeCommand string
111 imageID cloud.ImageID
112 instanceTypes map[string]arvados.InstanceType
113 syncInterval time.Duration
114 probeInterval time.Duration
115 maxProbesPerSecond int
116 timeoutIdle time.Duration
117 timeoutBooting time.Duration
118 timeoutProbe time.Duration
121 subscribers map[<-chan struct{}]chan<- struct{}
122 creating map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return
123 workers map[cloud.InstanceID]*worker
124 loaded bool // loaded list of instances from InstanceSet at least once
125 exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
126 atQuotaUntil time.Time
127 atQuotaErr cloud.QuotaError
132 mInstances prometheus.Gauge
133 mContainersRunning prometheus.Gauge
134 mVCPUs prometheus.Gauge
135 mVCPUsInuse prometheus.Gauge
136 mMemory prometheus.Gauge
137 mMemoryInuse prometheus.Gauge
142 instance cloud.Instance
144 instType arvados.InstanceType
151 running map[string]struct{} // remember to update state idle<->running when this changes
152 starting map[string]struct{} // remember to update state idle<->running when this changes
153 probing chan struct{}
156 // Subscribe returns a channel that becomes ready whenever a worker's
161 // ch := wp.Subscribe()
162 // defer wp.Unsubscribe(ch)
164 // // ...try scheduling some work...
169 func (wp *Pool) Subscribe() <-chan struct{} {
170 wp.setupOnce.Do(wp.setup)
172 defer wp.mtx.Unlock()
173 ch := make(chan struct{}, 1)
174 wp.subscribers[ch] = ch
178 // Unsubscribe stops sending updates to the given channel.
179 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
180 wp.setupOnce.Do(wp.setup)
182 defer wp.mtx.Unlock()
183 delete(wp.subscribers, ch)
186 // Unallocated returns the number of unallocated (creating + booting +
187 // idle + unknown) workers for each instance type.
189 // The returned counts should be interpreted as upper bounds, rather
190 // than exact counts: they are sometimes artificially high when a
191 // newly created instance appears in the driver's Instances() list
192 // before the Create() call returns.
193 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
194 wp.setupOnce.Do(wp.setup)
196 defer wp.mtx.RUnlock()
197 u := map[arvados.InstanceType]int{}
198 for it, c := range wp.creating {
201 for _, wkr := range wp.workers {
202 if wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown {
209 // Create a new instance with the given type, and add it to the worker
210 // pool. The worker is added immediately; instance creation runs in
212 func (wp *Pool) Create(it arvados.InstanceType) error {
213 logger := wp.logger.WithField("InstanceType", it.Name)
214 wp.setupOnce.Do(wp.setup)
216 defer wp.mtx.Unlock()
217 if time.Now().Before(wp.atQuotaUntil) {
220 tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
224 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
226 defer wp.mtx.Unlock()
228 if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
230 wp.atQuotaUntil = time.Now().Add(time.Minute)
233 logger.WithError(err).Error("create failed")
236 wp.updateWorker(inst, it, StateBooting)
241 // AtQuota returns true if Create is not expected to work at the
243 func (wp *Pool) AtQuota() bool {
245 defer wp.mtx.Unlock()
246 return time.Now().Before(wp.atQuotaUntil)
249 // Add or update worker attached to the given instance. Use
250 // initialState if a new worker is created. Caller must have lock.
252 // Returns true when a new worker is created.
253 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) bool {
255 if wp.workers[id] != nil {
256 wp.workers[id].executor.SetTarget(inst)
257 wp.workers[id].instance = inst
258 wp.workers[id].updated = time.Now()
259 if initialState == StateBooting && wp.workers[id].state == StateUnknown {
260 wp.workers[id].state = StateBooting
264 if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
265 initialState = StateHold
267 wp.logger.WithFields(logrus.Fields{
268 "InstanceType": it.Name,
270 "State": initialState,
271 }).Infof("instance appeared in cloud")
273 wp.workers[id] = &worker{
274 executor: wp.newExecutor(inst),
281 running: make(map[string]struct{}),
282 starting: make(map[string]struct{}),
283 probing: make(chan struct{}, 1),
288 // Shutdown shuts down a worker with the given type, or returns false
289 // if all workers with the given type are busy.
290 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
291 wp.setupOnce.Do(wp.setup)
293 defer wp.mtx.Unlock()
294 logger := wp.logger.WithField("InstanceType", it.Name)
295 logger.Info("shutdown requested")
296 for _, tryState := range []State{StateBooting, StateIdle} {
297 // TODO: shutdown the worker with the longest idle
298 // time (Idle) or the earliest create time (Booting)
299 for _, wkr := range wp.workers {
300 if wkr.state == tryState && wkr.instType == it {
301 logger = logger.WithField("Instance", wkr.instance)
302 logger.Info("shutting down")
303 wp.shutdown(wkr, logger)
311 // caller must have lock
312 func (wp *Pool) shutdown(wkr *worker, logger logrus.FieldLogger) {
313 wkr.updated = time.Now()
314 wkr.state = StateShutdown
316 err := wkr.instance.Destroy()
318 logger.WithError(err).WithField("Instance", wkr.instance).Warn("shutdown failed")
322 wp.atQuotaUntil = time.Now()
328 // Workers returns the current number of workers in each state.
329 func (wp *Pool) Workers() map[State]int {
330 wp.setupOnce.Do(wp.setup)
332 defer wp.mtx.Unlock()
334 for _, w := range wp.workers {
340 // Running returns the container UUIDs being prepared/run on workers.
341 func (wp *Pool) Running() map[string]time.Time {
342 wp.setupOnce.Do(wp.setup)
344 defer wp.mtx.Unlock()
345 r := map[string]time.Time{}
346 for _, wkr := range wp.workers {
347 for uuid := range wkr.running {
348 r[uuid] = time.Time{}
350 for uuid := range wkr.starting {
351 r[uuid] = time.Time{}
354 for uuid, exited := range wp.exited {
360 // StartContainer starts a container on an idle worker immediately if
361 // possible, otherwise returns false.
362 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
363 logger := wp.logger.WithFields(logrus.Fields{
364 "InstanceType": it.Name,
365 "ContainerUUID": ctr.UUID,
366 "Priority": ctr.Priority,
368 wp.setupOnce.Do(wp.setup)
370 defer wp.mtx.Unlock()
372 for _, w := range wp.workers {
373 if w.instType == it && w.state == StateIdle {
374 if wkr == nil || w.busy.After(wkr.busy) {
382 logger = logger.WithField("Instance", wkr.instance)
383 logger.Debug("starting container")
384 wkr.starting[ctr.UUID] = struct{}{}
385 wkr.state = StateRunning
387 stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
389 defer wp.mtx.Unlock()
393 delete(wkr.starting, ctr.UUID)
394 wkr.running[ctr.UUID] = struct{}{}
395 wkr.lastUUID = ctr.UUID
397 logger.WithField("stdout", string(stdout)).
398 WithField("stderr", string(stderr)).
400 Error("error starting crunch-run process")
401 // Leave uuid in wkr.running, though: it's
402 // possible the error was just a communication
403 // failure and the process was in fact
404 // started. Wait for next probe to find out.
407 logger.Info("crunch-run process started")
408 wkr.lastUUID = ctr.UUID
413 // KillContainer kills the crunch-run process for the given container
414 // UUID, if it's running on any worker.
416 // KillContainer returns immediately; the act of killing the container
417 // takes some time, and runs in the background.
418 func (wp *Pool) KillContainer(uuid string) {
420 defer wp.mtx.Unlock()
421 if _, ok := wp.exited[uuid]; ok {
422 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
423 delete(wp.exited, uuid)
426 for _, wkr := range wp.workers {
427 if _, ok := wkr.running[uuid]; ok {
428 go wp.kill(wkr, uuid)
432 wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
435 func (wp *Pool) kill(wkr *worker, uuid string) {
436 logger := wp.logger.WithFields(logrus.Fields{
437 "ContainerUUID": uuid,
438 "Instance": wkr.instance,
440 logger.Debug("killing process")
441 stdout, stderr, err := wkr.executor.Execute("crunch-run --kill "+uuid, nil)
443 logger.WithFields(logrus.Fields{
444 "stderr": string(stderr),
445 "stdout": string(stdout),
447 }).Warn("kill failed")
450 logger.Debug("killing process succeeded")
452 defer wp.mtx.Unlock()
453 if _, ok := wkr.running[uuid]; ok {
454 delete(wkr.running, uuid)
455 if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
456 wkr.state = StateIdle
458 wkr.updated = time.Now()
463 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
465 reg = prometheus.NewRegistry()
467 wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
468 Namespace: "arvados",
469 Subsystem: "dispatchcloud",
470 Name: "instances_total",
471 Help: "Number of cloud VMs including pending, booting, running, held, and shutting down.",
473 reg.MustRegister(wp.mInstances)
474 wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
475 Namespace: "arvados",
476 Subsystem: "dispatchcloud",
477 Name: "containers_running",
478 Help: "Number of containers reported running by cloud VMs.",
480 reg.MustRegister(wp.mContainersRunning)
482 wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
483 Namespace: "arvados",
484 Subsystem: "dispatchcloud",
486 Help: "Total VCPUs on all cloud VMs.",
488 reg.MustRegister(wp.mVCPUs)
489 wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
490 Namespace: "arvados",
491 Subsystem: "dispatchcloud",
493 Help: "VCPUs on cloud VMs that are running containers.",
495 reg.MustRegister(wp.mVCPUsInuse)
496 wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
497 Namespace: "arvados",
498 Subsystem: "dispatchcloud",
499 Name: "memory_bytes_total",
500 Help: "Total memory on all cloud VMs.",
502 reg.MustRegister(wp.mMemory)
503 wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
504 Namespace: "arvados",
505 Subsystem: "dispatchcloud",
506 Name: "memory_bytes_inuse",
507 Help: "Memory on cloud VMs that are running containers.",
509 reg.MustRegister(wp.mMemoryInuse)
512 func (wp *Pool) runMetrics() {
514 defer wp.Unsubscribe(ch)
520 func (wp *Pool) updateMetrics() {
522 defer wp.mtx.RUnlock()
524 var alloc, cpu, cpuInuse, mem, memInuse int64
525 for _, wkr := range wp.workers {
526 cpu += int64(wkr.instType.VCPUs)
527 mem += int64(wkr.instType.RAM)
528 if len(wkr.running)+len(wkr.starting) == 0 {
531 alloc += int64(len(wkr.running) + len(wkr.starting))
532 cpuInuse += int64(wkr.instType.VCPUs)
533 memInuse += int64(wkr.instType.RAM)
535 wp.mInstances.Set(float64(len(wp.workers)))
536 wp.mContainersRunning.Set(float64(alloc))
537 wp.mVCPUs.Set(float64(cpu))
538 wp.mMemory.Set(float64(mem))
539 wp.mVCPUsInuse.Set(float64(cpuInuse))
540 wp.mMemoryInuse.Set(float64(memInuse))
543 func (wp *Pool) runProbes() {
544 maxPPS := wp.maxProbesPerSecond
546 maxPPS = defaultMaxProbesPerSecond
548 limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
549 defer limitticker.Stop()
551 probeticker := time.NewTicker(wp.probeInterval)
552 defer probeticker.Stop()
554 workers := []cloud.InstanceID{}
555 for range probeticker.C {
556 workers = workers[:0]
558 for id, wkr := range wp.workers {
559 if wkr.state == StateShutdown || wp.shutdownIfIdle(wkr) {
562 workers = append(workers, id)
566 for _, id := range workers {
568 wkr, ok := wp.workers[id]
570 if !ok || wkr.state == StateShutdown {
571 // Deleted/shutdown while we
572 // were probing others
576 case wkr.probing <- struct{}{}:
578 wp.probeAndUpdate(wkr)
582 wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
587 case <-limitticker.C:
593 func (wp *Pool) runSync() {
594 // sync once immediately, then wait syncInterval, sync again,
596 timer := time.NewTimer(1)
600 err := wp.getInstancesAndSync()
602 wp.logger.WithError(err).Warn("sync failed")
604 timer.Reset(wp.syncInterval)
606 wp.logger.Debug("worker.Pool stopped")
612 // caller must have lock.
613 func (wp *Pool) shutdownIfBroken(wkr *worker, dur time.Duration) {
614 if wkr.state == StateHold {
617 label, threshold := "", wp.timeoutProbe
618 if wkr.state == StateBooting {
619 label, threshold = "new ", wp.timeoutBooting
624 wp.logger.WithFields(logrus.Fields{
625 "Instance": wkr.instance,
629 }).Warnf("%sinstance unresponsive, shutting down", label)
630 wp.shutdown(wkr, wp.logger)
633 // caller must have lock.
634 func (wp *Pool) shutdownIfIdle(wkr *worker) bool {
635 if wkr.state != StateIdle {
638 age := time.Since(wkr.busy)
639 if age < wp.timeoutIdle {
642 logger := wp.logger.WithFields(logrus.Fields{
644 "Instance": wkr.instance,
646 logger.Info("shutdown idle worker")
647 wp.shutdown(wkr, logger)
651 // Stop synchronizing with the InstanceSet.
652 func (wp *Pool) Stop() {
653 wp.setupOnce.Do(wp.setup)
657 // Instances returns an InstanceView for each worker in the pool,
658 // summarizing its current state and recent activity.
659 func (wp *Pool) Instances() []InstanceView {
661 wp.setupOnce.Do(wp.setup)
663 for _, w := range wp.workers {
664 r = append(r, InstanceView{
665 Instance: w.instance.String(),
666 Price: w.instType.Price,
667 ArvadosInstanceType: w.instType.Name,
668 ProviderInstanceType: w.instType.ProviderType,
669 LastContainerUUID: w.lastUUID,
671 WorkerState: w.state.String(),
675 sort.Slice(r, func(i, j int) bool {
676 return strings.Compare(r[i].Instance, r[j].Instance) < 0
681 func (wp *Pool) setup() {
682 wp.creating = map[arvados.InstanceType]int{}
683 wp.exited = map[string]time.Time{}
684 wp.workers = map[cloud.InstanceID]*worker{}
685 wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
688 func (wp *Pool) notify() {
690 defer wp.mtx.RUnlock()
691 for _, send := range wp.subscribers {
693 case send <- struct{}{}:
699 func (wp *Pool) getInstancesAndSync() error {
700 wp.setupOnce.Do(wp.setup)
701 wp.logger.Debug("getting instance list")
702 threshold := time.Now()
703 instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
707 wp.sync(threshold, instances)
708 wp.logger.Debug("sync done")
712 // Add/remove/update workers based on instances, which were obtained
713 // from the instanceSet. However, don't clobber any other updates that
714 // already happened after threshold.
715 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
717 defer wp.mtx.Unlock()
718 wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
721 for _, inst := range instances {
722 itTag := inst.Tags()[tagKeyInstanceType]
723 it, ok := wp.instanceTypes[itTag]
725 wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
728 if wp.updateWorker(inst, it, StateUnknown) {
733 for id, wkr := range wp.workers {
734 if wkr.updated.After(threshold) {
737 logger := wp.logger.WithFields(logrus.Fields{
738 "Instance": wkr.instance,
739 "WorkerState": wkr.state,
741 logger.Info("instance disappeared in cloud")
742 delete(wp.workers, id)
743 go wkr.executor.Close()
749 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
757 // should be called in a new goroutine
758 func (wp *Pool) probeAndUpdate(wkr *worker) {
759 logger := wp.logger.WithField("Instance", wkr.instance)
761 updated := wkr.updated
762 needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
763 needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
765 if !needProbeBooted && !needProbeRunning {
775 ok, stderr = wp.probeBooted(wkr)
777 if ok || wkr.state == StateRunning || wkr.state == StateIdle {
778 logger.Info("instance booted; will try probeRunning")
779 needProbeRunning = true
783 if needProbeRunning {
784 ctrUUIDs, ok, stderr = wp.probeRunning(wkr)
786 logger = logger.WithField("stderr", string(stderr))
788 defer wp.mtx.Unlock()
790 if wkr.state == StateShutdown && wkr.updated.After(updated) {
791 // Skip the logging noise if shutdown was
792 // initiated during probe.
795 dur := time.Since(wkr.probed)
796 logger := logger.WithFields(logrus.Fields{
800 if wkr.state == StateBooting {
801 logger.Debug("new instance not responding")
803 logger.Info("instance not responding")
805 wp.shutdownIfBroken(wkr, dur)
809 updateTime := time.Now()
810 wkr.probed = updateTime
812 if updated != wkr.updated {
813 // Worker was updated after the probe began, so
814 // wkr.running might have a container UUID that was
815 // not yet running when ctrUUIDs was generated. Leave
816 // wkr.running alone and wait for the next probe to
817 // catch up on any changes.
821 if len(ctrUUIDs) > 0 {
822 wkr.busy = updateTime
823 wkr.lastUUID = ctrUUIDs[0]
824 } else if len(wkr.running) > 0 {
825 // Actual last-busy time was sometime between wkr.busy
826 // and now. Now is the earliest opportunity to take
827 // advantage of the non-busy state, though.
828 wkr.busy = updateTime
830 running := map[string]struct{}{}
832 for _, uuid := range ctrUUIDs {
833 running[uuid] = struct{}{}
834 if _, ok := wkr.running[uuid]; !ok {
838 for uuid := range wkr.running {
839 if _, ok := running[uuid]; !ok {
840 logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
841 wp.exited[uuid] = updateTime
845 if wkr.state == StateUnknown || wkr.state == StateBooting {
846 wkr.state = StateIdle
850 wkr.running = running
851 if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
852 wkr.state = StateRunning
853 } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
854 wkr.state = StateIdle
856 wkr.updated = updateTime
861 func (wp *Pool) probeRunning(wkr *worker) (running []string, ok bool, stderr []byte) {
862 cmd := "crunch-run --list"
863 stdout, stderr, err := wkr.executor.Execute(cmd, nil)
865 wp.logger.WithFields(logrus.Fields{
866 "Instance": wkr.instance,
868 "stdout": string(stdout),
869 "stderr": string(stderr),
870 }).WithError(err).Warn("probe failed")
871 return nil, false, stderr
873 stdout = bytes.TrimRight(stdout, "\n")
874 if len(stdout) == 0 {
875 return nil, true, stderr
877 return strings.Split(string(stdout), "\n"), true, stderr
880 func (wp *Pool) probeBooted(wkr *worker) (ok bool, stderr []byte) {
881 cmd := wp.bootProbeCommand
885 stdout, stderr, err := wkr.executor.Execute(cmd, nil)
886 logger := wp.logger.WithFields(logrus.Fields{
887 "Instance": wkr.instance,
889 "stdout": string(stdout),
890 "stderr": string(stderr),
893 logger.WithError(err).Debug("boot probe failed")
896 logger.Info("boot probe succeeded")