1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
14 "git.curoverse.com/arvados.git/lib/cloud"
15 "git.curoverse.com/arvados.git/sdk/go/arvados"
16 "github.com/prometheus/client_golang/prometheus"
17 "github.com/sirupsen/logrus"
// Instance tag recording the Arvados instance type name, written at
// Create time and read back in sync() to recover the type.
21 tagKeyInstanceType = "InstanceType"
25 // An InstanceView shows a worker's current state and recent activity.
26 type InstanceView struct {
// NOTE(review): additional fields (Instance, Price, WorkerState, ...)
// are referenced by Pool.Instances below but elided from this view.
29 ArvadosInstanceType string
30 ProviderInstanceType string
31 LastContainerUUID string
36 // An Executor executes shell commands on a remote host.
37 type Executor interface {
38 // Run cmd on the current target.
39 Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
41 // Use the given target for subsequent operations. The new
42 // target is the same host as the previous target, but it
43 // might return a different address and verify a different
// (comment continuation elided in this view)
46 // SetTarget is called frequently, and in most cases the new
47 // target will behave exactly the same as the old one. An
48 // implementation should optimize accordingly.
50 // SetTarget must not block on concurrent Execute calls.
51 SetTarget(cloud.ExecutorTarget)
// Default tuning values, substituted by duration() when the cluster
// config leaves the corresponding field zero.
57 defaultSyncInterval = time.Minute
58 defaultProbeInterval = time.Second * 10
59 defaultMaxProbesPerSecond = 10
60 defaultTimeoutIdle = time.Minute
61 defaultTimeoutBooting = time.Minute * 10
62 defaultTimeoutProbe = time.Minute * 10
63 defaultTimeoutShutdown = time.Second * 10
// duration converts an arvados.Duration config value to time.Duration,
// presumably substituting def when conf is zero (the guard line is
// elided in this view — confirm against the full source).
66 func duration(conf arvados.Duration, def time.Duration) time.Duration {
68 return time.Duration(conf)
74 // NewPool creates a Pool of workers backed by instanceSet.
76 // New instances are configured and set up according to the given
77 // cluster configuration.
78 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
// (Pool literal opener elided in this view)
82 instanceSet: instanceSet,
83 newExecutor: newExecutor,
84 bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
85 imageID: cloud.ImageID(cluster.CloudVMs.ImageID),
86 instanceTypes: cluster.InstanceTypes,
87 maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
// Zero-valued config durations fall back to the package defaults.
88 probeInterval: duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
89 syncInterval: duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
90 timeoutIdle: duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
91 timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
92 timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
93 timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
94 stop: make(chan bool),
// Register gauges and initialize internal maps before returning.
96 wp.registerMetrics(reg)
98 wp.setupOnce.Do(wp.setup)
106 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
107 // zero Pool should not be used. Call NewPool to create a new Pool.
// Configuration fields, set once by NewPool and then read-only:
110 logger logrus.FieldLogger
111 arvClient *arvados.Client
112 instanceSet cloud.InstanceSet
113 newExecutor func(cloud.Instance) Executor
114 bootProbeCommand string
115 imageID cloud.ImageID
116 instanceTypes map[string]arvados.InstanceType
117 syncInterval time.Duration
118 probeInterval time.Duration
119 maxProbesPerSecond int
120 timeoutIdle time.Duration
121 timeoutBooting time.Duration
122 timeoutProbe time.Duration
123 timeoutShutdown time.Duration
// Mutable state, guarded by the pool's mutex (mtx declaration is
// elided in this view):
126 subscribers map[<-chan struct{}]chan<- struct{}
127 creating map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
128 workers map[cloud.InstanceID]*worker
129 loaded bool // loaded list of instances from InstanceSet at least once
130 exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
131 atQuotaUntil time.Time
132 atQuotaErr cloud.QuotaError
// Prometheus gauges, created in registerMetrics:
137 mInstances prometheus.Gauge
138 mContainersRunning prometheus.Gauge
139 mVCPUs prometheus.Gauge
140 mVCPUsInuse prometheus.Gauge
141 mMemory prometheus.Gauge
142 mMemoryInuse prometheus.Gauge
145 // Subscribe returns a channel that becomes ready whenever a worker's
// state changes (comment continuation elided). Typical usage:
150 // ch := wp.Subscribe()
151 // defer wp.Unsubscribe(ch)
153 // // ...try scheduling some work...
158 func (wp *Pool) Subscribe() <-chan struct{} {
159 wp.setupOnce.Do(wp.setup)
161 defer wp.mtx.Unlock()
// Buffer of 1 lets notify() record a pending update without blocking.
162 ch := make(chan struct{}, 1)
// The channel doubles as its own map key so Unsubscribe can delete it.
163 wp.subscribers[ch] = ch
167 // Unsubscribe stops sending updates to the given channel.
168 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
169 wp.setupOnce.Do(wp.setup)
171 defer wp.mtx.Unlock()
// Removing the map entry is sufficient; notify() only sends to
// channels still present in wp.subscribers.
172 delete(wp.subscribers, ch)
175 // Unallocated returns the number of unallocated (creating + booting +
176 // idle + unknown) workers for each instance type.
177 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
178 wp.setupOnce.Do(wp.setup)
180 defer wp.mtx.RUnlock()
181 unalloc := map[arvados.InstanceType]int{}
182 creating := map[arvados.InstanceType]int{}
// Count pending Create calls per instance type.
183 for it, times := range wp.creating {
184 creating[it] = len(times)
186 for _, wkr := range wp.workers {
// Only idle/booting/unknown workers count as unallocated.
187 if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) {
192 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
193 // If up to N new workers appear in
194 // Instances() while we are waiting for N
195 // Create() calls to complete, we assume we're
196 // just seeing a race between Instances() and
197 // Create() responses.
199 // The other common reason why nodes have
200 // state==Unknown is that they appeared at
201 // startup, before any Create calls. They
202 // don't match the above timing condition, so
203 // we never mistakenly attribute them to
204 // pending Create calls.
// Also account for instance types that have pending Create calls
// but no matching workers counted above.
208 for it, c := range creating {
214 // Create a new instance with the given type, and add it to the worker
215 // pool. The worker is added immediately; instance creation runs in
// the background (comment continuation elided in this view).
217 func (wp *Pool) Create(it arvados.InstanceType) error {
218 logger := wp.logger.WithField("InstanceType", it.Name)
219 wp.setupOnce.Do(wp.setup)
221 defer wp.mtx.Unlock()
// Refuse to create while a recent quota error is still in effect.
222 if time.Now().Before(wp.atQuotaUntil) {
225 tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
// Record the pending Create call so Unallocated() can count it.
227 wp.creating[it] = append(wp.creating[it], now)
230 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
232 defer wp.mtx.Unlock()
233 // Remove our timestamp marker from wp.creating
234 for i, t := range wp.creating[it] {
236 copy(wp.creating[it][i:], wp.creating[it][i+1:])
237 wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
// On a cloud quota error, remember to stay "at quota" for a minute.
241 if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
243 wp.atQuotaUntil = time.Now().Add(time.Minute)
246 logger.WithError(err).Error("create failed")
// Register the new instance as a booting worker right away.
249 wp.updateWorker(inst, it, StateBooting)
254 // AtQuota returns true if Create is not expected to work at the
// moment, i.e., a recent quota error is still in effect.
256 func (wp *Pool) AtQuota() bool {
258 defer wp.mtx.Unlock()
259 return time.Now().Before(wp.atQuotaUntil)
262 // Add or update worker attached to the given instance. Use
263 // initialState if a new worker is created.
265 // The second return value is true if a new worker is created.
267 // Caller must have lock.
268 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
// Existing worker: refresh its executor target and updated time.
270 if wkr := wp.workers[id]; wkr != nil {
271 wkr.executor.SetTarget(inst)
273 wkr.updated = time.Now()
// A worker we created (Booting) may have been seen first via
// Instances() and recorded as Unknown; upgrade its state now.
274 if initialState == StateBooting && wkr.state == StateUnknown {
275 wkr.state = StateBooting
// A "hold" tag on a newly discovered instance overrides Unknown.
279 if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
280 initialState = StateHold
282 logger := wp.logger.WithFields(logrus.Fields{
283 "InstanceType": it.Name,
286 logger.WithField("State", initialState).Infof("instance appeared in cloud")
// Construct the new worker (several fields elided in this view).
292 executor: wp.newExecutor(inst),
300 running: make(map[string]struct{}),
301 starting: make(map[string]struct{}),
302 probing: make(chan struct{}, 1),
// notifyExited records that the crunch-run process for container uuid
// exited at time t (body elided in this view).
308 // caller must have lock.
309 func (wp *Pool) notifyExited(uuid string, t time.Time) {
313 // Shutdown shuts down a worker with the given type, or returns false
314 // if all workers with the given type are busy.
315 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
316 wp.setupOnce.Do(wp.setup)
318 defer wp.mtx.Unlock()
319 logger := wp.logger.WithField("InstanceType", it.Name)
320 logger.Info("shutdown requested")
// Prefer shutting down a still-booting worker over an idle one.
321 for _, tryState := range []State{StateBooting, StateIdle} {
322 // TODO: shutdown the worker with the longest idle
323 // time (Idle) or the earliest create time (Booting)
324 for _, wkr := range wp.workers {
325 if wkr.state == tryState && wkr.instType == it {
326 logger.WithField("Instance", wkr.instance).Info("shutting down")
335 // CountWorkers returns the current number of workers in each state.
336 func (wp *Pool) CountWorkers() map[State]int {
337 wp.setupOnce.Do(wp.setup)
339 defer wp.mtx.Unlock()
// Tally workers by state (accumulation lines elided in this view).
341 for _, w := range wp.workers {
347 // Running returns the container UUIDs being prepared/run on workers.
348 func (wp *Pool) Running() map[string]time.Time {
349 wp.setupOnce.Do(wp.setup)
351 defer wp.mtx.Unlock()
352 r := map[string]time.Time{}
353 for _, wkr := range wp.workers {
// Zero time value means the container has not exited yet.
354 for uuid := range wkr.running {
355 r[uuid] = time.Time{}
357 for uuid := range wkr.starting {
358 r[uuid] = time.Time{}
// Containers whose crunch-run process has exited report the exit time.
361 for uuid, exited := range wp.exited {
367 // StartContainer starts a container on an idle worker immediately if
368 // possible, otherwise returns false.
369 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
370 wp.setupOnce.Do(wp.setup)
372 defer wp.mtx.Unlock()
// Pick the idle worker of the right type that was busy most recently,
// leaving long-idle workers free to be shut down.
374 for _, w := range wp.workers {
375 if w.instType == it && w.state == StateIdle {
376 if wkr == nil || w.busy.After(wkr.busy) {
384 wkr.startContainer(ctr)
388 // KillContainer kills the crunch-run process for the given container
389 // UUID, if it's running on any worker.
391 // KillContainer returns immediately; the act of killing the container
392 // takes some time, and runs in the background.
393 func (wp *Pool) KillContainer(uuid string) {
395 defer wp.mtx.Unlock()
// If the process already exited, just clear the placeholder entry.
396 if _, ok := wp.exited[uuid]; ok {
397 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
398 delete(wp.exited, uuid)
// Otherwise find the worker running it and kill in the background.
401 for _, wkr := range wp.workers {
402 if _, ok := wkr.running[uuid]; ok {
403 go wp.kill(wkr, uuid)
407 wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
// kill runs "crunch-run --kill" on the worker to stop the given
// container, then updates the worker's running set/state on success.
410 func (wp *Pool) kill(wkr *worker, uuid string) {
411 logger := wp.logger.WithFields(logrus.Fields{
412 "ContainerUUID": uuid,
413 "Instance": wkr.instance,
415 logger.Debug("killing process")
// Signal 15 (SIGTERM) is passed to crunch-run's --kill flag.
416 stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
418 logger.WithFields(logrus.Fields{
419 "stderr": string(stderr),
420 "stdout": string(stdout),
422 }).Warn("kill failed")
425 logger.Debug("killing process succeeded")
427 defer wp.mtx.Unlock()
428 if _, ok := wkr.running[uuid]; ok {
429 delete(wkr.running, uuid)
// Last running/starting container gone -> worker becomes idle.
430 if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
431 wkr.state = StateIdle
433 wkr.updated = time.Now()
// registerMetrics creates the pool's Prometheus gauges and registers
// them with reg.
438 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
// NOTE(review): presumably guarded by a reg==nil check (elided in this
// view) — a throwaway registry keeps the gauges non-nil and usable.
440 reg = prometheus.NewRegistry()
442 wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
443 Namespace: "arvados",
444 Subsystem: "dispatchcloud",
445 Name: "instances_total",
446 Help: "Number of cloud VMs including pending, booting, running, held, and shutting down.",
448 reg.MustRegister(wp.mInstances)
449 wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
450 Namespace: "arvados",
451 Subsystem: "dispatchcloud",
452 Name: "containers_running",
453 Help: "Number of containers reported running by cloud VMs.",
455 reg.MustRegister(wp.mContainersRunning)
457 wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
458 Namespace: "arvados",
459 Subsystem: "dispatchcloud",
461 Help: "Total VCPUs on all cloud VMs.",
463 reg.MustRegister(wp.mVCPUs)
464 wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
465 Namespace: "arvados",
466 Subsystem: "dispatchcloud",
468 Help: "VCPUs on cloud VMs that are running containers.",
470 reg.MustRegister(wp.mVCPUsInuse)
471 wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
472 Namespace: "arvados",
473 Subsystem: "dispatchcloud",
474 Name: "memory_bytes_total",
475 Help: "Total memory on all cloud VMs.",
477 reg.MustRegister(wp.mMemory)
478 wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
479 Namespace: "arvados",
480 Subsystem: "dispatchcloud",
481 Name: "memory_bytes_inuse",
482 Help: "Memory on cloud VMs that are running containers.",
484 reg.MustRegister(wp.mMemoryInuse)
// runMetrics refreshes the gauges whenever the pool changes, driven by
// a Subscribe channel (loop body elided in this view).
487 func (wp *Pool) runMetrics() {
489 defer wp.Unsubscribe(ch)
// updateMetrics recomputes all pool gauges under a read lock.
495 func (wp *Pool) updateMetrics() {
497 defer wp.mtx.RUnlock()
499 var alloc, cpu, cpuInuse, mem, memInuse int64
500 for _, wkr := range wp.workers {
501 cpu += int64(wkr.instType.VCPUs)
502 mem += int64(wkr.instType.RAM)
// Workers with no running/starting containers contribute capacity
// totals only, not the "inuse" totals below.
503 if len(wkr.running)+len(wkr.starting) == 0 {
506 alloc += int64(len(wkr.running) + len(wkr.starting))
507 cpuInuse += int64(wkr.instType.VCPUs)
508 memInuse += int64(wkr.instType.RAM)
510 wp.mInstances.Set(float64(len(wp.workers)))
511 wp.mContainersRunning.Set(float64(alloc))
512 wp.mVCPUs.Set(float64(cpu))
513 wp.mMemory.Set(float64(mem))
514 wp.mVCPUsInuse.Set(float64(cpuInuse))
515 wp.mMemoryInuse.Set(float64(memInuse))
// runProbes probes all workers on each probeInterval tick, rate-limited
// to maxProbesPerSecond probes per second.
518 func (wp *Pool) runProbes() {
519 maxPPS := wp.maxProbesPerSecond
// Fall back to the package default when unconfigured (zero check
// elided in this view).
521 maxPPS = defaultMaxProbesPerSecond
523 limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
524 defer limitticker.Stop()
526 probeticker := time.NewTicker(wp.probeInterval)
527 defer probeticker.Stop()
// Reused slice of worker IDs to probe on each tick.
529 workers := []cloud.InstanceID{}
530 for range probeticker.C {
531 workers = workers[:0]
533 for id, wkr := range wp.workers {
// Skip workers that are shutting down (or were just told to shut
// down because they were idle too long).
534 if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
537 workers = append(workers, id)
541 for _, id := range workers {
// Re-check under lock; the worker may have been removed meanwhile.
543 wkr, ok := wp.workers[id]
546 // Deleted while we were probing
550 go wkr.ProbeAndUpdate()
// Wait for the rate limiter before launching the next probe.
554 case <-limitticker.C:
// runSync reconciles the worker list with the InstanceSet on a
// syncInterval cadence until the pool is stopped.
560 func (wp *Pool) runSync() {
561 // sync once immediately, then wait syncInterval, sync again,
// ... (comment continuation elided in this view)
563 timer := time.NewTimer(1)
567 err := wp.getInstancesAndSync()
// Sync failures are logged and retried on the next interval.
569 wp.logger.WithError(err).Warn("sync failed")
571 timer.Reset(wp.syncInterval)
573 wp.logger.Debug("worker.Pool stopped")
579 // Stop synchronizing with the InstanceSet.
580 func (wp *Pool) Stop() {
581 wp.setupOnce.Do(wp.setup)
// NOTE(review): presumably closes/signals wp.stop here (line elided in
// this view) — confirm against the full source.
585 // Instances returns an InstanceView for each worker in the pool,
586 // summarizing its current state and recent activity.
587 func (wp *Pool) Instances() []InstanceView {
589 wp.setupOnce.Do(wp.setup)
591 for _, w := range wp.workers {
592 r = append(r, InstanceView{
593 Instance: w.instance.String(),
594 Price: w.instType.Price,
595 ArvadosInstanceType: w.instType.Name,
596 ProviderInstanceType: w.instType.ProviderType,
597 LastContainerUUID: w.lastUUID,
599 WorkerState: w.state.String(),
// Sort by instance ID for stable, deterministic output.
603 sort.Slice(r, func(i, j int) bool {
604 return strings.Compare(r[i].Instance, r[j].Instance) < 0
// setup initializes the pool's internal maps; always invoked via
// wp.setupOnce so it runs exactly once.
609 func (wp *Pool) setup() {
610 wp.creating = map[arvados.InstanceType][]time.Time{}
611 wp.exited = map[string]time.Time{}
612 wp.workers = map[cloud.InstanceID]*worker{}
613 wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
// notify wakes every subscriber without blocking: a subscriber whose
// buffered channel already holds a pending notification is skipped
// (the select's default case is elided in this view).
616 func (wp *Pool) notify() {
618 defer wp.mtx.RUnlock()
619 for _, send := range wp.subscribers {
621 case send <- struct{}{}:
// getInstancesAndSync fetches the current instance list from the
// InstanceSet and reconciles the worker pool against it.
627 func (wp *Pool) getInstancesAndSync() error {
628 wp.setupOnce.Do(wp.setup)
629 wp.logger.Debug("getting instance list")
// Snapshot taken before the (possibly slow) Instances call; sync()
// will not clobber worker updates made after this point.
630 threshold := time.Now()
631 instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
635 wp.sync(threshold, instances)
636 wp.logger.Debug("sync done")
640 // Add/remove/update workers based on instances, which was obtained
641 // from the instanceSet. However, don't clobber any other updates that
642 // already happened after threshold.
643 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
645 defer wp.mtx.Unlock()
646 wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
649 for _, inst := range instances {
// Recover the Arvados instance type from the tag set at Create time;
// instances with unrecognized tags are logged and skipped.
650 itTag := inst.Tags()[tagKeyInstanceType]
651 it, ok := wp.instanceTypes[itTag]
653 wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
656 if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
// Instance is still listed well after we asked it to shut down.
658 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
659 wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
// Drop workers that no longer appear in the cloud list — but only if
// they have not been updated since the threshold snapshot.
664 for id, wkr := range wp.workers {
665 if wkr.updated.After(threshold) {
668 logger := wp.logger.WithFields(logrus.Fields{
669 "Instance": wkr.instance,
670 "WorkerState": wkr.state,
672 logger.Info("instance disappeared in cloud")
673 delete(wp.workers, id)
// Close the executor asynchronously; it may block on network I/O.
674 go wkr.executor.Close()
680 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")