// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"bytes"
	"io"
	"sort"
	"strings"
	"sync"
	"time"

	"git.curoverse.com/arvados.git/lib/cloud"
	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"github.com/Sirupsen/logrus"
	"github.com/prometheus/client_golang/prometheus"
)

const (
	tagKeyInstanceType = "InstanceType"
	tagKeyHold         = "Hold"
)

// A View shows a worker's current state and recent activity.
type View struct {
	Instance             string
	Price                float64
	ArvadosInstanceType  string
	ProviderInstanceType string
	LastContainerUUID    string
	Unallocated          time.Time
	WorkerState          string
}

// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	// Release any resources associated with the current target
	// (e.g., open connections).
	Close()
}

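// For tests or local experiments, Executor can be stubbed out. A
// minimal sketch (stubExecutor and its fields are hypothetical, not
// part of this package):
//
//	type stubExecutor struct{ target cloud.ExecutorTarget }
//
//	func (e *stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
//		return []byte("ok\n"), nil, nil
//	}
//	func (e *stubExecutor) SetTarget(t cloud.ExecutorTarget) { e.target = t }
//	func (e *stubExecutor) Close()                           {}
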
const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = time.Minute * 10
	defaultTimeoutProbe       = time.Minute * 10
)

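// duration returns conf as a time.Duration if it is positive,
// otherwise the fallback def. For example, duration(0, time.Minute)
// yields time.Minute.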
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:             logger,
		instanceSet:        instanceSet,
		newExecutor:        newExecutor,
		bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
		imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
		instanceTypes:      cluster.InstanceTypes,
		maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
		probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
		syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}

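// A dispatcher might construct a Pool like this (sketch; logger,
// instanceSet, newExecutor, and cluster are assumed to come from the
// caller's own setup code):
//
//	wp := NewPool(logger, prometheus.NewRegistry(), instanceSet, newExecutor, cluster)
//	defer wp.Stop()
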
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger             logrus.FieldLogger
	instanceSet        cloud.InstanceSet
	newExecutor        func(cloud.Instance) Executor
	bootProbeCommand   string
	imageID            cloud.ImageID
	instanceTypes      map[string]arvados.InstanceType
	syncInterval       time.Duration
	probeInterval      time.Duration
	maxProbesPerSecond int
	timeoutIdle        time.Duration
	timeoutBooting     time.Duration
	timeoutProbe       time.Duration

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
	atQuotaUntil time.Time
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once

	mInstances         prometheus.Gauge
	mContainersRunning prometheus.Gauge
	mVCPUs             prometheus.Gauge
	mVCPUsInuse        prometheus.Gauge
	mMemory            prometheus.Gauge
	mMemoryInuse       prometheus.Gauge
}

type worker struct {
	state       State
	instance    cloud.Instance
	executor    Executor
	instType    arvados.InstanceType
	booted      bool
	probed      time.Time
	updated     time.Time
	busy        time.Time
	unallocated time.Time
	lastUUID    string
	running     map[string]struct{}
	starting    map[string]struct{}
	probing     chan struct{}
}

// Subscribe returns a channel that becomes ready whenever a worker's
// state changes.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		// ...try scheduling some work...
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	u := map[arvados.InstanceType]int{}
	for it, c := range wp.creating {
		u[it] = c
	}
	for _, wkr := range wp.workers {
		if len(wkr.running)+len(wkr.starting) == 0 && (wkr.state == StateRunning || wkr.state == StateBooting || wkr.state == StateUnknown) {
			u[wkr.instType]++
		}
	}
	return u
}

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
func (wp *Pool) Create(it arvados.InstanceType) error {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
	wp.creating[it]++
	go func() {
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		wp.creating[it]--
		if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
			wp.atQuotaUntil = time.Now().Add(time.Minute)
		}
		if err != nil {
			logger.WithError(err).Error("create failed")
			return
		}
		wp.updateWorker(inst, it, StateBooting)
	}()
	return nil
}

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	return time.Now().Before(wp.atQuotaUntil)
}

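// Callers typically consult AtQuota before asking for another
// instance. Sketch:
//
//	if !wp.AtQuota() {
//		wp.Create(it)
//	}
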
// Add or update worker attached to the given instance. Use
// initialState if a new worker is created. Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) {
	id := inst.ID()
	if wp.workers[id] != nil {
		wp.workers[id].executor.SetTarget(inst)
		wp.workers[id].instance = inst
		wp.workers[id].updated = time.Now()
		if initialState == StateBooting && wp.workers[id].state == StateUnknown {
			wp.workers[id].state = StateBooting
		}
		return
	}
	if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
		initialState = StateHold
	}
	wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst,
		"State":        initialState,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	wp.workers[id] = &worker{
		executor:    wp.newExecutor(inst),
		state:       initialState,
		instance:    inst,
		instType:    it,
		probed:      now,
		busy:        now,
		updated:     now,
		unallocated: now,
		running:     make(map[string]struct{}),
		starting:    make(map[string]struct{}),
		probing:     make(chan struct{}, 1),
	}
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateRunning} {
		// TODO: shutdown the worker with the longest idle
		// time (Running) or the earliest create time
		// (Booting)
		for _, wkr := range wp.workers {
			if wkr.state != tryState || len(wkr.running)+len(wkr.starting) > 0 {
				continue
			}
			if wkr.instType != it {
				continue
			}
			logger = logger.WithField("Instance", wkr.instance)
			logger.Info("shutting down")
			wp.shutdown(wkr, logger)
			return true
		}
	}
	return false
}

// caller must have lock
func (wp *Pool) shutdown(wkr *worker, logger logrus.FieldLogger) {
	wkr.updated = time.Now()
	wkr.state = StateShutdown
	go func() {
		err := wkr.instance.Destroy()
		if err != nil {
			logger.WithError(err).Warn("shutdown failed")
			return
		}
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// Destroying an instance frees quota immediately.
		wp.atQuotaUntil = time.Now()
	}()
	go wp.notify()
}

// Workers returns the current number of workers in each state.
func (wp *Pool) Workers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType":  it.Name,
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	})
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateRunning && len(w.running)+len(w.starting) == 0 {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	logger = logger.WithField("Instance", wkr.instance)
	logger.Debug("starting container")
	wkr.starting[ctr.UUID] = struct{}{}
	go func() {
		stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		wkr.updated = time.Now()
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = struct{}{}
		if err != nil {
			logger.WithField("stdout", string(stdout)).
				WithField("stderr", string(stderr)).
				WithError(err).
				Error("error starting crunch-run process")
			// Leave uuid in wkr.running, though: it's
			// possible the error was just a communication
			// failure and the process was in fact
			// started. Wait for next probe to find out.
			return
		}
		logger.Info("crunch-run process started")
		wkr.lastUUID = ctr.UUID
	}()
	return true
}

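// A scheduler loop might combine Subscribe, Unallocated, and
// StartContainer (sketch; queue, Pending, and chooseType are
// hypothetical names standing in for the caller's own queue logic):
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		for _, ctr := range queue.Pending() {
//			wp.StartContainer(chooseType(ctr), ctr)
//		}
//	}
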
// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
func (wp *Pool) KillContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
		return
	}
	for _, wkr := range wp.workers {
		if _, ok := wkr.running[uuid]; ok {
			go wp.kill(wkr, uuid)
			return
		}
	}
	wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
}

func (wp *Pool) kill(wkr *worker, uuid string) {
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Instance":      wkr.instance,
	})
	logger.Debug("killing process")
	stdout, stderr, err := wkr.executor.Execute("crunch-run --kill "+uuid, nil)
	if err != nil {
		logger.WithFields(logrus.Fields{
			"stderr": string(stderr),
			"stdout": string(stdout),
			"error":  err,
		}).Warn("kill failed")
		return
	}
	logger.Debug("killing process succeeded")
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wkr.running[uuid]; ok {
		delete(wkr.running, uuid)
		wkr.updated = time.Now()
		go wp.notify()
	}
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
	})
	reg.MustRegister(wp.mInstances)
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)

	wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	})
	reg.MustRegister(wp.mVCPUs)
	wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_inuse",
		Help:      "VCPUs on cloud VMs that are running containers.",
	})
	reg.MustRegister(wp.mVCPUsInuse)
	wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	})
	reg.MustRegister(wp.mMemory)
	wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_inuse",
		Help:      "Memory on cloud VMs that are running containers.",
	})
	reg.MustRegister(wp.mMemoryInuse)
}

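// The registry passed to NewPool can be served over HTTP with the
// standard promhttp handler (sketch):
//
//	import "github.com/prometheus/client_golang/prometheus/promhttp"
//
//	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
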
func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	var alloc, cpu, cpuInuse, mem, memInuse int64
	for _, wkr := range wp.workers {
		cpu += int64(wkr.instType.VCPUs)
		mem += int64(wkr.instType.RAM)
		if len(wkr.running)+len(wkr.starting) == 0 {
			continue
		}
		alloc += int64(len(wkr.running) + len(wkr.starting))
		cpuInuse += int64(wkr.instType.VCPUs)
		memInuse += int64(wkr.instType.RAM)
	}
	wp.mInstances.Set(float64(len(wp.workers)))
	wp.mContainersRunning.Set(float64(alloc))
	wp.mVCPUs.Set(float64(cpu))
	wp.mMemory.Set(float64(mem))
	wp.mVCPUsInuse.Set(float64(cpuInuse))
	wp.mMemoryInuse.Set(float64(memInuse))
}

func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wp.autoShutdown(wkr) {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok || wkr.state == StateShutdown {
				// Deleted/shutdown while we
				// were probing others
				continue
			}
			select {
			case wkr.probing <- struct{}{}:
				go func() {
					wp.probeAndUpdate(wkr)
					<-wkr.probing
				}()
			default:
				wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
			}
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// caller must have lock.
func (wp *Pool) autoShutdown(wkr *worker) bool {
	if len(wkr.running)+len(wkr.starting) > 0 || wkr.state != StateRunning {
		return false
	}
	age := time.Since(wkr.unallocated)
	if age < wp.timeoutIdle {
		return false
	}
	logger := wp.logger.WithFields(logrus.Fields{
		"Age":      age,
		"Instance": wkr.instance,
	})
	logger.Info("shutdown idle worker")
	wp.shutdown(wkr, logger)
	return true
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// View reports status information for every worker in the pool.
func (wp *Pool) View() []View {
	var r []View
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, View{
			Instance:             w.instance.String(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			Unallocated:          w.unallocated,
			WorkerState:          w.state.String(),
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(r[i].Instance, r[j].Instance) < 0
	})
	return r
}

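// For example, a status endpoint might serialize the result (sketch;
// w is the caller's http.ResponseWriter):
//
//	json.NewEncoder(w).Encode(wp.View())
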
func (wp *Pool) setup() {
	wp.creating = map[arvados.InstanceType]int{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
	wp.stop = make(chan bool)
}

func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		// Non-blocking send: each subscriber channel has a
		// one-slot buffer, so a pending notification is never
		// lost and a slow subscriber never blocks us.
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
	if err != nil {
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")

	for _, inst := range instances {
		itTag := inst.Tags()[tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		wp.updateWorker(inst, it, StateUnknown)
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance,
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		delete(wp.workers, id)
		go wkr.executor.Close()
		go wp.notify()
	}

	if !wp.loaded {
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}
}

// should be called in a new goroutine
func (wp *Pool) probeAndUpdate(wkr *worker) {
	logger := wp.logger.WithField("Instance", wkr.instance)
	wp.mtx.Lock()
	updated := wkr.updated
	booted := wkr.booted
	wp.mtx.Unlock()

	var (
		ctrUUIDs []string
		ok       bool
		stderr   []byte
	)
	if !booted {
		booted, stderr = wp.probeBooted(wkr)
		wp.mtx.Lock()
		if booted && !wkr.booted {
			wkr.booted = booted
			logger.Info("instance booted")
		} else {
			booted = wkr.booted
		}
		wp.mtx.Unlock()
	}
	if booted {
		ctrUUIDs, ok, stderr = wp.probeRunning(wkr)
	}
	logger = logger.WithField("stderr", string(stderr))
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if !ok {
		if wkr.state == StateShutdown {
			return
		}
		dur := time.Since(wkr.probed)
		logger := logger.WithFields(logrus.Fields{
			"Duration": dur,
			"State":    wkr.state,
		})
		if wkr.state == StateBooting {
			logger.Debug("new instance not responding")
		} else {
			logger.Info("instance not responding")
		}

		if wkr.state == StateHold {
			return
		}

		label, threshold := "", wp.timeoutProbe
		if wkr.state == StateBooting {
			label, threshold = "new ", wp.timeoutBooting
		}
		if dur > threshold {
			logger.WithField("Since", wkr.probed).Warnf("%sinstance unresponsive, shutting down", label)
			wp.shutdown(wkr, logger)
		}
		return
	}

	updateTime := time.Now()
	wkr.probed = updateTime
	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	}
	if wkr.state == StateShutdown || wkr.state == StateHold {
		// Leave state alone.
	} else if booted {
		if wkr.state != StateRunning {
			wkr.state = StateRunning
			go wp.notify()
		}
	} else {
		wkr.state = StateBooting
	}

	if updated != wkr.updated {
		// Worker was updated (e.g., by starting a new
		// container) after the probe began. Avoid clobbering
		// those changes with the probe results.
		return
	}

	if len(ctrUUIDs) == 0 && len(wkr.running) > 0 {
		wkr.unallocated = updateTime
	}
	running := map[string]struct{}{}
	changed := false
	for _, uuid := range ctrUUIDs {
		running[uuid] = struct{}{}
		if _, ok := wkr.running[uuid]; !ok {
			changed = true
		}
	}
	for uuid := range wkr.running {
		if _, ok := running[uuid]; !ok {
			logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
			wp.exited[uuid] = updateTime
			changed = true
		}
	}
	if changed {
		wkr.running = running
		wkr.updated = updateTime
		go wp.notify()
	}
}

func (wp *Pool) probeRunning(wkr *worker) (running []string, ok bool, stderr []byte) {
	cmd := "crunch-run --list"
	stdout, stderr, err := wkr.executor.Execute(cmd, nil)
	if err != nil {
		wp.logger.WithFields(logrus.Fields{
			"Instance": wkr.instance,
			"Command":  cmd,
			"stdout":   string(stdout),
			"stderr":   string(stderr),
		}).WithError(err).Warn("probe failed")
		return nil, false, stderr
	}
	// "crunch-run --list" reports zero or more container UUIDs,
	// one per line.
	stdout = bytes.TrimRight(stdout, "\n")
	if len(stdout) == 0 {
		return nil, true, stderr
	}
	return strings.Split(string(stdout), "\n"), true, stderr
}

func (wp *Pool) probeBooted(wkr *worker) (ok bool, stderr []byte) {
	cmd := wp.bootProbeCommand
	if cmd == "" {
		cmd = "true"
	}
	stdout, stderr, err := wkr.executor.Execute(cmd, nil)
	logger := wp.logger.WithFields(logrus.Fields{
		"Instance": wkr.instance,
		"Command":  cmd,
		"stdout":   string(stdout),
		"stderr":   string(stderr),
	})
	if err != nil {
		logger.WithError(err).Debug("boot probe failed")
		return false, stderr
	}
	logger.Info("boot probe succeeded")
	return true, stderr
}