// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"bytes"
	"io"
	"sort"
	"strings"
	"sync"
	"time"

	"git.curoverse.com/arvados.git/lib/cloud"
	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"github.com/Sirupsen/logrus"
	"github.com/prometheus/client_golang/prometheus"
)

const (
	tagKeyInstanceType = "InstanceType"
	tagKeyHold         = "Hold"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             string
	Price                float64
	ArvadosInstanceType  string
	ProviderInstanceType string
	LastContainerUUID    string
	LastBusy             time.Time
	WorkerState          string
}

// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	// Close releases any resources used by the executor.
	Close()
}

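// The stub below is an illustrative sketch, not part of the original
// API: a trivial Executor of the kind a test might plug in via the
// newExecutor argument to NewPool. The name nopExecutor is
// hypothetical.
type nopExecutor struct{}

// Execute pretends every command succeeds with no output.
func (nopExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
	return nil, nil, nil
}

// SetTarget ignores the new target.
func (nopExecutor) SetTarget(cloud.ExecutorTarget) {}

// Close releases nothing.
func (nopExecutor) Close() {}
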
const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = time.Minute * 10
	defaultTimeoutProbe       = time.Minute * 10
)

// duration returns conf as a time.Duration, or def if conf is zero.
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}

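// For example, a zero (unset) config value falls back to the supplied
// default:
//
//	duration(arvados.Duration(0), defaultSyncInterval)           == defaultSyncInterval
//	duration(arvados.Duration(time.Second), defaultSyncInterval) == time.Second
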
// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:             logger,
		instanceSet:        instanceSet,
		newExecutor:        newExecutor,
		bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
		imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
		instanceTypes:      cluster.InstanceTypes,
		maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
		probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
		syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}

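// Typical wiring (illustrative sketch; myInstanceSet and
// newSSHExecutor are hypothetical stand-ins for real implementations):
//
//	reg := prometheus.NewRegistry()
//	pool := NewPool(logger, reg, myInstanceSet, newSSHExecutor, cluster)
//	ch := pool.Subscribe()
//	defer pool.Unsubscribe(ch)
//	for range ch {
//		// react to worker state changes
//	}
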
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger             logrus.FieldLogger
	instanceSet        cloud.InstanceSet
	newExecutor        func(cloud.Instance) Executor
	bootProbeCommand   string
	imageID            cloud.ImageID
	instanceTypes      map[string]arvados.InstanceType
	syncInterval       time.Duration
	probeInterval      time.Duration
	maxProbesPerSecond int
	timeoutIdle        time.Duration
	timeoutBooting     time.Duration
	timeoutProbe       time.Duration

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once

	mInstances         prometheus.Gauge
	mContainersRunning prometheus.Gauge
	mVCPUs             prometheus.Gauge
	mVCPUsInuse        prometheus.Gauge
	mMemory            prometheus.Gauge
	mMemoryInuse       prometheus.Gauge
}

type worker struct {
	state    State
	instance cloud.Instance
	executor Executor
	instType arvados.InstanceType
	probed   time.Time
	updated  time.Time
	busy     time.Time
	lastUUID string
	running  map[string]struct{} // remember to update state idle<->running when this changes
	starting map[string]struct{} // remember to update state idle<->running when this changes
	probing  chan struct{}
}

// Subscribe returns a channel that becomes ready whenever a worker's
// state changes.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		// ...try scheduling some work...
//		if done {
//			break
//		}
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	u := map[arvados.InstanceType]int{}
	for it, c := range wp.creating {
		u[it] = c
	}
	for _, wkr := range wp.workers {
		if wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown {
			u[wkr.instType]++
		}
	}
	return u
}

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
func (wp *Pool) Create(it arvados.InstanceType) error {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) {
		return wp.atQuotaErr
	}
	tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
	wp.creating[it]++
	go func() {
		defer wp.notify()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		wp.creating[it]--
		if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
			wp.atQuotaErr = err
			wp.atQuotaUntil = time.Now().Add(time.Minute)
		}
		if err != nil {
			logger.WithError(err).Error("create failed")
			return
		}
		wp.updateWorker(inst, it, StateBooting)
	}()
	return nil
}

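// Illustrative caller pattern (hypothetical scheduler code): check
// AtQuota before asking for a new instance, and treat a Create error
// as "try again later" rather than fatal:
//
//	if !wp.AtQuota() {
//		if err := wp.Create(it); err != nil {
//			// quota/cloud error; the pool remembers quota
//			// errors, so a later retry is cheap
//		}
//	}
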
// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
}

// Add or update worker attached to the given instance. Use
// initialState if a new worker is created. Caller must have lock.
//
// Returns true when a new worker is created.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) bool {
	id := inst.ID()
	if wp.workers[id] != nil {
		wp.workers[id].executor.SetTarget(inst)
		wp.workers[id].instance = inst
		wp.workers[id].updated = time.Now()
		if initialState == StateBooting && wp.workers[id].state == StateUnknown {
			wp.workers[id].state = StateBooting
		}
		return false
	}
	if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
		initialState = StateHold
	}
	wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst,
		"State":        initialState,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	wp.workers[id] = &worker{
		executor: wp.newExecutor(inst),
		state:    initialState,
		instance: inst,
		instType: it,
		probed:   now,
		busy:     now,
		updated:  now,
		running:  make(map[string]struct{}),
		starting: make(map[string]struct{}),
		probing:  make(chan struct{}, 1),
	}
	return true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.state == tryState && wkr.instType == it {
				logger = logger.WithField("Instance", wkr.instance)
				logger.Info("shutting down")
				wp.shutdown(wkr, logger)
				return true
			}
		}
	}
	return false
}

// caller must have lock
func (wp *Pool) shutdown(wkr *worker, logger logrus.FieldLogger) {
	wkr.updated = time.Now()
	wkr.state = StateShutdown
	go func() {
		err := wkr.instance.Destroy()
		if err != nil {
			logger.WithError(err).WithField("Instance", wkr.instance).Warn("shutdown failed")
			return
		}
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		wp.atQuotaUntil = time.Now()
		go wp.notify()
	}()
}

// Workers returns the current number of workers in each state.
func (wp *Pool) Workers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType":  it.Name,
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	})
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	logger = logger.WithField("Instance", wkr.instance)
	logger.Debug("starting container")
	wkr.starting[ctr.UUID] = struct{}{}
	wkr.state = StateRunning
	go func() {
		stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		now := time.Now()
		wkr.updated = now
		wkr.busy = now
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = struct{}{}
		wkr.lastUUID = ctr.UUID
		if err != nil {
			logger.WithField("stdout", string(stdout)).
				WithField("stderr", string(stderr)).
				WithError(err).
				Error("error starting crunch-run process")
			// Leave uuid in wkr.running, though: it's
			// possible the error was just a communication
			// failure and the process was in fact
			// started. Wait for next probe to find out.
			return
		}
		logger.Info("crunch-run process started")
		wkr.lastUUID = ctr.UUID
	}()
	return true
}

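// Illustrative dispatch loop fragment (hypothetical caller code):
//
//	if !wp.StartContainer(it, ctr) {
//		// No idle worker of this type right now; ask for one
//		// and retry on the next scheduling pass.
//		wp.Create(it)
//	}
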
// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
func (wp *Pool) KillContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
		return
	}
	for _, wkr := range wp.workers {
		if _, ok := wkr.running[uuid]; ok {
			go wp.kill(wkr, uuid)
			return
		}
	}
	wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
}

func (wp *Pool) kill(wkr *worker, uuid string) {
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Instance":      wkr.instance,
	})
	logger.Debug("killing process")
	stdout, stderr, err := wkr.executor.Execute("crunch-run --kill "+uuid, nil)
	if err != nil {
		logger.WithFields(logrus.Fields{
			"stderr": string(stderr),
			"stdout": string(stdout),
			"error":  err,
		}).Warn("kill failed")
		return
	}
	logger.Debug("killing process succeeded")
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wkr.running[uuid]; ok {
		delete(wkr.running, uuid)
		if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
			wkr.state = StateIdle
		}
		wkr.updated = time.Now()
		go wp.notify()
	}
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
	})
	reg.MustRegister(wp.mInstances)
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	})
	reg.MustRegister(wp.mVCPUs)
	wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_inuse",
		Help:      "VCPUs on cloud VMs that are running containers.",
	})
	reg.MustRegister(wp.mVCPUsInuse)
	wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	})
	reg.MustRegister(wp.mMemory)
	wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_inuse",
		Help:      "Memory on cloud VMs that are running containers.",
	})
	reg.MustRegister(wp.mMemoryInuse)
}

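// A minimal sketch of exposing these metrics over HTTP, assuming the
// standard promhttp package (not imported by this file):
//
//	import "github.com/prometheus/client_golang/prometheus/promhttp"
//
//	reg := prometheus.NewRegistry()
//	pool := NewPool(logger, reg, instanceSet, newExecutor, cluster)
//	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
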
func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	var alloc, cpu, cpuInuse, mem, memInuse int64
	for _, wkr := range wp.workers {
		cpu += int64(wkr.instType.VCPUs)
		mem += int64(wkr.instType.RAM)
		if len(wkr.running)+len(wkr.starting) == 0 {
			continue
		}
		alloc += int64(len(wkr.running) + len(wkr.starting))
		cpuInuse += int64(wkr.instType.VCPUs)
		memInuse += int64(wkr.instType.RAM)
	}
	wp.mInstances.Set(float64(len(wp.workers)))
	wp.mContainersRunning.Set(float64(alloc))
	wp.mVCPUs.Set(float64(cpu))
	wp.mMemory.Set(float64(mem))
	wp.mVCPUsInuse.Set(float64(cpuInuse))
	wp.mMemoryInuse.Set(float64(memInuse))
}

func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wp.shutdownIfIdle(wkr) {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok || wkr.state == StateShutdown {
				// Deleted/shutdown while we
				// were probing others
				continue
			}
			select {
			case wkr.probing <- struct{}{}:
				go func() {
					wp.probeAndUpdate(wkr)
					<-wkr.probing
				}()
			default:
				wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
			}
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

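// With the default maxProbesPerSecond of 10, limitticker fires every
// time.Second/10 = 100ms, so at most ten Execute-based probes start
// per second across all workers, regardless of pool size.
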
func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// caller must have lock.
func (wp *Pool) shutdownIfBroken(wkr *worker, dur time.Duration) {
	if wkr.state == StateHold {
		return
	}
	label, threshold := "", wp.timeoutProbe
	if wkr.state == StateBooting {
		label, threshold = "new ", wp.timeoutBooting
	}
	if dur < threshold {
		return
	}
	wp.logger.WithFields(logrus.Fields{
		"Instance": wkr.instance,
		"Duration": dur,
		"Since":    wkr.probed,
		"State":    wkr.state,
	}).Warnf("%sinstance unresponsive, shutting down", label)
	wp.shutdown(wkr, wp.logger)
}

// caller must have lock.
func (wp *Pool) shutdownIfIdle(wkr *worker) bool {
	if wkr.state != StateIdle {
		return false
	}
	age := time.Since(wkr.busy)
	if age < wp.timeoutIdle {
		return false
	}
	logger := wp.logger.WithFields(logrus.Fields{
		"Age":      age,
		"Instance": wkr.instance,
	})
	logger.Info("shutdown idle worker")
	wp.shutdown(wkr, logger)
	return true
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.String(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(r[i].Instance, r[j].Instance) < 0
	})
	return r
}

func (wp *Pool) setup() {
	wp.creating = map[arvados.InstanceType]int{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
	wp.stop = make(chan bool)
}

func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
	if err != nil {
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wp.updateWorker(inst, it, StateUnknown) {
			notify = true
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance,
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		delete(wp.workers, id)
		go wkr.executor.Close()
		notify = true
	}

	if !wp.loaded {
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}

// should be called in a new goroutine
func (wp *Pool) probeAndUpdate(wkr *worker) {
	logger := wp.logger.WithField("Instance", wkr.instance)
	wp.mtx.Lock()
	updated := wkr.updated
	needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
	needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
	wp.mtx.Unlock()
	if !needProbeBooted && !needProbeRunning {
		return
	}

	var (
		ctrUUIDs []string
		ok       bool
		stderr   []byte
	)
	if needProbeBooted {
		ok, stderr = wp.probeBooted(wkr)
		wp.mtx.Lock()
		if ok || wkr.state == StateRunning || wkr.state == StateIdle {
			logger.Info("instance booted; will try probeRunning")
			needProbeRunning = true
		}
		wp.mtx.Unlock()
	}
	if needProbeRunning {
		ctrUUIDs, ok, stderr = wp.probeRunning(wkr)
	}
	logger = logger.WithField("stderr", string(stderr))
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if !ok {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
			return
		}
		dur := time.Since(wkr.probed)
		logger := logger.WithFields(logrus.Fields{
			"Duration": dur,
			"State":    wkr.state,
		})
		if wkr.state == StateBooting {
			logger.Debug("new instance not responding")
		} else {
			logger.Info("instance not responding")
		}
		wp.shutdownIfBroken(wkr, dur)
		return
	}

	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		return
	}

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime
	}
	running := map[string]struct{}{}
	changed := false
	for _, uuid := range ctrUUIDs {
		running[uuid] = struct{}{}
		if _, ok := wkr.running[uuid]; !ok {
			changed = true
		}
	}
	for uuid := range wkr.running {
		if _, ok := running[uuid]; !ok {
			logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
			wp.exited[uuid] = updateTime
			changed = true
		}
	}
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		wkr.state = StateIdle
		changed = true
	}
	if changed {
		wkr.running = running
		if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
			wkr.state = StateRunning
		} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
			wkr.state = StateIdle
		}
		wkr.updated = updateTime
		go wp.notify()
	}
}

func (wp *Pool) probeRunning(wkr *worker) (running []string, ok bool, stderr []byte) {
	cmd := "crunch-run --list"
	stdout, stderr, err := wkr.executor.Execute(cmd, nil)
	if err != nil {
		wp.logger.WithFields(logrus.Fields{
			"Instance": wkr.instance,
			"Command":  cmd,
			"stdout":   string(stdout),
			"stderr":   string(stderr),
		}).WithError(err).Warn("probe failed")
		return nil, false, stderr
	}
	stdout = bytes.TrimRight(stdout, "\n")
	if len(stdout) == 0 {
		return nil, true, stderr
	}
	return strings.Split(string(stdout), "\n"), true, stderr
}

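// probeRunning expects "crunch-run --list" to print one container UUID
// per line, e.g. (sample UUIDs are illustrative):
//
//	zzzzz-dz642-abcdefghijklmno
//	zzzzz-dz642-onmlkjihgfedcba
//
// An empty stdout means no containers are running.
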
func (wp *Pool) probeBooted(wkr *worker) (ok bool, stderr []byte) {
	cmd := wp.bootProbeCommand
	if cmd == "" {
		cmd = "true"
	}
	stdout, stderr, err := wkr.executor.Execute(cmd, nil)
	logger := wp.logger.WithFields(logrus.Fields{
		"Instance": wkr.instance,
		"Command":  cmd,
		"stdout":   string(stdout),
		"stderr":   string(stderr),
	})
	if err != nil {
		logger.WithError(err).Debug("boot probe failed")
		return false, stderr
	}
	logger.Info("boot probe succeeded")
	return true, stderr
}
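
// The boot probe command comes from cluster configuration
// (CloudVMs.BootProbeCommand); a typical choice is a command that
// fails until the worker image is ready to run containers, e.g.
// "docker ps -q" (example only, not mandated by this file).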