1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.curoverse.com/arvados.git/lib/cloud"
18 "git.curoverse.com/arvados.git/sdk/go/arvados"
19 "github.com/prometheus/client_golang/prometheus"
20 "github.com/sirupsen/logrus"
21 "golang.org/x/crypto/ssh"
// Tag keys written onto each cloud instance so that per-instance
// metadata (instance type, idle behavior, boot secret) can be read
// back later — e.g. IdleBehavior set by a prior dispatch process is
// restored from tags in updateWorker.
25 tagKeyInstanceType = "InstanceType"
26 tagKeyIdleBehavior = "IdleBehavior"
27 tagKeyInstanceSecret = "InstanceSecret"
30 // An InstanceView shows a worker's current state and recent activity.
// InstanceViews are produced by (*Pool).Instances() for reporting;
// json tags indicate they are serialized for an API/management view.
31 type InstanceView struct {
32 Instance cloud.InstanceID `json:"instance"`
33 Address string `json:"address"`
34 Price float64 `json:"price"`
35 ArvadosInstanceType string `json:"arvados_instance_type"`
36 ProviderInstanceType string `json:"provider_instance_type"`
37 LastContainerUUID string `json:"last_container_uuid"`
38 LastBusy time.Time `json:"last_busy"`
39 WorkerState string `json:"worker_state"`
40 IdleBehavior IdleBehavior `json:"idle_behavior"`
43 // An Executor executes shell commands on a remote host.
44 type Executor interface {
45 // Run cmd on the current target.
46 Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
48 // Use the given target for subsequent operations. The new
49 // target is the same host as the previous target, but it
50 // might return a different address and verify a different
53 // SetTarget is called frequently, and in most cases the new
54 // target will behave exactly the same as the old one. An
55 // implementation should optimize accordingly.
57 // SetTarget must not block on concurrent Execute calls.
58 SetTarget(cloud.ExecutorTarget)
// NOTE(review): callers also invoke a Close() method on executors
// (see Pool.sync); its declaration falls outside this excerpt.
// Fallback tuning values, applied via duration() when the
// corresponding cluster config (Dispatch / CloudVMs) field is unset.
64 defaultSyncInterval = time.Minute
65 defaultProbeInterval = time.Second * 10
66 defaultMaxProbesPerSecond = 10
67 defaultTimeoutIdle = time.Minute
68 defaultTimeoutBooting = time.Minute * 10
69 defaultTimeoutProbe = time.Minute * 10
70 defaultTimeoutShutdown = time.Second * 10
72 // Time after a quota error to try again anyway, even if no
73 // instances have been shutdown.
74 quotaErrorTTL = time.Minute
76 // Time between "X failed because rate limiting" messages
77 logRateLimitErrorInterval = time.Second * 10
// duration converts a configured arvados.Duration to time.Duration,
// substituting def when the configured value is unset.
// NOTE(review): the branch that returns def is in lines not shown in
// this excerpt; only the conversion path is visible here.
80 func duration(conf arvados.Duration, def time.Duration) time.Duration {
82 return time.Duration(conf)
88 // NewPool creates a Pool of workers backed by instanceSet.
90 // New instances are configured and set up according to the given
91 // cluster configuration.
92 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
// Wrap the provider's InstanceSet so Create/Instances calls can be
// rate-limit-throttled (see throttleCreate/throttleInstances usage).
96 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
97 newExecutor: newExecutor,
98 bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
99 imageID: cloud.ImageID(cluster.CloudVMs.ImageID),
100 instanceTypes: cluster.InstanceTypes,
101 maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
// Unset config durations fall back to the package defaults above.
102 probeInterval: duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
103 syncInterval: duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
104 timeoutIdle: duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
105 timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
106 timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
107 timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
108 installPublicKey: installPublicKey,
109 stop: make(chan bool),
111 wp.registerMetrics(reg)
// Initialize maps eagerly here rather than waiting for the first
// exported-method call to trigger setupOnce.
113 wp.setupOnce.Do(wp.setup)
121 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
122 // zero Pool should not be used. Call NewPool to create a new Pool.
// Configuration (set once by NewPool, read-only afterwards):
125 logger logrus.FieldLogger
126 arvClient *arvados.Client
127 instanceSet *throttledInstanceSet
128 newExecutor func(cloud.Instance) Executor
129 bootProbeCommand string
130 imageID cloud.ImageID
131 instanceTypes map[string]arvados.InstanceType
132 syncInterval time.Duration
133 probeInterval time.Duration
134 maxProbesPerSecond int
135 timeoutIdle time.Duration
136 timeoutBooting time.Duration
137 timeoutProbe time.Duration
138 timeoutShutdown time.Duration
139 installPublicKey ssh.PublicKey
// Mutable state; guarded by wp.mtx (declared outside this excerpt).
142 subscribers map[<-chan struct{}]chan<- struct{}
143 creating map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
144 workers map[cloud.InstanceID]*worker
145 loaded bool // loaded list of instances from InstanceSet at least once
146 exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
147 atQuotaUntil time.Time
148 atQuotaErr cloud.QuotaError
153 throttleCreate throttle
154 throttleInstances throttle
// Prometheus metrics registered by registerMetrics.
156 mContainersRunning prometheus.Gauge
157 mInstances *prometheus.GaugeVec
158 mInstancesPrice *prometheus.GaugeVec
159 mVCPUs *prometheus.GaugeVec
160 mMemory *prometheus.GaugeVec
163 // Subscribe returns a buffered channel that becomes ready after any
164 // change to the pool's state that could have scheduling implications:
165 // a worker's state changes, a new worker appears, the cloud
166 // provider's API rate limiting period ends, etc.
168 // Additional events that occur while the channel is already ready
169 // will be dropped, so it is OK if the caller services the channel
174 // ch := wp.Subscribe()
175 // defer wp.Unsubscribe(ch)
182 func (wp *Pool) Subscribe() <-chan struct{} {
183 wp.setupOnce.Do(wp.setup)
// NOTE(review): the wp.mtx.Lock() paired with this deferred Unlock is
// on a line not shown in this excerpt.
185 defer wp.mtx.Unlock()
// Buffer of 1 lets notify() do a non-blocking send; coalesced events
// are intentionally dropped per the doc comment above.
186 ch := make(chan struct{}, 1)
187 wp.subscribers[ch] = ch
191 // Unsubscribe stops sending updates to the given channel.
// The channel itself is not closed; the caller simply stops
// receiving notifications once its entry is removed.
192 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
193 wp.setupOnce.Do(wp.setup)
195 defer wp.mtx.Unlock()
196 delete(wp.subscribers, ch)
199 // Unallocated returns the number of unallocated (creating + booting +
200 // idle + unknown) workers for each instance type. Workers in
201 // hold/drain mode are not included.
202 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
203 wp.setupOnce.Do(wp.setup)
// Read-only snapshot; uses the read side of wp.mtx.
205 defer wp.mtx.RUnlock()
206 unalloc := map[arvados.InstanceType]int{}
207 creating := map[arvados.InstanceType]int{}
208 for it, times := range wp.creating {
209 creating[it] = len(times)
211 for _, wkr := range wp.workers {
212 // Skip workers that are not expected to become
213 // available soon. Note len(wkr.running)>0 is not
214 // redundant here: it can be true even in
216 if wkr.state == StateShutdown ||
217 wkr.state == StateRunning ||
218 wkr.idleBehavior != IdleBehaviorRun ||
219 len(wkr.running) > 0 {
224 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
225 // If up to N new workers appear in
226 // Instances() while we are waiting for N
227 // Create() calls to complete, we assume we're
228 // just seeing a race between Instances() and
229 // Create() responses.
231 // The other common reason why nodes have
232 // state==Unknown is that they appeared at
233 // startup, before any Create calls. They
234 // don't match the above timing condition, so
235 // we never mistakenly attribute them to
236 // pending Create calls.
// Fold the remaining (unmatched) pending-Create counts into the
// result. NOTE(review): the accumulation body is not visible here.
240 for it, c := range creating {
246 // Create a new instance with the given type, and add it to the worker
247 // pool. The worker is added immediately; instance creation runs in
250 // Create returns false if a pre-existing error state prevents it from
251 // even attempting to create a new instance. Those errors are logged
252 // by the Pool, so the caller does not need to log anything in such
254 func (wp *Pool) Create(it arvados.InstanceType) bool {
255 logger := wp.logger.WithField("InstanceType", it.Name)
256 wp.setupOnce.Do(wp.setup)
258 defer wp.mtx.Unlock()
// Bail out early while a quota backoff or create-throttle error is
// still in effect.
259 if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
// Record the start time so Unallocated() can count this in-flight
// Create call until it completes.
263 wp.creating[it] = append(wp.creating[it], now)
// Per-instance boot secret, installed on the VM via initCmd and later
// checked (via the InstanceSecret tag) to verify we reached the right
// host.
266 secret := randomHex(instanceSecretLength)
267 tags := cloud.InstanceTags{
268 tagKeyInstanceType: it.Name,
269 tagKeyIdleBehavior: string(IdleBehaviorRun),
270 tagKeyInstanceSecret: secret,
272 initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
// The blocking provider call runs outside the lock (in a goroutine
// whose opening lines are not visible in this excerpt).
273 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
275 defer wp.mtx.Unlock()
276 // Remove our timestamp marker from wp.creating
277 for i, t := range wp.creating[it] {
279 copy(wp.creating[it][i:], wp.creating[it][i+1:])
280 wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
// Quota errors start a backoff window; notify subscribers when it
// expires so schedulers retry.
285 if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
287 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
288 time.AfterFunc(quotaErrorTTL, wp.notify)
290 logger.WithError(err).Error("create failed")
291 wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
294 wp.updateWorker(inst, it, StateBooting)
299 // AtQuota returns true if Create is not expected to work at the
// moment (i.e., we are still within the quota-error backoff window).
301 func (wp *Pool) AtQuota() bool {
303 defer wp.mtx.Unlock()
304 return time.Now().Before(wp.atQuotaUntil)
307 // SetIdleBehavior determines how the indicated instance will behave
308 // when it has no containers running.
// Returns an error if the instance is not (or no longer) in the pool.
309 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
311 defer wp.mtx.Unlock()
312 wkr, ok := wp.workers[id]
314 return errors.New("requested instance does not exist")
316 wkr.idleBehavior = idleBehavior
322 // Add or update worker attached to the given instance. Use
323 // initialState if a new worker is created.
325 // The second return value is true if a new worker is created.
327 // Caller must have lock.
328 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
// Wrap the instance so subsequent use goes through tag verification
// (tagVerifier is declared outside this excerpt).
329 inst = tagVerifier{inst}
331 if wkr := wp.workers[id]; wkr != nil {
// Existing worker: refresh its target address/host key and timestamp
// instead of creating a new record.
332 wkr.executor.SetTarget(inst)
334 wkr.updated = time.Now()
335 if initialState == StateBooting && wkr.state == StateUnknown {
336 wkr.state = StateBooting
342 // If an instance has a valid IdleBehavior tag when it first
343 // appears, initialize the new worker accordingly (this is how
344 // we restore IdleBehavior that was set by a prior dispatch
345 // process); otherwise, default to "run". After this,
346 // wkr.idleBehavior is the source of truth, and will only be
347 // changed via SetIdleBehavior().
348 idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
349 if !validIdleBehavior[idleBehavior] {
350 idleBehavior = IdleBehaviorRun
353 logger := wp.logger.WithFields(logrus.Fields{
354 "InstanceType": it.Name,
355 "Instance": inst.ID(),
356 "Address": inst.Address(),
358 logger.WithFields(logrus.Fields{
359 "State": initialState,
360 "IdleBehavior": idleBehavior,
361 }).Infof("instance appeared in cloud")
// New worker record (the surrounding composite literal's opening and
// closing lines are not visible in this excerpt).
367 executor: wp.newExecutor(inst),
369 idleBehavior: idleBehavior,
376 running: make(map[string]struct{}),
377 starting: make(map[string]struct{}),
// probing has buffer 1 so a probe-in-progress marker can be set
// without blocking.
378 probing: make(chan struct{}, 1),
384 // caller must have lock.
// notifyExited records that the crunch-run process for container uuid
// exited at time t. NOTE(review): the body is not visible in this
// excerpt; presumably it updates wp.exited (see Running/KillContainer).
385 func (wp *Pool) notifyExited(uuid string, t time.Time) {
389 // Shutdown shuts down a worker with the given type, or returns false
390 // if all workers with the given type are busy.
391 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
392 wp.setupOnce.Do(wp.setup)
394 defer wp.mtx.Unlock()
395 logger := wp.logger.WithField("InstanceType", it.Name)
396 logger.Info("shutdown requested")
// Prefer shutting down a booting worker before an idle one; workers
// in "hold" mode are never shut down here.
397 for _, tryState := range []State{StateBooting, StateIdle} {
398 // TODO: shutdown the worker with the longest idle
399 // time (Idle) or the earliest create time (Booting)
400 for _, wkr := range wp.workers {
401 if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
402 logger.WithField("Instance", wkr.instance).Info("shutting down")
411 // CountWorkers returns the current number of workers in each state.
412 func (wp *Pool) CountWorkers() map[State]int {
413 wp.setupOnce.Do(wp.setup)
415 defer wp.mtx.Unlock()
// Tally per-state counts over all workers (accumulation lines are not
// visible in this excerpt).
417 for _, w := range wp.workers {
423 // Running returns the container UUIDs being prepared/run on workers.
425 // In the returned map, the time value indicates when the Pool
426 // observed that the container process had exited. A container that
427 // has not yet exited has a zero time value. The caller should use
428 // KillContainer() to garbage-collect the entries for exited
430 func (wp *Pool) Running() map[string]time.Time {
431 wp.setupOnce.Do(wp.setup)
433 defer wp.mtx.Unlock()
434 r := map[string]time.Time{}
435 for _, wkr := range wp.workers {
// Zero time.Time means "still running/starting" per the doc comment.
436 for uuid := range wkr.running {
437 r[uuid] = time.Time{}
439 for uuid := range wkr.starting {
440 r[uuid] = time.Time{}
// Overlay the observed exit times for containers whose process has
// already finished.
443 for uuid, exited := range wp.exited {
449 // StartContainer starts a container on an idle worker immediately if
450 // possible, otherwise returns false.
451 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
452 wp.setupOnce.Do(wp.setup)
454 defer wp.mtx.Unlock()
// Pick the most-recently-busy idle worker of the requested type
// (favors keeping long-idle workers idle so they can be shut down).
456 for _, w := range wp.workers {
457 if w.instType == it && w.state == StateIdle {
458 if wkr == nil || w.busy.After(wkr.busy) {
466 wkr.startContainer(ctr)
470 // KillContainer kills the crunch-run process for the given container
471 // UUID, if it's running on any worker.
473 // KillContainer returns immediately; the act of killing the container
474 // takes some time, and runs in the background.
475 func (wp *Pool) KillContainer(uuid string) {
477 defer wp.mtx.Unlock()
// An entry in wp.exited means the process already finished; just
// garbage-collect the placeholder (see Running()).
478 if _, ok := wp.exited[uuid]; ok {
479 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
480 delete(wp.exited, uuid)
483 for _, wkr := range wp.workers {
484 if _, ok := wkr.running[uuid]; ok {
// The actual kill runs asynchronously; see (*Pool).kill.
485 go wp.kill(wkr, uuid)
489 wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
// kill sends SIGTERM (via "crunch-run --kill 15") to the crunch-run
// process for uuid on wkr, then updates the worker's bookkeeping on
// success. Runs in its own goroutine (see KillContainer).
492 func (wp *Pool) kill(wkr *worker, uuid string) {
493 logger := wp.logger.WithFields(logrus.Fields{
494 "ContainerUUID": uuid,
495 "Instance": wkr.instance.ID(),
497 logger.Debug("killing process")
498 cmd := "crunch-run --kill 15 " + uuid
// Non-root remote users need privilege escalation (the escalation
// line itself is not visible in this excerpt).
499 if u := wkr.instance.RemoteUser(); u != "root" {
502 stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
504 logger.WithFields(logrus.Fields{
505 "stderr": string(stderr),
506 "stdout": string(stdout),
508 }).Warn("kill failed")
511 logger.Debug("killing process succeeded")
513 defer wp.mtx.Unlock()
512 if _, ok := wkr.running[uuid]; ok {
515 delete(wkr.running, uuid)
// Last container gone: the worker transitions back to Idle so the
// scheduler can reuse or shut it down.
516 if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
517 wkr.state = StateIdle
519 wkr.updated = time.Now()
// registerMetrics creates the Pool's Prometheus collectors and
// registers them on reg.
524 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
// NOTE(review): this assignment appears to be the nil-registry
// fallback (register on a throwaway registry); the guarding condition
// is not visible in this excerpt — confirm.
526 reg = prometheus.NewRegistry()
528 wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
529 Namespace: "arvados",
530 Subsystem: "dispatchcloud",
531 Name: "containers_running",
532 Help: "Number of containers reported running by cloud VMs.",
534 reg.MustRegister(wp.mContainersRunning)
// The remaining gauges are vectors labeled by worker "category"
// (inuse/hold/booting/unknown/idle — see updateMetrics).
535 wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
536 Namespace: "arvados",
537 Subsystem: "dispatchcloud",
538 Name: "instances_total",
539 Help: "Number of cloud VMs.",
540 }, []string{"category"})
541 reg.MustRegister(wp.mInstances)
542 wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
543 Namespace: "arvados",
544 Subsystem: "dispatchcloud",
545 Name: "instances_price",
546 Help: "Price of cloud VMs.",
547 }, []string{"category"})
548 reg.MustRegister(wp.mInstancesPrice)
549 wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
550 Namespace: "arvados",
551 Subsystem: "dispatchcloud",
553 Help: "Total VCPUs on all cloud VMs.",
554 }, []string{"category"})
555 reg.MustRegister(wp.mVCPUs)
556 wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
557 Namespace: "arvados",
558 Subsystem: "dispatchcloud",
559 Name: "memory_bytes_total",
560 Help: "Total memory on all cloud VMs.",
561 }, []string{"category"})
562 reg.MustRegister(wp.mMemory)
// runMetrics subscribes to pool-change notifications and refreshes
// the metrics on each event (the loop body is not visible in this
// excerpt; see updateMetrics).
565 func (wp *Pool) runMetrics() {
567 defer wp.Unsubscribe(ch)
// updateMetrics recomputes every gauge from the current worker set.
// Workers are bucketed into one category each; the switch order makes
// "inuse" take precedence over "hold", then booting/unknown, with
// idle as the implicit default.
574 func (wp *Pool) updateMetrics() {
576 defer wp.mtx.RUnlock()
578 instances := map[string]int64{}
579 price := map[string]float64{}
580 cpu := map[string]int64{}
581 mem := map[string]int64{}
583 for _, wkr := range wp.workers {
586 case len(wkr.running)+len(wkr.starting) > 0:
588 case wkr.idleBehavior == IdleBehaviorHold:
590 case wkr.state == StateBooting:
592 case wkr.state == StateUnknown:
598 price[cat] += wkr.instType.Price
599 cpu[cat] += int64(wkr.instType.VCPUs)
600 mem[cat] += int64(wkr.instType.RAM)
601 running += int64(len(wkr.running) + len(wkr.starting))
// Explicitly set all categories (including zero-valued ones) so
// gauges reset when a category empties out.
603 for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
604 wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
605 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
606 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
607 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
609 wp.mContainersRunning.Set(float64(running))
// runProbes periodically probes every worker, rate-limited to
// maxProbesPerSecond across the whole pool.
612 func (wp *Pool) runProbes() {
613 maxPPS := wp.maxProbesPerSecond
615 maxPPS = defaultMaxProbesPerSecond
// limitticker spaces individual probes; probeticker starts each
// full sweep.
617 limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
618 defer limitticker.Stop()
620 probeticker := time.NewTicker(wp.probeInterval)
621 defer probeticker.Stop()
// Reuse the same slice across sweeps to avoid reallocating.
623 workers := []cloud.InstanceID{}
624 for range probeticker.C {
625 workers = workers[:0]
// Collect IDs under the lock, then probe without holding it.
627 for id, wkr := range wp.workers {
628 if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
631 workers = append(workers, id)
635 for _, id := range workers {
637 wkr, ok := wp.workers[id]
640 // Deleted while we were probing
644 go wkr.ProbeAndUpdate()
// Wait for the rate-limit tick before the next probe.
648 case <-limitticker.C:
// runSync periodically reconciles the worker map with the cloud
// provider's instance list until the pool is stopped.
654 func (wp *Pool) runSync() {
655 // sync once immediately, then wait syncInterval, sync again,
// Timer of 1ns fires (effectively) immediately for the first sync.
657 timer := time.NewTimer(1)
661 err := wp.getInstancesAndSync()
663 wp.logger.WithError(err).Warn("sync failed")
665 timer.Reset(wp.syncInterval)
667 wp.logger.Debug("worker.Pool stopped")
673 // Stop synchronizing with the InstanceSet.
// NOTE(review): presumably signals via wp.stop; the body is not
// visible in this excerpt.
674 func (wp *Pool) Stop() {
675 wp.setupOnce.Do(wp.setup)
679 // Instances returns an InstanceView for each worker in the pool,
680 // summarizing its current state and recent activity.
// The result is sorted by instance ID for stable output.
681 func (wp *Pool) Instances() []InstanceView {
683 wp.setupOnce.Do(wp.setup)
685 for _, w := range wp.workers {
686 r = append(r, InstanceView{
687 Instance: w.instance.ID(),
688 Address: w.instance.Address(),
689 Price: w.instType.Price,
690 ArvadosInstanceType: w.instType.Name,
691 ProviderInstanceType: w.instType.ProviderType,
692 LastContainerUUID: w.lastUUID,
694 WorkerState: w.state.String(),
695 IdleBehavior: w.idleBehavior,
699 sort.Slice(r, func(i, j int) bool {
700 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
// setup initializes the Pool's maps. Called exactly once via
// wp.setupOnce.Do from every exported method.
705 func (wp *Pool) setup() {
706 wp.creating = map[arvados.InstanceType][]time.Time{}
707 wp.exited = map[string]time.Time{}
708 wp.workers = map[cloud.InstanceID]*worker{}
709 wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
// notify wakes all subscribers with a non-blocking send; events are
// dropped for subscribers whose 1-slot buffer is already full (see
// Subscribe).
712 func (wp *Pool) notify() {
714 defer wp.mtx.RUnlock()
715 for _, send := range wp.subscribers {
717 case send <- struct{}{}:
// getInstancesAndSync fetches the provider's instance list and
// reconciles the worker map against it. Returns early with the
// throttle error if instance listing is currently rate-limited.
723 func (wp *Pool) getInstancesAndSync() error {
724 wp.setupOnce.Do(wp.setup)
725 if err := wp.instanceSet.throttleInstances.Error(); err != nil {
728 wp.logger.Debug("getting instance list")
// Snapshot the time before listing: sync() must not clobber worker
// updates that happen after this point.
729 threshold := time.Now()
730 instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
732 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
735 wp.sync(threshold, instances)
736 wp.logger.Debug("sync done")
740 // Add/remove/update workers based on instances, which was obtained
741 // from the instanceSet. However, don't clobber any other updates that
742 // already happened after threshold.
743 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
745 defer wp.mtx.Unlock()
746 wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
749 for _, inst := range instances {
// Instances with unrecognized type tags (e.g. after a config change)
// are logged and skipped rather than adopted.
750 itTag := inst.Tags()[tagKeyInstanceType]
751 it, ok := wp.instanceTypes[itTag]
753 wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
756 if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
758 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
759 wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
// Drop workers whose instance no longer appears in the provider's
// list — unless they were updated after threshold (e.g. created
// while the list call was in flight).
764 for id, wkr := range wp.workers {
765 if wkr.updated.After(threshold) {
768 logger := wp.logger.WithFields(logrus.Fields{
769 "Instance": wkr.instance.ID(),
770 "WorkerState": wkr.state,
772 logger.Info("instance disappeared in cloud")
773 delete(wp.workers, id)
// Close the executor asynchronously; it may block on network I/O.
774 go wkr.executor.Close()
780 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
788 // Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even (buf holds n/2 bytes; each byte prints as 2 hex
// digits). NOTE(review): rand.Read error handling falls in lines not
// shown here — presumably crypto/rand given the secret use in Create.
790 func randomHex(n int) string {
791 buf := make([]byte, n/2)
792 _, err := rand.Read(buf)
796 return fmt.Sprintf("%x", buf)