// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"io"
	"sort"
	"strings"
	"sync"
	"time"

	"git.curoverse.com/arvados.git/lib/cloud"
	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"github.com/sirupsen/logrus"
	"github.com/prometheus/client_golang/prometheus"
)
const (
	tagKeyInstanceType = "InstanceType"
	tagKeyHold         = "Hold"
)
// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             string
	Price                float64
	ArvadosInstanceType  string
	ProviderInstanceType string
	LastContainerUUID    string
	LastBusy             time.Time
	WorkerState          string
}
// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	// Release any resources (e.g., SSH connections) held by the
	// Executor. The pool calls Close when the corresponding
	// instance disappears.
	Close()
}
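// A hedged sketch of how a caller drives an Executor; newSSHExecutor
// and refreshedInst are hypothetical stand-ins, not part of this
// package:
//
//	exec := newSSHExecutor(inst)
//	defer exec.Close()
//	stdout, stderr, err := exec.Execute("crunch-run --list", nil)
//	// ...
//	exec.SetTarget(refreshedInst) // same host, possibly new address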
const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = time.Minute * 10
	defaultTimeoutProbe       = time.Minute * 10
	defaultTimeoutShutdown    = time.Second * 10
)
// duration returns conf as a time.Duration, falling back to def when
// conf is zero (i.e., not set in the cluster configuration).
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}
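// For example, with the defaults above:
//
//	duration(0, defaultSyncInterval)                               // => 1m0s (fall back to default)
//	duration(arvados.Duration(5*time.Second), defaultSyncInterval) // => 5s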
// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:             logger,
		instanceSet:        instanceSet,
		newExecutor:        newExecutor,
		bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
		imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
		instanceTypes:      cluster.InstanceTypes,
		maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
		probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
		syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		stop:               make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}
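// A minimal sketch of constructing a Pool; myInstanceSet and
// newMyExecutor stand in for real cloud.InstanceSet and Executor
// implementations:
//
//	reg := prometheus.NewRegistry()
//	wp := NewPool(logger, reg, myInstanceSet, newMyExecutor, cluster)
//	defer wp.Stop()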
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger             logrus.FieldLogger
	instanceSet        cloud.InstanceSet
	newExecutor        func(cloud.Instance) Executor
	bootProbeCommand   string
	imageID            cloud.ImageID
	instanceTypes      map[string]arvados.InstanceType
	syncInterval       time.Duration
	probeInterval      time.Duration
	maxProbesPerSecond int
	timeoutIdle        time.Duration
	timeoutBooting     time.Duration
	timeoutProbe       time.Duration
	timeoutShutdown    time.Duration

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once

	mInstances         prometheus.Gauge
	mContainersRunning prometheus.Gauge
	mVCPUs             prometheus.Gauge
	mVCPUsInuse        prometheus.Gauge
	mMemory            prometheus.Gauge
	mMemoryInuse       prometheus.Gauge
}
// Subscribe returns a channel that becomes ready whenever a worker's
// state changes.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		// ...try scheduling some work...
//		if done {
//			break
//		}
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}
// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}
// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	for it, times := range wp.creating {
		creating[it] = len(times)
	}
	for _, wkr := range wp.workers {
		if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}
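// Worked example: suppose wp.creating[it] holds 3 pending Create
// timestamps and one worker of type it is in StateUnknown, having
// appeared after the oldest pending call started. Unallocated counts
// 1 (the unknown worker) + 3 - 1 (the assumed race) = 3, instead of
// double-counting the same instance as both "creating" and "unknown".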
// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
func (wp *Pool) Create(it arvados.InstanceType) error {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) {
		return wp.atQuotaErr
	}
	tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
	now := time.Now()
	wp.creating[it] = append(wp.creating[it], now)
	go func() {
		defer wp.notify()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// Remove our timestamp marker from wp.creating
		for i, t := range wp.creating[it] {
			if t == now {
				copy(wp.creating[it][i:], wp.creating[it][i+1:])
				wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
				break
			}
		}
		if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
			wp.atQuotaErr = err
			wp.atQuotaUntil = time.Now().Add(time.Minute)
		}
		if err != nil {
			logger.WithError(err).Error("create failed")
			return
		}
		wp.updateWorker(inst, it, StateBooting)
	}()
	return nil
}
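// A hedged sketch of typical use by a scheduler (the real caller
// lives outside this file):
//
//	if !wp.AtQuota() {
//		if err := wp.Create(it); err != nil {
//			logger.WithError(err).Warn("instance creation failed")
//		}
//	}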
// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
}
// Add or update a worker attached to the given instance. Use
// initialState if a new worker is created.
//
// The second return value is true if a new worker is created.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		if initialState == StateBooting && wkr.state == StateUnknown {
			wkr.state = StateBooting
		}
		return wkr, false
	}
	if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
		initialState = StateHold
	}
	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst,
	})
	logger.WithField("State", initialState).Info("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:      &wp.mtx,
		wp:       wp,
		logger:   logger,
		executor: wp.newExecutor(inst),
		state:    initialState,
		instance: inst,
		instType: it,
		appeared: now,
		probed:   now,
		busy:     now,
		updated:  now,
		running:  make(map[string]struct{}),
		starting: make(map[string]struct{}),
		probing:  make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}
// caller must have lock.
func (wp *Pool) notifyExited(uuid string, t time.Time) {
	wp.exited[uuid] = t
}
// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance).Info("shutting down")
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}
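// A hypothetical caller shedding surplus capacity of type it might
// loop until Shutdown reports nothing left to shut down (excess is a
// stand-in for the caller's own bookkeeping):
//
//	for excess > 0 && wp.Shutdown(it) {
//		excess--
//	}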
// CountWorkers returns the current number of workers in each state.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}
// Running returns the container UUIDs being prepared/run on workers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}
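// In the returned map, a zero time means the container is still
// starting or running; a nonzero time is when its crunch-run process
// exited. For example, a caller can sweep up exited containers:
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() {
//			wp.KillContainer(uuid) // clears the exited placeholder
//		}
//	}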
// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}
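// Note the selection bias above: picking the most recently busy idle
// worker (w.busy.After) concentrates work on recently used workers,
// leaving the longest-idle ones untouched so they can reach
// timeoutIdle and be shut down.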
// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
func (wp *Pool) KillContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
		return
	}
	for _, wkr := range wp.workers {
		if _, ok := wkr.running[uuid]; ok {
			go wp.kill(wkr, uuid)
			return
		}
	}
	wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
}
func (wp *Pool) kill(wkr *worker, uuid string) {
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Instance":      wkr.instance,
	})
	logger.Debug("killing process")
	stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil)
	if err != nil {
		logger.WithFields(logrus.Fields{
			"stderr": string(stderr),
			"stdout": string(stdout),
			"error":  err,
		}).Warn("kill failed")
		return
	}
	logger.Debug("killing process succeeded")
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wkr.running[uuid]; ok {
		delete(wkr.running, uuid)
		if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
			wkr.state = StateIdle
		}
		wkr.updated = time.Now()
		go wp.notify()
	}
}
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
	})
	reg.MustRegister(wp.mInstances)
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)

	wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	})
	reg.MustRegister(wp.mVCPUs)
	wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_inuse",
		Help:      "VCPUs on cloud VMs that are running containers.",
	})
	reg.MustRegister(wp.mVCPUsInuse)
	wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	})
	reg.MustRegister(wp.mMemory)
	wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_inuse",
		Help:      "Memory on cloud VMs that are running containers.",
	})
	reg.MustRegister(wp.mMemoryInuse)
}
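// How the registry is exposed is up to the caller; a minimal sketch
// using the standard client_golang promhttp package (an assumption,
// not shown in this file):
//
//	mux := http.NewServeMux()
//	mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))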
func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	var alloc, cpu, cpuInuse, mem, memInuse int64
	for _, wkr := range wp.workers {
		cpu += int64(wkr.instType.VCPUs)
		mem += int64(wkr.instType.RAM)
		if len(wkr.running)+len(wkr.starting) == 0 {
			continue
		}
		alloc += int64(len(wkr.running) + len(wkr.starting))
		cpuInuse += int64(wkr.instType.VCPUs)
		memInuse += int64(wkr.instType.RAM)
	}
	wp.mInstances.Set(float64(len(wp.workers)))
	wp.mContainersRunning.Set(float64(alloc))
	wp.mVCPUs.Set(float64(cpu))
	wp.mMemory.Set(float64(mem))
	wp.mVCPUsInuse.Set(float64(cpuInuse))
	wp.mMemoryInuse.Set(float64(memInuse))
}
func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}
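// With the default maxProbesPerSecond of 10, limitticker ticks every
// time.Second/10 = 100ms, so at most ~10 ProbeAndUpdate goroutines
// are started per second regardless of how many workers exist.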
func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1) // 1ns: fire (almost) immediately for the initial sync
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}
// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}
// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.String(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(r[i].Instance, r[j].Instance) < 0
	})
	return r
}
func (wp *Pool) setup() {
	wp.creating = map[arvados.InstanceType][]time.Time{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}
func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}
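// Because each subscriber channel is created with capacity 1 (see
// Subscribe) and the send above is non-blocking, bursts of updates
// coalesce into a single pending notification, and a slow subscriber
// never blocks the pool.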
func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
	if err != nil {
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}
// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance,
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		delete(wp.workers, id)
		go wkr.executor.Close()
		notify = true
	}

	if !wp.loaded {
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}