1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
14 "git.curoverse.com/arvados.git/lib/cloud"
15 "git.curoverse.com/arvados.git/sdk/go/arvados"
16 "github.com/Sirupsen/logrus"
17 "github.com/prometheus/client_golang/prometheus"
21 tagKeyInstanceType = "InstanceType"
25 // An InstanceView shows a worker's current state and recent activity.
26 type InstanceView struct {
// NOTE(review): additional fields (Instance, Price, LastContainerFinished,
// WorkerState, ...) are populated by Instances() below but their
// declarations are elided from this excerpt — this copy has missing lines.
29 ArvadosInstanceType string
30 ProviderInstanceType string
31 LastContainerUUID string
36 // An Executor executes shell commands on a remote host.
// NOTE(review): sync() below calls wkr.executor.Close(), so a Close method
// is presumably also part of this interface but elided from this excerpt.
37 type Executor interface {
38 // Run cmd on the current target.
39 Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
41 // Use the given target for subsequent operations. The new
42 // target is the same host as the previous target, but it
43 // might return a different address and verify a different
46 // SetTarget is called frequently, and in most cases the new
47 // target will behave exactly the same as the old one. An
48 // implementation should optimize accordingly.
50 // SetTarget must not block on concurrent Execute calls.
51 SetTarget(cloud.ExecutorTarget)
// Fallback values used by NewPool (via duration()) when the corresponding
// cluster configuration entry is unset/zero.
// NOTE(review): the enclosing `const (` opener is elided from this excerpt.
57 defaultSyncInterval = time.Minute
58 defaultProbeInterval = time.Second * 10
59 defaultMaxProbesPerSecond = 10
60 defaultTimeoutIdle = time.Minute
61 defaultTimeoutBooting = time.Minute * 10
62 defaultTimeoutProbe = time.Minute * 10
63 defaultTimeoutShutdown = time.Second * 10
// duration converts a configured arvados.Duration to time.Duration,
// substituting def when the configured value is unset.
// NOTE(review): the guard (presumably `if conf != 0`) and the branch
// returning def are elided from this excerpt — confirm against upstream.
66 func duration(conf arvados.Duration, def time.Duration) time.Duration {
68 return time.Duration(conf)
74 // NewPool creates a Pool of workers backed by instanceSet.
76 // New instances are configured and set up according to the given
77 // cluster configuration.
// Timeouts/intervals fall back to the package defaults above when the
// cluster config leaves them zero (see duration()).
78 func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
// NOTE(review): the `wp := &Pool{` opener and the logger field assignment
// are elided from this excerpt.
81 instanceSet: instanceSet,
82 newExecutor: newExecutor,
83 bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
84 imageID: cloud.ImageID(cluster.CloudVMs.ImageID),
85 instanceTypes: cluster.InstanceTypes,
86 maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
87 probeInterval: duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
88 syncInterval: duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
89 timeoutIdle: duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
90 timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
91 timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
92 timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
94 wp.registerMetrics(reg)
// Eagerly initialize maps and start background goroutines; subsequent
// setupOnce.Do calls in other methods are then no-ops.
96 wp.setupOnce.Do(wp.setup)
104 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
105 // zero Pool should not be used. Call NewPool to create a new Pool.
// NOTE(review): the `type Pool struct {` opener and the mtx (sync.RWMutex)
// and setupOnce (sync.Once) fields used throughout this file are elided
// from this excerpt.
//
// Configuration — set by NewPool, not changed afterwards.
108 logger logrus.FieldLogger
109 instanceSet cloud.InstanceSet
110 newExecutor func(cloud.Instance) Executor
111 bootProbeCommand string
112 imageID cloud.ImageID
113 instanceTypes map[string]arvados.InstanceType
114 syncInterval time.Duration
115 probeInterval time.Duration
116 maxProbesPerSecond int
117 timeoutIdle time.Duration
118 timeoutBooting time.Duration
119 timeoutProbe time.Duration
120 timeoutShutdown time.Duration
// Mutable state — guarded by wp.mtx.
123 subscribers map[<-chan struct{}]chan<- struct{}
124 creating map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return
125 workers map[cloud.InstanceID]*worker
126 loaded bool // loaded list of instances from InstanceSet at least once
127 exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
128 atQuotaUntil time.Time
129 atQuotaErr cloud.QuotaError
// Prometheus gauges, registered by registerMetrics and refreshed by
// updateMetrics.
134 mInstances prometheus.Gauge
135 mContainersRunning prometheus.Gauge
136 mVCPUs prometheus.Gauge
137 mVCPUsInuse prometheus.Gauge
138 mMemory prometheus.Gauge
139 mMemoryInuse prometheus.Gauge
142 // Subscribe returns a channel that becomes ready whenever a worker's
147 // ch := wp.Subscribe()
148 // defer wp.Unsubscribe(ch)
150 // // ...try scheduling some work...
155 func (wp *Pool) Subscribe() <-chan struct{} {
156 wp.setupOnce.Do(wp.setup)
// NOTE(review): the wp.mtx.Lock() call and `return ch` are elided from
// this excerpt.
158 defer wp.mtx.Unlock()
// Buffer of 1 lets notify() signal without blocking (see notify's
// non-blocking send).
159 ch := make(chan struct{}, 1)
160 wp.subscribers[ch] = ch
164 // Unsubscribe stops sending updates to the given channel.
// Safe to call with a channel that was never subscribed (map delete of a
// missing key is a no-op).
165 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
166 wp.setupOnce.Do(wp.setup)
// NOTE(review): the wp.mtx.Lock() call is elided from this excerpt.
168 defer wp.mtx.Unlock()
169 delete(wp.subscribers, ch)
172 // Unallocated returns the number of unallocated (creating + booting +
173 // idle + unknown) workers for each instance type.
175 // The returned counts should be interpreted as upper bounds, rather
176 // than exact counts: they are sometimes artificially high when a
177 // newly created instance appears in the driver's Instances() list
178 // before the Create() call returns.
179 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
180 wp.setupOnce.Do(wp.setup)
// Read-only snapshot under the read lock.
// NOTE(review): the wp.mtx.RLock() call is elided from this excerpt.
182 defer wp.mtx.RUnlock()
183 u := map[arvados.InstanceType]int{}
// Pending Create() calls count toward the total per type.
184 for it, c := range wp.creating {
187 for _, wkr := range wp.workers {
188 if wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown {
195 // Create a new instance with the given type, and add it to the worker
196 // pool. The worker is added immediately; instance creation runs in
198 func (wp *Pool) Create(it arvados.InstanceType) error {
199 logger := wp.logger.WithField("InstanceType", it.Name)
200 wp.setupOnce.Do(wp.setup)
202 defer wp.mtx.Unlock()
// Refuse to create while a recent quota error is still in effect.
203 if time.Now().Before(wp.atQuotaUntil) {
206 tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
// The cloud API call runs in a background goroutine (the `go func` opener
// is elided from this excerpt); it re-acquires the lock before touching
// pool state.
210 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
212 defer wp.mtx.Unlock()
// Shadowed err here is deliberate: the type-asserted QuotaError is only
// needed inside this branch.
214 if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
// Back off further Create calls for one minute after a quota error.
216 wp.atQuotaUntil = time.Now().Add(time.Minute)
219 logger.WithError(err).Error("create failed")
// On success, record the new instance as a Booting worker immediately.
222 wp.updateWorker(inst, it, StateBooting)
227 // AtQuota returns true if Create is not expected to work at the
229 func (wp *Pool) AtQuota() bool {
// NOTE(review): the wp.mtx.Lock() call is elided from this excerpt.
231 defer wp.mtx.Unlock()
// True while the one-minute backoff set by Create's quota-error path is
// still in effect.
232 return time.Now().Before(wp.atQuotaUntil)
235 // Add or update worker attached to the given instance. Use
236 // initialState if a new worker is created.
238 // The second return value is true if a new worker is created.
240 // Caller must have lock.
241 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
// Existing worker: refresh its target address and timestamps.
243 if wkr := wp.workers[id]; wkr != nil {
244 wkr.executor.SetTarget(inst)
246 wkr.updated = time.Now()
// A worker first seen via sync (Unknown) can be upgraded to Booting once
// the corresponding Create call returns.
247 if initialState == StateBooting && wkr.state == StateUnknown {
248 wkr.state = StateBooting
// New worker: an operator "hold" tag on the instance overrides the
// requested initial state.
252 if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
253 initialState = StateHold
255 logger := wp.logger.WithFields(logrus.Fields{
256 "InstanceType": it.Name,
259 logger.WithField("State", initialState).Infof("instance appeared in cloud")
// NOTE(review): the `wkr := &worker{` opener and several field
// assignments are elided from this excerpt.
265 executor: wp.newExecutor(inst),
272 running: make(map[string]struct{}),
273 starting: make(map[string]struct{}),
// Buffer of 1 so a probe request never blocks the sender.
274 probing: make(chan struct{}, 1),
// Record that the crunch-run process for uuid exited at time t; the entry
// stays in wp.exited until KillContainer clears it (see Running()).
280 // caller must have lock.
281 func (wp *Pool) notifyExited(uuid string, t time.Time) {
285 // Shutdown shuts down a worker with the given type, or returns false
286 // if all workers with the given type are busy.
287 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
288 wp.setupOnce.Do(wp.setup)
290 defer wp.mtx.Unlock()
291 logger := wp.logger.WithField("InstanceType", it.Name)
292 logger.Info("shutdown requested")
// Prefer killing a still-booting worker over an idle one.
293 for _, tryState := range []State{StateBooting, StateIdle} {
294 // TODO: shutdown the worker with the longest idle
295 // time (Idle) or the earliest create time (Booting)
296 for _, wkr := range wp.workers {
297 if wkr.state == tryState && wkr.instType == it {
298 logger.WithField("Instance", wkr.instance).Info("shutting down")
307 // CountWorkers returns the current number of workers in each state.
308 func (wp *Pool) CountWorkers() map[State]int {
309 wp.setupOnce.Do(wp.setup)
// NOTE(review): the lock call and the map initialization/increment lines
// are elided from this excerpt.
311 defer wp.mtx.Unlock()
313 for _, w := range wp.workers {
319 // Running returns the container UUIDs being prepared/run on workers.
// Map values are zero time for containers still starting/running; exited
// containers (from wp.exited) presumably carry their exit time — the
// assignment line is elided from this excerpt.
320 func (wp *Pool) Running() map[string]time.Time {
321 wp.setupOnce.Do(wp.setup)
323 defer wp.mtx.Unlock()
324 r := map[string]time.Time{}
325 for _, wkr := range wp.workers {
326 for uuid := range wkr.running {
327 r[uuid] = time.Time{}
329 for uuid := range wkr.starting {
330 r[uuid] = time.Time{}
333 for uuid, exited := range wp.exited {
339 // StartContainer starts a container on an idle worker immediately if
340 // possible, otherwise returns false.
341 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
342 wp.setupOnce.Do(wp.setup)
344 defer wp.mtx.Unlock()
// Pick the most-recently-busy idle worker of the requested type
// (keeping long-idle workers free to hit the idle timeout and shut down).
346 for _, w := range wp.workers {
347 if w.instType == it && w.state == StateIdle {
348 if wkr == nil || w.busy.After(wkr.busy) {
// NOTE(review): the nil-check/early-return between selection and start is
// elided from this excerpt.
356 wkr.startContainer(ctr)
360 // KillContainer kills the crunch-run process for the given container
361 // UUID, if it's running on any worker.
363 // KillContainer returns immediately; the act of killing the container
364 // takes some time, and runs in the background.
365 func (wp *Pool) KillContainer(uuid string) {
367 defer wp.mtx.Unlock()
// If the process already exited, just clear the placeholder recorded by
// notifyExited — nothing to kill.
368 if _, ok := wp.exited[uuid]; ok {
369 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
370 delete(wp.exited, uuid)
373 for _, wkr := range wp.workers {
374 if _, ok := wkr.running[uuid]; ok {
// The actual kill is slow (remote exec) — run it in the background.
375 go wp.kill(wkr, uuid)
379 wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
// kill runs `crunch-run --kill 15` for uuid on wkr's host, then (on
// success) removes uuid from the worker's running set and returns the
// worker to Idle if nothing else is running. Runs in its own goroutine;
// must NOT be called with wp.mtx held.
382 func (wp *Pool) kill(wkr *worker, uuid string) {
383 logger := wp.logger.WithFields(logrus.Fields{
384 "ContainerUUID": uuid,
385 "Instance": wkr.instance,
387 logger.Debug("killing process")
388 stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil)
390 logger.WithFields(logrus.Fields{
391 "stderr": string(stderr),
392 "stdout": string(stdout),
394 }).Warn("kill failed")
397 logger.Debug("killing process succeeded")
// Re-acquire the lock before mutating pool/worker state.
// NOTE(review): the wp.mtx.Lock() call is elided from this excerpt.
399 defer wp.mtx.Unlock()
400 if _, ok := wkr.running[uuid]; ok {
401 delete(wkr.running, uuid)
402 if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
403 wkr.state = StateIdle
405 wkr.updated = time.Now()
// registerMetrics creates the pool's Prometheus gauges and registers them
// on reg. Called once from NewPool; values are refreshed by updateMetrics.
410 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
// Fall back to a throwaway registry (presumably guarded by `if reg == nil`
// on the elided preceding line) so a nil reg doesn't panic MustRegister.
412 reg = prometheus.NewRegistry()
414 wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
415 Namespace: "arvados",
416 Subsystem: "dispatchcloud",
417 Name: "instances_total",
418 Help: "Number of cloud VMs including pending, booting, running, held, and shutting down.",
420 reg.MustRegister(wp.mInstances)
421 wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
422 Namespace: "arvados",
423 Subsystem: "dispatchcloud",
424 Name: "containers_running",
425 Help: "Number of containers reported running by cloud VMs.",
427 reg.MustRegister(wp.mContainersRunning)
429 wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
430 Namespace: "arvados",
431 Subsystem: "dispatchcloud",
433 Help: "Total VCPUs on all cloud VMs.",
435 reg.MustRegister(wp.mVCPUs)
436 wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
437 Namespace: "arvados",
438 Subsystem: "dispatchcloud",
440 Help: "VCPUs on cloud VMs that are running containers.",
442 reg.MustRegister(wp.mVCPUsInuse)
443 wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
444 Namespace: "arvados",
445 Subsystem: "dispatchcloud",
446 Name: "memory_bytes_total",
447 Help: "Total memory on all cloud VMs.",
449 reg.MustRegister(wp.mMemory)
450 wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
451 Namespace: "arvados",
452 Subsystem: "dispatchcloud",
453 Name: "memory_bytes_inuse",
454 Help: "Memory on cloud VMs that are running containers.",
456 reg.MustRegister(wp.mMemoryInuse)
// runMetrics refreshes the Prometheus gauges whenever the pool changes
// (it subscribes to pool updates; the loop body is elided from this
// excerpt — presumably calls updateMetrics per notification).
459 func (wp *Pool) runMetrics() {
461 defer wp.Unsubscribe(ch)
// updateMetrics recomputes all pool gauges from current worker state.
467 func (wp *Pool) updateMetrics() {
// NOTE(review): the wp.mtx.RLock() call is elided from this excerpt.
469 defer wp.mtx.RUnlock()
471 var alloc, cpu, cpuInuse, mem, memInuse int64
472 for _, wkr := range wp.workers {
473 cpu += int64(wkr.instType.VCPUs)
474 mem += int64(wkr.instType.RAM)
// Workers with no running/starting containers contribute only to the
// totals, not the in-use figures (the `continue` is elided).
475 if len(wkr.running)+len(wkr.starting) == 0 {
478 alloc += int64(len(wkr.running) + len(wkr.starting))
479 cpuInuse += int64(wkr.instType.VCPUs)
480 memInuse += int64(wkr.instType.RAM)
482 wp.mInstances.Set(float64(len(wp.workers)))
483 wp.mContainersRunning.Set(float64(alloc))
484 wp.mVCPUs.Set(float64(cpu))
485 wp.mMemory.Set(float64(mem))
486 wp.mVCPUsInuse.Set(float64(cpuInuse))
487 wp.mMemoryInuse.Set(float64(memInuse))
// runProbes periodically probes every worker, rate-limited to maxPPS
// probes per second across the pool. Runs until the probe ticker stops.
490 func (wp *Pool) runProbes() {
491 maxPPS := wp.maxProbesPerSecond
// Zero config falls back to the package default (guard elided).
493 maxPPS = defaultMaxProbesPerSecond
// limitticker spaces individual probes; probeticker starts each sweep.
495 limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
496 defer limitticker.Stop()
498 probeticker := time.NewTicker(wp.probeInterval)
499 defer probeticker.Stop()
// Reused across sweeps to avoid reallocating the ID list each tick.
501 workers := []cloud.InstanceID{}
502 for range probeticker.C {
503 workers = workers[:0]
// Collect probe candidates under the lock; skip workers already shutting
// down (shutdownIfIdle may itself initiate a shutdown).
505 for id, wkr := range wp.workers {
506 if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
509 workers = append(workers, id)
513 for _, id := range workers {
515 wkr, ok := wp.workers[id]
518 // Deleted while we were probing
522 go wkr.ProbeAndUpdate()
// Wait for the rate limiter before the next probe.
526 case <-limitticker.C:
// runSync keeps the worker map in sync with the cloud provider's instance
// list, polling every syncInterval until stopped.
532 func (wp *Pool) runSync() {
533 // sync once immediately, then wait syncInterval, sync again,
// Timer of 1ns fires (effectively) immediately for the first sync.
535 timer := time.NewTimer(1)
539 err := wp.getInstancesAndSync()
// Sync failures are logged and retried on the next tick, not fatal.
541 wp.logger.WithError(err).Warn("sync failed")
543 timer.Reset(wp.syncInterval)
545 wp.logger.Debug("worker.Pool stopped")
551 // Stop synchronizing with the InstanceSet.
// NOTE(review): the body (presumably closing a stop channel consumed by
// runSync) is elided from this excerpt.
552 func (wp *Pool) Stop() {
553 wp.setupOnce.Do(wp.setup)
557 // Instances returns an InstanceView for each worker in the pool,
558 // summarizing its current state and recent activity.
559 func (wp *Pool) Instances() []InstanceView {
561 wp.setupOnce.Do(wp.setup)
563 for _, w := range wp.workers {
564 r = append(r, InstanceView{
565 Instance: w.instance.String(),
566 Price: w.instType.Price,
567 ArvadosInstanceType: w.instType.Name,
568 ProviderInstanceType: w.instType.ProviderType,
569 LastContainerUUID: w.lastUUID,
571 WorkerState: w.state.String(),
// Deterministic ordering for display/API stability.
575 sort.Slice(r, func(i, j int) bool {
576 return strings.Compare(r[i].Instance, r[j].Instance) < 0
// setup initializes the pool's maps (and, per callers, presumably starts
// the background goroutines — elided here). Invoked exactly once via
// wp.setupOnce.Do from every public method.
581 func (wp *Pool) setup() {
582 wp.creating = map[arvados.InstanceType]int{}
583 wp.exited = map[string]time.Time{}
584 wp.workers = map[cloud.InstanceID]*worker{}
585 wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
// notify wakes all subscribers with a non-blocking send; a subscriber
// whose 1-slot buffer is already full simply keeps its pending signal.
588 func (wp *Pool) notify() {
590 defer wp.mtx.RUnlock()
591 for _, send := range wp.subscribers {
593 case send <- struct{}{}:
// getInstancesAndSync fetches the current instance list from the
// InstanceSet and reconciles the worker map against it.
599 func (wp *Pool) getInstancesAndSync() error {
600 wp.setupOnce.Do(wp.setup)
601 wp.logger.Debug("getting instance list")
// threshold is captured BEFORE the (slow) Instances call so sync() can
// tell which worker updates happened after this snapshot started.
602 threshold := time.Now()
603 instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
607 wp.sync(threshold, instances)
608 wp.logger.Debug("sync done")
612 // Add/remove/update workers based on instances, which was obtained
613 // from the instanceSet. However, don't clobber any other updates that
614 // already happened after threshold.
615 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
617 defer wp.mtx.Unlock()
618 wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
621 for _, inst := range instances {
// Map each instance back to its Arvados type via the tag Create set.
622 itTag := inst.Tags()[tagKeyInstanceType]
623 it, ok := wp.instanceTypes[itTag]
624 wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
628 if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
// An instance still listed long after we asked it to shut down gets its
// destroy retried (retry call elided from this excerpt).
630 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
631 wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
// Drop workers whose instances no longer appear in the cloud list —
// unless they were updated after the snapshot threshold (e.g. just
// created), in which case keep them for now.
636 for id, wkr := range wp.workers {
637 if wkr.updated.After(threshold) {
640 logger := wp.logger.WithFields(logrus.Fields{
641 "Instance": wkr.instance,
642 "WorkerState": wkr.state,
644 logger.Info("instance disappeared in cloud")
645 delete(wp.workers, id)
// Close the executor asynchronously; it may block on network teardown.
646 go wkr.executor.Close()
652 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")