1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
14 "git.curoverse.com/arvados.git/lib/cloud"
15 "git.curoverse.com/arvados.git/sdk/go/arvados"
16 "github.com/Sirupsen/logrus"
17 "github.com/prometheus/client_golang/prometheus"
// tagKeyInstanceType is the cloud instance tag whose value is the
// Arvados instance type name. Create tags new instances with it, and
// sync uses it to look up each listed instance's type in
// wp.instanceTypes (instances with an unrecognized value are logged
// and ignored).
21 tagKeyInstanceType = "InstanceType"
25 // An InstanceView shows a worker's current state and recent activity.
// Pool.Instances returns one InstanceView per worker, populated from
// the worker's instance, instance type, and state.
26 type InstanceView struct {
// ArvadosInstanceType is the configured Arvados instance type name
// (arvados.InstanceType.Name).
29 ArvadosInstanceType string
// ProviderInstanceType is the cloud provider's name for the instance
// type (arvados.InstanceType.ProviderType).
30 ProviderInstanceType string
// LastContainerUUID is copied from the worker's lastUUID field
// (presumably the most recently started container -- confirm).
31 LastContainerUUID string
36 // An Executor executes shell commands on a remote host.
// Pool obtains one Executor per cloud instance from the newExecutor
// function passed to NewPool.
37 type Executor interface {
38 // Run cmd on the current target.
// stdin may be nil (Pool.kill passes nil). The captured stdout and
// stderr are returned together with any error.
39 Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
41 // Use the given target for subsequent operations. The new
42 // target is the same host as the previous target, but it
43 // might return a different address and verify a different
46 // SetTarget is called frequently, and in most cases the new
47 // target will behave exactly the same as the old one. An
48 // implementation should optimize accordingly.
50 // SetTarget must not block on concurrent Execute calls.
// Pool.updateWorker calls SetTarget with the freshly listed
// cloud.Instance whenever it is invoked for an instance that already
// has a worker (e.g. on every sync).
// NOTE(review): Pool also calls Close() on executors of vanished
// instances; that method's declaration is not visible in this excerpt.
51 SetTarget(cloud.ExecutorTarget)
// Fallback values for Pool settings. NewPool passes the interval and
// timeout defaults to duration() alongside the corresponding cluster
// config value; runProbes substitutes defaultMaxProbesPerSecond when
// the configured rate is not usable (exact condition not shown here).

// defaultSyncInterval: how often runSync re-lists instances.
57 defaultSyncInterval = time.Minute
// defaultProbeInterval: period of the probe ticker in runProbes.
58 defaultProbeInterval = time.Second * 10
// defaultMaxProbesPerSecond: cap on how fast runProbes launches probes.
59 defaultMaxProbesPerSecond = 10
// NOTE(review): the consumers of the idle/booting/probe timeouts are not
// visible here; presumably they bound how long a worker may stay in the
// corresponding state -- confirm.
60 defaultTimeoutIdle = time.Minute
61 defaultTimeoutBooting = time.Minute * 10
62 defaultTimeoutProbe = time.Minute * 10
// defaultTimeoutShutdown: how long an instance may remain listed after
// its worker was destroyed before sync retries the shutdown.
63 defaultTimeoutShutdown = time.Second * 10
// duration returns conf converted to a time.Duration, or def when conf
// should not be used.
// NOTE(review): the fallback condition (presumably conf unset, i.e.
// zero, in the cluster config) is on lines not shown -- confirm.
66 func duration(conf arvados.Duration, def time.Duration) time.Duration {
68 return time.Duration(conf)
74 // NewPool creates a Pool of workers backed by instanceSet.
76 // New instances are configured and set up according to the given
77 // cluster configuration.
//
// newExecutor is called once for each new worker to obtain the
// Executor used to run commands on that instance. Gauges are
// registered on reg (see registerMetrics). Interval and timeout
// settings fall back to the package defaults via duration().
78 func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
81 instanceSet: instanceSet,
82 newExecutor: newExecutor,
83 bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
84 imageID: cloud.ImageID(cluster.CloudVMs.ImageID),
85 instanceTypes: cluster.InstanceTypes,
// Copied as-is; runProbes applies defaultMaxProbesPerSecond itself.
86 maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
87 probeInterval: duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
88 syncInterval: duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
89 timeoutIdle: duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
90 timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
91 timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
92 timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
// registerMetrics uses MustRegister, which panics if these metric
// names are already registered on reg.
94 wp.registerMetrics(reg)
// Allocate the internal maps now rather than lazily on first use.
96 wp.setupOnce.Do(wp.setup)
104 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
105 // zero Pool should not be used. Call NewPool to create a new Pool.
// Configuration, copied from the cluster config by NewPool.
108 logger logrus.FieldLogger
109 instanceSet cloud.InstanceSet
110 newExecutor func(cloud.Instance) Executor
111 bootProbeCommand string
112 imageID cloud.ImageID
113 instanceTypes map[string]arvados.InstanceType
114 syncInterval time.Duration
115 probeInterval time.Duration
116 maxProbesPerSecond int
117 timeoutIdle time.Duration
118 timeoutBooting time.Duration
119 timeoutProbe time.Duration
120 timeoutShutdown time.Duration
// Mutable state below is read and written by the Pool methods with
// wp.mtx held.

// subscribers maps each channel returned by Subscribe (receive side,
// the key callers pass to Unsubscribe) to that same channel's send
// side, which notify writes to.
123 subscribers map[<-chan struct{}]chan<- struct{}
124 creating map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
// workers has one entry per known cloud instance.
125 workers map[cloud.InstanceID]*worker
126 loaded bool // loaded list of instances from InstanceSet at least once
127 exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
// atQuotaUntil is set by Create to one minute after a cloud quota
// error; until then Create refuses to proceed and AtQuota reports true.
// atQuotaErr presumably records the triggering error (its assignment
// is not visible here).
128 atQuotaUntil time.Time
129 atQuotaErr cloud.QuotaError
// Prometheus gauges, created by registerMetrics and refreshed by
// updateMetrics.
134 mInstances prometheus.Gauge
135 mContainersRunning prometheus.Gauge
136 mVCPUs prometheus.Gauge
137 mVCPUsInuse prometheus.Gauge
138 mMemory prometheus.Gauge
139 mMemoryInuse prometheus.Gauge
142 // Subscribe returns a channel that becomes ready whenever a worker's
// state changes (i.e. each time the pool calls notify). Example usage:
147 // ch := wp.Subscribe()
148 // defer wp.Unsubscribe(ch)
150 // // ...try scheduling some work...
//
// The returned channel has a one-element buffer, so one pending
// notification is retained even if the subscriber is not receiving at
// that moment. Pass the channel to Unsubscribe when done.
155 func (wp *Pool) Subscribe() <-chan struct{} {
156 wp.setupOnce.Do(wp.setup)
158 defer wp.mtx.Unlock()
159 ch := make(chan struct{}, 1)
// Key by the receive-only view the caller holds (and later passes to
// Unsubscribe); the value is the send side used by notify.
160 wp.subscribers[ch] = ch
164 // Unsubscribe stops sending updates to the given channel.
// ch should be a channel previously returned by Subscribe; it is
// removed from the subscriber set (the visible code does not close it).
165 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
166 wp.setupOnce.Do(wp.setup)
168 defer wp.mtx.Unlock()
169 delete(wp.subscribers, ch)
172 // Unallocated returns the number of unallocated (creating + booting +
173 // idle + unknown) workers for each instance type.
//
// "creating" counts come from Create calls whose cloud request has not
// finished yet. Only the read lock is held while counting.
174 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
175 wp.setupOnce.Do(wp.setup)
177 defer wp.mtx.RUnlock()
178 unalloc := map[arvados.InstanceType]int{}
// Local copy of the pending-create counts per type, so it can be
// adjusted below (presumably decremented as pending creates are matched
// to appeared workers) without modifying wp.creating.
179 creating := map[arvados.InstanceType]int{}
180 for it, times := range wp.creating {
181 creating[it] = len(times)
183 for _, wkr := range wp.workers {
// Only idle, booting, and unknown-state workers are unallocated;
// workers in any other state (e.g. running, hold, shutdown) are skipped.
184 if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) {
// wp.creating[it][0] is the earliest pending Create start time for this
// type, because Create appends start times in order under the lock.
189 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
190 // If up to N new workers appear in
191 // Instances() while we are waiting for N
192 // Create() calls to complete, we assume we're
193 // just seeing a race between Instances() and
194 // Create() responses.
196 // The other common reason why nodes have
197 // state==Unknown is that they appeared at
198 // startup, before any Create calls. They
199 // don't match the above timing condition, so
200 // we never mistakenly attribute them to
201 // pending Create calls.
// Pending creates not matched to an appeared worker also count as
// unallocated (the "creating" term in the doc comment).
205 for it, c := range creating {
205 for it, c := range creating {
211 // Create a new instance with the given type, and add it to the worker
212 // pool. The worker is added immediately; instance creation runs in
214 func (wp *Pool) Create(it arvados.InstanceType) error {
215 logger := wp.logger.WithField("InstanceType", it.Name)
216 wp.setupOnce.Do(wp.setup)
218 defer wp.mtx.Unlock()
219 if time.Now().Before(wp.atQuotaUntil) {
222 tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
224 wp.creating[it] = append(wp.creating[it], now)
227 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
229 defer wp.mtx.Unlock()
230 // Remove our timestamp marker from wp.creating
231 for i, t := range wp.creating[it] {
233 copy(wp.creating[it][i:], wp.creating[it][i+1:])
234 wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
238 if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
240 wp.atQuotaUntil = time.Now().Add(time.Minute)
243 logger.WithError(err).Error("create failed")
246 wp.updateWorker(inst, it, StateBooting)
251 // AtQuota returns true if Create is not expected to work at the
// present time, i.e. within one minute after a cloud Create call
// failed with a quota error.
253 func (wp *Pool) AtQuota() bool {
255 defer wp.mtx.Unlock()
256 return time.Now().Before(wp.atQuotaUntil)
259 // Add or update worker attached to the given instance. Use
260 // initialState if a new worker is created.
262 // The second return value is true if a new worker is created.
264 // Caller must have lock.
//
// sync calls this with StateUnknown for every listed instance; Create
// calls it with StateBooting after a successful cloud Create.
265 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
// Existing worker: point its executor at the freshly listed instance
// and bump its updated time, which sync compares with its threshold to
// decide whether the worker is still present.
267 if wkr := wp.workers[id]; wkr != nil {
268 wkr.executor.SetTarget(inst)
270 wkr.updated = time.Now()
// If sync discovered the instance before Create finished (StateUnknown),
// adopt the booting state Create reports.
271 if initialState == StateBooting && wkr.state == StateUnknown {
272 wkr.state = StateBooting
// Newly discovered instance: honor a hold tag set on the cloud instance.
276 if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
277 initialState = StateHold
279 logger := wp.logger.WithFields(logrus.Fields{
280 "InstanceType": it.Name,
283 logger.WithField("State", initialState).Infof("instance appeared in cloud")
289 executor: wp.newExecutor(inst),
// Per-worker container sets start empty; probing has a one-slot buffer.
297 running: make(map[string]struct{}),
298 starting: make(map[string]struct{}),
299 probing: make(chan struct{}, 1),
// notifyExited is presumably called when the crunch-run process for
// container uuid is found to have exited at time t, recording it in
// wp.exited (see that field's comment). The body is not shown -- confirm.
305 // caller must have lock.
306 func (wp *Pool) notifyExited(uuid string, t time.Time) {
310 // Shutdown shuts down a worker with the given type, or returns false
311 // if all workers with the given type are busy.
//
// Booting workers are tried before idle ones; workers in any other
// state are never chosen here.
312 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
313 wp.setupOnce.Do(wp.setup)
315 defer wp.mtx.Unlock()
316 logger := wp.logger.WithField("InstanceType", it.Name)
317 logger.Info("shutdown requested")
318 for _, tryState := range []State{StateBooting, StateIdle} {
319 // TODO: shutdown the worker with the longest idle
320 // time (Idle) or the earliest create time (Booting)
// Map iteration order is random, so which matching worker gets shut
// down is currently arbitrary.
321 for _, wkr := range wp.workers {
322 if wkr.state == tryState && wkr.instType == it {
323 logger.WithField("Instance", wkr.instance).Info("shutting down")
332 // CountWorkers returns the current number of workers in each state.
// Every worker is visited while wp.mtx is held, so the counts form a
// consistent snapshot.
333 func (wp *Pool) CountWorkers() map[State]int {
334 wp.setupOnce.Do(wp.setup)
336 defer wp.mtx.Unlock()
338 for _, w := range wp.workers {
344 // Running returns the container UUIDs being prepared/run on workers.
// Containers starting or running on any worker map to the zero
// time.Time. Containers whose crunch-run process has exited but which
// KillContainer has not yet cleared are included too (presumably
// mapped to their exit time -- that loop body is not shown).
345 func (wp *Pool) Running() map[string]time.Time {
346 wp.setupOnce.Do(wp.setup)
348 defer wp.mtx.Unlock()
349 r := map[string]time.Time{}
350 for _, wkr := range wp.workers {
351 for uuid := range wkr.running {
352 r[uuid] = time.Time{}
354 for uuid := range wkr.starting {
355 r[uuid] = time.Time{}
358 for uuid, exited := range wp.exited {
364 // StartContainer starts a container on an idle worker immediately if
365 // possible, otherwise returns false.
//
// Among idle workers of type it, the one with the latest busy timestamp
// is chosen (presumably so that longer-idle workers can reach their idle
// timeout and be shut down -- confirm).
366 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
367 wp.setupOnce.Do(wp.setup)
369 defer wp.mtx.Unlock()
371 for _, w := range wp.workers {
372 if w.instType == it && w.state == StateIdle {
373 if wkr == nil || w.busy.After(wkr.busy) {
// Called with wp.mtx still held.
381 wkr.startContainer(ctr)
385 // KillContainer kills the crunch-run process for the given container
386 // UUID, if it's running on any worker.
388 // KillContainer returns immediately; the act of killing the container
389 // takes some time, and runs in the background.
390 func (wp *Pool) KillContainer(uuid string) {
392 defer wp.mtx.Unlock()
// If only an exited-process placeholder remains for uuid, clearing it is
// all that is needed: the crunch-run process is already gone.
393 if _, ok := wp.exited[uuid]; ok {
394 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
395 delete(wp.exited, uuid)
// Otherwise find the worker running the container and kill it in a
// separate goroutine (see Pool.kill).
// NOTE(review): only wkr.running is searched; a container that is still
// in wkr.starting falls through to "cannot kill: already disappeared" --
// confirm this is intended.
398 for _, wkr := range wp.workers {
399 if _, ok := wkr.running[uuid]; ok {
400 go wp.kill(wkr, uuid)
404 wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
// kill runs "crunch-run --kill 15 <uuid>" on wkr through its executor
// (15 is presumably the signal number, SIGTERM). On failure it logs the
// command's stdout/stderr. On success it removes uuid from wkr.running
// and returns the worker to StateIdle if nothing else is running or
// starting on it.
//
// The remote command runs before wp.mtx is taken; the lock is acquired
// only afterwards, to update the worker.
// NOTE(review): uuid is interpolated into a shell command unquoted; it is
// assumed to be a well-formed container UUID.
407 func (wp *Pool) kill(wkr *worker, uuid string) {
408 logger := wp.logger.WithFields(logrus.Fields{
409 "ContainerUUID": uuid,
410 "Instance": wkr.instance,
412 logger.Debug("killing process")
413 stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil)
415 logger.WithFields(logrus.Fields{
416 "stderr": string(stderr),
417 "stdout": string(stdout),
419 }).Warn("kill failed")
422 logger.Debug("killing process succeeded")
424 defer wp.mtx.Unlock()
// The container may already have been removed by another update while
// the remote command was running.
425 if _, ok := wkr.running[uuid]; ok {
426 delete(wkr.running, uuid)
427 if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
428 wkr.state = StateIdle
430 wkr.updated = time.Now()
// registerMetrics creates the pool's Prometheus gauges (namespace
// "arvados", subsystem "dispatchcloud") and registers them on reg. When
// no registry is supplied (the condition is not shown; presumably
// reg == nil), a fresh private registry is used instead, so the gauges
// can still be updated but presumably are not exported anywhere.
// MustRegister panics if a gauge cannot be registered, e.g. when reg
// already holds collectors with the same names.
435 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
437 reg = prometheus.NewRegistry()
439 wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
440 Namespace: "arvados",
441 Subsystem: "dispatchcloud",
442 Name: "instances_total",
443 Help: "Number of cloud VMs including pending, booting, running, held, and shutting down.",
445 reg.MustRegister(wp.mInstances)
446 wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
447 Namespace: "arvados",
448 Subsystem: "dispatchcloud",
449 Name: "containers_running",
450 Help: "Number of containers reported running by cloud VMs.",
452 reg.MustRegister(wp.mContainersRunning)
// Capacity gauges: totals across all VMs vs. VMs running containers.
454 wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
455 Namespace: "arvados",
456 Subsystem: "dispatchcloud",
458 Help: "Total VCPUs on all cloud VMs.",
460 reg.MustRegister(wp.mVCPUs)
461 wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
462 Namespace: "arvados",
463 Subsystem: "dispatchcloud",
465 Help: "VCPUs on cloud VMs that are running containers.",
467 reg.MustRegister(wp.mVCPUsInuse)
468 wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
469 Namespace: "arvados",
470 Subsystem: "dispatchcloud",
471 Name: "memory_bytes_total",
472 Help: "Total memory on all cloud VMs.",
474 reg.MustRegister(wp.mMemory)
475 wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
476 Namespace: "arvados",
477 Subsystem: "dispatchcloud",
478 Name: "memory_bytes_inuse",
479 Help: "Memory on cloud VMs that are running containers.",
481 reg.MustRegister(wp.mMemoryInuse)
// runMetrics keeps the gauges current. It subscribes to pool
// notifications (unsubscribing on return) and presumably calls
// updateMetrics after each one; the loop itself is not shown.
484 func (wp *Pool) runMetrics() {
486 defer wp.Unsubscribe(ch)
// updateMetrics recomputes every gauge from the current worker set while
// holding the read lock.
492 func (wp *Pool) updateMetrics() {
494 defer wp.mtx.RUnlock()
// alloc counts containers (running + starting). The *Inuse totals count
// the whole VCPU/RAM capacity of each instance that has at least one
// such container. RAM is assumed to be in bytes, matching the
// memory_bytes_* metric names -- confirm.
496 var alloc, cpu, cpuInuse, mem, memInuse int64
497 for _, wkr := range wp.workers {
498 cpu += int64(wkr.instType.VCPUs)
499 mem += int64(wkr.instType.RAM)
// Workers with no running or starting containers presumably skip the
// in-use accounting below.
500 if len(wkr.running)+len(wkr.starting) == 0 {
503 alloc += int64(len(wkr.running) + len(wkr.starting))
504 cpuInuse += int64(wkr.instType.VCPUs)
505 memInuse += int64(wkr.instType.RAM)
507 wp.mInstances.Set(float64(len(wp.workers)))
508 wp.mContainersRunning.Set(float64(alloc))
509 wp.mVCPUs.Set(float64(cpu))
510 wp.mMemory.Set(float64(mem))
511 wp.mVCPUsInuse.Set(float64(cpuInuse))
512 wp.mMemoryInuse.Set(float64(memInuse))
// runProbes periodically probes every worker that is not shutting down,
// launching at most maxPPS probes per second.
//
// Every probeInterval it collects the IDs of eligible workers, calling
// shutdownIfIdle on each worker that is not already in StateShutdown;
// workers for which that returns true are not probed. It then starts
// ProbeAndUpdate for each worker that still exists, pacing launches with
// limitticker. The exit path (presumably on pool stop) is not shown.
515 func (wp *Pool) runProbes() {
516 maxPPS := wp.maxProbesPerSecond
// Fall back to the default rate when the configured one is unusable
// (presumably < 1, which would also make the ticker period invalid).
518 maxPPS = defaultMaxProbesPerSecond
520 limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
521 defer limitticker.Stop()
523 probeticker := time.NewTicker(wp.probeInterval)
524 defer probeticker.Stop()
526 workers := []cloud.InstanceID{}
527 for range probeticker.C {
// Reuse the slice's backing array across rounds.
528 workers = workers[:0]
530 for id, wkr := range wp.workers {
531 if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
534 workers = append(workers, id)
538 for _, id := range workers {
// Look the worker up again: sync may have removed it since the ID list
// was built.
540 wkr, ok := wp.workers[id]
543 // Deleted while we were probing
// Probe asynchronously; waiting on limitticker below bounds the rate.
547 go wkr.ProbeAndUpdate()
551 case <-limitticker.C:
// runSync keeps the worker set in step with the cloud InstanceSet by
// calling getInstancesAndSync repeatedly until the pool is stopped.
557 func (wp *Pool) runSync() {
558 // sync once immediately, then wait syncInterval, sync again,
// A 1ns timer fires almost at once, giving the immediate first sync.
560 timer := time.NewTimer(1)
564 err := wp.getInstancesAndSync()
// Failures are only logged; the next attempt happens after syncInterval.
566 wp.logger.WithError(err).Warn("sync failed")
568 timer.Reset(wp.syncInterval)
570 wp.logger.Debug("worker.Pool stopped")
576 // Stop synchronizing with the InstanceSet.
// NOTE(review): only the setupOnce call is visible; presumably the rest
// signals runSync (which logs "worker.Pool stopped") to exit. Confirm
// whether the probe and metrics loops stop as well.
577 func (wp *Pool) Stop() {
578 wp.setupOnce.Do(wp.setup)
582 // Instances returns an InstanceView for each worker in the pool,
583 // summarizing its current state and recent activity.
// The result is sorted by instance string, so the output order is stable
// across calls even though map iteration order is random.
584 func (wp *Pool) Instances() []InstanceView {
586 wp.setupOnce.Do(wp.setup)
588 for _, w := range wp.workers {
589 r = append(r, InstanceView{
590 Instance: w.instance.String(),
591 Price: w.instType.Price,
592 ArvadosInstanceType: w.instType.Name,
593 ProviderInstanceType: w.instType.ProviderType,
594 LastContainerUUID: w.lastUUID,
596 WorkerState: w.state.String(),
600 sort.Slice(r, func(i, j int) bool {
601 return strings.Compare(r[i].Instance, r[j].Instance) < 0
// setup allocates the pool's internal maps. It is always invoked as
// wp.setupOnce.Do(wp.setup), so it runs at most once per Pool no matter
// which method triggers it first.
606 func (wp *Pool) setup() {
607 wp.creating = map[arvados.InstanceType][]time.Time{}
608 wp.exited = map[string]time.Time{}
609 wp.workers = map[cloud.InstanceID]*worker{}
610 wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
// notify wakes every subscriber registered via Subscribe, holding the
// read lock while iterating. Each send is made inside a select;
// presumably it has a default case (not shown), so a subscriber whose
// one-slot buffer is already full is skipped rather than blocking.
613 func (wp *Pool) notify() {
615 defer wp.mtx.RUnlock()
616 for _, send := range wp.subscribers {
618 case send <- struct{}{}:
// getInstancesAndSync lists instances from the InstanceSet and reconciles
// the worker map with the result via sync. An error from the listing is
// presumably returned to the caller (runSync logs it); that handling is
// not shown.
624 func (wp *Pool) getInstancesAndSync() error {
625 wp.setupOnce.Do(wp.setup)
626 wp.logger.Debug("getting instance list")
// Take the threshold before listing. Workers updated after this point
// (e.g. by Create while the listing is in flight) are newer than the
// listing, and sync will not remove them.
627 threshold := time.Now()
// An empty tag filter is passed (presumably meaning "all instances").
628 instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
632 wp.sync(threshold, instances)
633 wp.logger.Debug("sync done")
637 // Add/remove/update workers based on instances, which was obtained
638 // from the instanceSet. However, don't clobber any other updates that
639 // already happened after threshold.
// sync takes wp.mtx itself, so the caller must not hold it.
640 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
642 defer wp.mtx.Unlock()
643 wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
646 for _, inst := range instances {
647 itTag := inst.Tags()[tagKeyInstanceType]
648 it, ok := wp.instanceTypes[itTag]
650 wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
// Newly seen instances become StateUnknown workers. If an instance is
// still listed more than timeoutShutdown after its worker was destroyed,
// the shutdown is retried.
653 if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
655 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
656 wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
// updateWorker has just set updated = now (after threshold) on every
// existing worker whose instance is listed. Any worker whose updated time
// is not after threshold was therefore absent from the listing and is
// removed. Deleting map entries during range is permitted in Go.
661 for id, wkr := range wp.workers {
662 if wkr.updated.After(threshold) {
665 logger := wp.logger.WithFields(logrus.Fields{
666 "Instance": wkr.instance,
667 "WorkerState": wkr.state,
669 logger.Info("instance disappeared in cloud")
670 delete(wp.workers, id)
// Close asynchronously, presumably so a slow Close does not stall sync
// while wp.mtx is held.
671 go wkr.executor.Close()
// Presumably logged once, after the first sync (guarded by wp.loaded).
677 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")