1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.curoverse.com/arvados.git/lib/cloud"
18 "git.curoverse.com/arvados.git/sdk/go/arvados"
19 "github.com/prometheus/client_golang/prometheus"
20 "github.com/sirupsen/logrus"
21 "golang.org/x/crypto/ssh"
// Instance tag names used to persist dispatcher state on the cloud
// instances themselves, so it survives dispatcher restarts.
tagKeyInstanceType = "InstanceType" // name of the arvados.InstanceType the instance was created as
tagKeyIdleBehavior = "IdleBehavior" // last IdleBehavior set via SetIdleBehavior (restored in updateWorker)
tagKeyInstanceSecret = "InstanceSecret" // random secret matching a pending Create call (see Pool.Create)
// An InstanceView shows a worker's current state and recent activity.
// It is a read-only snapshot built by Pool.Instances() for
// reporting/UI purposes.
type InstanceView struct {
Instance cloud.InstanceID `json:"instance"` // provider-assigned instance ID
Address string `json:"address"` // address used to reach the instance (from Instance.Address())
Price float64 `json:"price"` // hourly price of the Arvados instance type
ArvadosInstanceType string `json:"arvados_instance_type"` // instance type name in the cluster config
ProviderInstanceType string `json:"provider_instance_type"` // cloud provider's name for the type
LastContainerUUID string `json:"last_container_uuid"` // most recent container seen on this worker
LastBusy time.Time `json:"last_busy"` // last time the worker was observed busy
WorkerState string `json:"worker_state"` // State string (booting/idle/running/...)
IdleBehavior IdleBehavior `json:"idle_behavior"` // run/hold/drain
// An Executor executes shell commands on a remote host.
type Executor interface {
// Run cmd on the current target.
Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
// Use the given target for subsequent operations. The new
// target is the same host as the previous target, but it
// might return a different address and verify a different
// host key.
//
// SetTarget is called frequently, and in most cases the new
// target will behave exactly the same as the old one. An
// implementation should optimize accordingly.
// SetTarget must not block on concurrent Execute calls.
SetTarget(cloud.ExecutorTarget)
// Defaults used when the corresponding cluster configuration value
// is zero/unset (applied via the duration() helper in NewPool).
defaultSyncInterval = time.Minute // how often to reconcile with InstanceSet.Instances()
defaultProbeInterval = time.Second * 10 // how often to probe each worker
defaultMaxProbesPerSecond = 10 // pool-wide probe rate limit
defaultTimeoutIdle = time.Minute // idle workers are shut down after this long
defaultTimeoutBooting = time.Minute * 10 // give up on instances that never finish booting
defaultTimeoutProbe = time.Minute * 10 // give up on workers that stop answering probes
defaultTimeoutShutdown = time.Second * 10 // how long to wait for a shutdown to take effect
// Time after a quota error to try again anyway, even if no
// instances have been shutdown.
quotaErrorTTL = time.Minute
// Time between "X failed because rate limiting" messages
logRateLimitErrorInterval = time.Second * 10
80 func duration(conf arvados.Duration, def time.Duration) time.Duration {
82 return time.Duration(conf)
// NewPool creates a Pool of workers backed by instanceSet.
// New instances are configured and set up according to the given
// cluster configuration.
//
// newExecutor is a factory producing an Executor for each new cloud
// instance (injected so tests can supply a stub). installPublicKey,
// if non-nil, is the SSH public key installed on new instances.
// Intervals/timeouts fall back to the package defaults when the
// corresponding cluster config value is unset (see duration()).
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
newExecutor: newExecutor,
bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
imageID: cloud.ImageID(cluster.CloudVMs.ImageID),
instanceTypes: cluster.InstanceTypes,
maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
probeInterval: duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
syncInterval: duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
timeoutIdle: duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
installPublicKey: installPublicKey,
stop: make(chan bool),
wp.registerMetrics(reg)
// Initialize maps before any background goroutines can touch wp.
wp.setupOnce.Do(wp.setup)
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
//
// Configuration fields below are set once by NewPool; mutable state
// fields are guarded by wp.mtx (as seen in the Lock/RLock usage in
// the methods below).
logger logrus.FieldLogger
arvClient *arvados.Client
instanceSet *throttledInstanceSet // wraps the cloud provider, adding rate-limit throttles
newExecutor func(cloud.Instance) Executor // factory for per-instance command executors
bootProbeCommand string
imageID cloud.ImageID
instanceTypes map[string]arvados.InstanceType
syncInterval time.Duration
probeInterval time.Duration
maxProbesPerSecond int
timeoutIdle time.Duration
timeoutBooting time.Duration
timeoutProbe time.Duration
timeoutShutdown time.Duration
installPublicKey ssh.PublicKey
subscribers map[<-chan struct{}]chan<- struct{} // notification channels (see Subscribe/notify)
creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
workers map[cloud.InstanceID]*worker
loaded bool // loaded list of instances from InstanceSet at least once
exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
atQuotaUntil time.Time // suppress Create() until this time after a quota error
atQuotaErr cloud.QuotaError
throttleCreate throttle
throttleInstances throttle
// Prometheus metrics, registered in registerMetrics().
mContainersRunning prometheus.Gauge
mInstances *prometheus.GaugeVec
mInstancesPrice *prometheus.GaugeVec
mVCPUs *prometheus.GaugeVec
mMemory *prometheus.GaugeVec
// createCall records an in-flight InstanceSet.Create request, keyed
// in wp.creating by the instance secret, so Unallocated() can count
// pending instances and updateWorker() can recognize them when they
// first appear in the provider's instance list.
type createCall struct {
instanceType arvados.InstanceType
// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly -- it just coalesces notifications.
//
// Example:
// ch := wp.Subscribe()
// defer wp.Unsubscribe(ch)
func (wp *Pool) Subscribe() <-chan struct{} {
wp.setupOnce.Do(wp.setup)
defer wp.mtx.Unlock()
// Buffer of 1 lets notify() do a non-blocking send; an already-ready
// channel simply drops further notifications.
ch := make(chan struct{}, 1)
wp.subscribers[ch] = ch
// Unsubscribe stops sending updates to the given channel (which must
// have been returned by Subscribe).
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
wp.setupOnce.Do(wp.setup)
defer wp.mtx.Unlock()
delete(wp.subscribers, ch)
// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
wp.setupOnce.Do(wp.setup)
defer wp.mtx.RUnlock()
unalloc := map[arvados.InstanceType]int{}
creating := map[arvados.InstanceType]int{}
// oldestCreate tracks, per type, the start time of the oldest
// still-pending Create call, used below to decide whether an
// Unknown-state worker is really one of ours still materializing.
oldestCreate := map[arvados.InstanceType]time.Time{}
for _, cc := range wp.creating {
it := cc.instanceType
if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
oldestCreate[it] = cc.time
for _, wkr := range wp.workers {
// Skip workers that are not expected to become
// available soon. Note len(wkr.running)>0 is not
// redundant here: it can be true even in
// non-Running states.
if wkr.state == StateShutdown ||
wkr.state == StateRunning ||
wkr.idleBehavior != IdleBehaviorRun ||
len(wkr.running) > 0 {
if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
// If up to N new workers appear in
// Instances() while we are waiting for N
// Create() calls to complete, we assume we're
// just seeing a race between Instances() and
// Create() responses.
//
// The other common reason why nodes have
// state==Unknown is that they appeared at
// startup, before any Create calls. They
// don't match the above timing condition, so
// we never mistakenly attribute them to
// pending Create calls.
// Finally, count the remaining pending Create calls that were
// not matched to an already-visible worker above.
for it, c := range creating {
// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
logger := wp.logger.WithField("InstanceType", it.Name)
wp.setupOnce.Do(wp.setup)
defer wp.mtx.Unlock()
// Refuse while in the quota-error cool-down window or while the
// provider is rate-limiting create calls.
if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
secret := randomHex(instanceSecretLength)
wp.creating[secret] = createCall{time: now, instanceType: it}
tags := cloud.InstanceTags{
tagKeyInstanceType: it.Name,
tagKeyIdleBehavior: string(IdleBehaviorRun),
tagKeyInstanceSecret: secret,
// The boot script writes the secret to a file on the instance so
// the dispatcher can later verify it is talking to the instance it
// created (umask 0177 keeps the file private to root).
initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
defer wp.mtx.Unlock()
// delete() is deferred so the updateWorker() call
// below knows to use StateBooting when adding a new
// worker.
defer delete(wp.creating, secret)
if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
// Back off from creating instances until quotaErrorTTL
// elapses (or an instance is destroyed, elsewhere).
wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
time.AfterFunc(quotaErrorTTL, wp.notify)
logger.WithError(err).Error("create failed")
wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
wp.updateWorker(inst, it)
// AtQuota returns true if Create is not expected to work at the
// moment (e.g., a recent attempt failed with a quota error).
func (wp *Pool) AtQuota() bool {
defer wp.mtx.Unlock()
return time.Now().Before(wp.atQuotaUntil)
// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running: run (normal), hold (never shut
// down), or drain (shut down as soon as possible).
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
defer wp.mtx.Unlock()
wkr, ok := wp.workers[id]
return errors.New("requested instance does not exist")
// After creation, wkr.idleBehavior is only ever changed here.
wkr.idleBehavior = idleBehavior
// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
// Wrap the instance so SSH host key verification also checks the
// instance-secret tag (see tagVerifier).
inst = tagVerifier{inst}
if wkr := wp.workers[id]; wkr != nil {
// Known worker: just repoint its executor at the (possibly
// new) address and refresh its timestamp.
wkr.executor.SetTarget(inst)
wkr.updated = time.Now()
state := StateUnknown
if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok {
// If an instance has a valid IdleBehavior tag when it first
// appears, initialize the new worker accordingly (this is how
// we restore IdleBehavior that was set by a prior dispatch
// process); otherwise, default to "run". After this,
// wkr.idleBehavior is the source of truth, and will only be
// changed via SetIdleBehavior().
idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
if !validIdleBehavior[idleBehavior] {
idleBehavior = IdleBehaviorRun
logger := wp.logger.WithFields(logrus.Fields{
"InstanceType": it.Name,
"Instance": inst.ID(),
"Address": inst.Address(),
logger.WithFields(logrus.Fields{
"IdleBehavior": idleBehavior,
}).Infof("instance appeared in cloud")
executor: wp.newExecutor(inst),
idleBehavior: idleBehavior,
running: make(map[string]struct{}),
starting: make(map[string]struct{}),
probing: make(chan struct{}, 1),
// notifyExited records that the crunch-run process for container uuid
// exited at time t (see wp.exited and Running()).
//
// caller must have lock.
func (wp *Pool) notifyExited(uuid string, t time.Time) {
// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
wp.setupOnce.Do(wp.setup)
defer wp.mtx.Unlock()
logger := wp.logger.WithField("InstanceType", it.Name)
logger.Info("shutdown requested")
// Prefer shutting down a booting worker over an idle one (less
// work wasted).
for _, tryState := range []State{StateBooting, StateIdle} {
// TODO: shutdown the worker with the longest idle
// time (Idle) or the earliest create time (Booting)
for _, wkr := range wp.workers {
if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
logger.WithField("Instance", wkr.instance).Info("shutting down")
// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
wp.setupOnce.Do(wp.setup)
defer wp.mtx.Unlock()
for _, w := range wp.workers {
// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// KillContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
wp.setupOnce.Do(wp.setup)
defer wp.mtx.Unlock()
r := map[string]time.Time{}
for _, wkr := range wp.workers {
// Both running and starting containers get a zero timestamp.
for uuid := range wkr.running {
r[uuid] = time.Time{}
for uuid := range wkr.starting {
r[uuid] = time.Time{}
// Overlay the observed exit times for already-exited containers.
for uuid, exited := range wp.exited {
// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
wp.setupOnce.Do(wp.setup)
defer wp.mtx.Unlock()
// Choose the most-recently-busy idle worker of the right type
// (lets the longest-idle workers reach timeoutIdle and shut down).
for _, w := range wp.workers {
if w.instType == it && w.state == StateIdle {
if wkr == nil || w.busy.After(wkr.busy) {
wkr.startContainer(ctr)
// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
func (wp *Pool) KillContainer(uuid string) {
defer wp.mtx.Unlock()
// If the process has already exited, this call just garbage-collects
// the placeholder entry created by notifyExited.
if _, ok := wp.exited[uuid]; ok {
wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
delete(wp.exited, uuid)
for _, wkr := range wp.workers {
if _, ok := wkr.running[uuid]; ok {
// The actual kill happens in the background; see wp.kill().
go wp.kill(wkr, uuid)
wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
// kill runs "crunch-run --kill" on wkr to terminate the given
// container's process, then (on success) removes the container from
// wkr.running and returns the worker to StateIdle if nothing else is
// running or starting there. Runs in its own goroutine (see
// KillContainer); takes wp.mtx itself before mutating state.
func (wp *Pool) kill(wkr *worker, uuid string) {
logger := wp.logger.WithFields(logrus.Fields{
"ContainerUUID": uuid,
"Instance": wkr.instance.ID(),
logger.Debug("killing process")
cmd := "crunch-run --kill 15 " + uuid
// Escalate with sudo when not connecting as root.
if u := wkr.instance.RemoteUser(); u != "root" {
stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
logger.WithFields(logrus.Fields{
"stderr": string(stderr),
"stdout": string(stdout),
}).Warn("kill failed")
logger.Debug("killing process succeeded")
defer wp.mtx.Unlock()
if _, ok := wkr.running[uuid]; ok {
delete(wkr.running, uuid)
if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
wkr.state = StateIdle
wkr.updated = time.Now()
// registerMetrics creates the pool's Prometheus collectors and
// registers them on reg.
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
// Fall back to a throwaway registry — NOTE(review): presumably
// guarded by a nil check on reg; confirm against the caller.
reg = prometheus.NewRegistry()
wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "containers_running",
Help: "Number of containers reported running by cloud VMs.",
reg.MustRegister(wp.mContainersRunning)
// The remaining gauges are partitioned by worker "category"
// (inuse/hold/booting/unknown/idle — see updateMetrics).
wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "instances_total",
Help: "Number of cloud VMs.",
}, []string{"category"})
reg.MustRegister(wp.mInstances)
wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "instances_price",
Help: "Price of cloud VMs.",
}, []string{"category"})
reg.MustRegister(wp.mInstancesPrice)
wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Help: "Total VCPUs on all cloud VMs.",
}, []string{"category"})
reg.MustRegister(wp.mVCPUs)
wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "memory_bytes_total",
Help: "Total memory on all cloud VMs.",
}, []string{"category"})
reg.MustRegister(wp.mMemory)
// runMetrics refreshes the Prometheus gauges whenever the pool's
// state changes, by subscribing to pool notifications.
func (wp *Pool) runMetrics() {
defer wp.Unsubscribe(ch)
// updateMetrics recomputes all gauge values from the current worker
// list, classifying each worker into exactly one category.
func (wp *Pool) updateMetrics() {
defer wp.mtx.RUnlock()
instances := map[string]int64{}
price := map[string]float64{}
cpu := map[string]int64{}
mem := map[string]int64{}
for _, wkr := range wp.workers {
// Category precedence: a worker running/starting containers
// counts as "inuse" regardless of hold/boot state.
case len(wkr.running)+len(wkr.starting) > 0:
case wkr.idleBehavior == IdleBehaviorHold:
case wkr.state == StateBooting:
case wkr.state == StateUnknown:
price[cat] += wkr.instType.Price
cpu[cat] += int64(wkr.instType.VCPUs)
mem[cat] += int64(wkr.instType.RAM)
running += int64(len(wkr.running) + len(wkr.starting))
// Set every category explicitly (including zeros) so gauges for
// emptied categories don't retain stale values.
for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
wp.mContainersRunning.Set(float64(running))
// runProbes periodically probes every live worker, rate-limited to
// maxProbesPerSecond across the whole pool.
func (wp *Pool) runProbes() {
maxPPS := wp.maxProbesPerSecond
maxPPS = defaultMaxProbesPerSecond
// limitticker spaces out individual probes; probeticker starts a
// new probe round every probeInterval.
limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
defer limitticker.Stop()
probeticker := time.NewTicker(wp.probeInterval)
defer probeticker.Stop()
// Reused across rounds to avoid reallocating every interval.
workers := []cloud.InstanceID{}
for range probeticker.C {
workers = workers[:0]
// Snapshot the IDs under lock, then probe without holding it.
for id, wkr := range wp.workers {
if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
workers = append(workers, id)
for _, id := range workers {
wkr, ok := wp.workers[id]
// Deleted while we were probing
// others
go wkr.ProbeAndUpdate()
case <-limitticker.C:
// runSync periodically reconciles the worker list with the cloud
// provider's instance list, until the pool is stopped.
func (wp *Pool) runSync() {
// sync once immediately, then wait syncInterval, sync again,
// etc.
// 1ns initial delay fires the timer (effectively) immediately.
timer := time.NewTimer(1)
err := wp.getInstancesAndSync()
wp.logger.WithError(err).Warn("sync failed")
timer.Reset(wp.syncInterval)
wp.logger.Debug("worker.Pool stopped")
// Stop synchronizing with the InstanceSet. Existing background
// goroutines observe the stop signal and exit; Stop does not shut
// down any cloud instances.
func (wp *Pool) Stop() {
wp.setupOnce.Do(wp.setup)
// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity. The result is
// sorted by instance ID for stable output.
func (wp *Pool) Instances() []InstanceView {
wp.setupOnce.Do(wp.setup)
for _, w := range wp.workers {
r = append(r, InstanceView{
Instance: w.instance.ID(),
Address: w.instance.Address(),
Price: w.instType.Price,
ArvadosInstanceType: w.instType.Name,
ProviderInstanceType: w.instType.ProviderType,
LastContainerUUID: w.lastUUID,
WorkerState: w.state.String(),
IdleBehavior: w.idleBehavior,
sort.Slice(r, func(i, j int) bool {
return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
// setup initializes the pool's maps. It runs exactly once, via
// wp.setupOnce, before any other method touches this state.
func (wp *Pool) setup() {
wp.creating = map[string]createCall{}
wp.exited = map[string]time.Time{}
wp.workers = map[cloud.InstanceID]*worker{}
wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
// notify wakes all subscribers with a non-blocking send; a subscriber
// whose buffered channel is already ready simply misses nothing
// (notifications are coalesced, see Subscribe).
func (wp *Pool) notify() {
defer wp.mtx.RUnlock()
for _, send := range wp.subscribers {
case send <- struct{}{}:
// getInstancesAndSync fetches the current instance list from the
// cloud provider and reconciles wp.workers with it. Returns an error
// (without syncing) if the provider is rate-limiting us or the list
// call fails.
func (wp *Pool) getInstancesAndSync() error {
wp.setupOnce.Do(wp.setup)
if err := wp.instanceSet.throttleInstances.Error(); err != nil {
wp.logger.Debug("getting instance list")
// threshold is captured before the Instances call so sync() can
// avoid clobbering worker updates that happened while the (slow)
// cloud API call was in flight.
threshold := time.Now()
instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
wp.sync(threshold, instances)
wp.logger.Debug("sync done")
// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
defer wp.mtx.Unlock()
wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
for _, inst := range instances {
itTag := inst.Tags()[tagKeyInstanceType]
it, ok := wp.instanceTypes[itTag]
// Instances whose type tag no longer matches the cluster
// config can't be used; skip them entirely.
wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
if wkr, isNew := wp.updateWorker(inst, it); isNew {
} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
// Forget workers that no longer appear in the provider's list —
// unless they were updated after threshold, in which case they
// may have been created while Instances() was in flight.
for id, wkr := range wp.workers {
if wkr.updated.After(threshold) {
logger := wp.logger.WithFields(logrus.Fields{
"Instance": wkr.instance.ID(),
"WorkerState": wkr.state,
logger.Info("instance disappeared in cloud")
delete(wp.workers, id)
go wkr.executor.Close()
wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
// waitUntilLoaded blocks until the initial instance list has been
// loaded from the cloud provider (wp.loaded); used by CountWorkers.
func (wp *Pool) waitUntilLoaded() {
defer wp.mtx.RUnlock()
// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even: the buffer holds n/2 bytes, so an odd n would yield
// only n-1 digits.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		// rand.Read essentially never fails; if it does we cannot
		// safely generate instance secrets, so give up loudly
		// rather than return predictable output.
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}