1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.curoverse.com/arvados.git/lib/cloud"
18 "git.curoverse.com/arvados.git/sdk/go/arvados"
19 "github.com/prometheus/client_golang/prometheus"
20 "github.com/sirupsen/logrus"
21 "golang.org/x/crypto/ssh"
// Cloud instance tag keys. The pool writes these tags when creating
// instances so that state (type, idle behavior, boot secret) can be
// recovered from the provider after a dispatcher restart.
25 tagKeyInstanceType = "InstanceType"
26 tagKeyIdleBehavior = "IdleBehavior"
27 tagKeyInstanceSecret = "InstanceSecret"
30 // An InstanceView shows a worker's current state and recent activity.
// It is a read-only snapshot built by Pool.Instances() for reporting
// (e.g. management API / status pages); mutating it has no effect on
// the pool.
31 type InstanceView struct {
32 Instance cloud.InstanceID `json:"instance"`
33 Address string `json:"address"`
34 Price float64 `json:"price"`
35 ArvadosInstanceType string `json:"arvados_instance_type"`
36 ProviderInstanceType string `json:"provider_instance_type"`
37 LastContainerUUID string `json:"last_container_uuid"`
38 LastBusy time.Time `json:"last_busy"`
39 WorkerState string `json:"worker_state"`
40 IdleBehavior IdleBehavior `json:"idle_behavior"`
43 // An Executor executes shell commands on a remote host.
44 type Executor interface {
45 // Run cmd on the current target.
46 Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
48 // Use the given target for subsequent operations. The new
49 // target is the same host as the previous target, but it
50 // might return a different address and verify a different
// (continuation elided in this excerpt — presumably "...host key";
// see the full source.)
53 // SetTarget is called frequently, and in most cases the new
54 // target will behave exactly the same as the old one. An
55 // implementation should optimize accordingly.
57 // SetTarget must not block on concurrent Execute calls.
58 SetTarget(cloud.ExecutorTarget)
// Default values used when the corresponding cluster configuration
// entries are zero (see duration() below).
64 defaultSyncInterval = time.Minute
65 defaultProbeInterval = time.Second * 10
66 defaultMaxProbesPerSecond = 10
67 defaultTimeoutIdle = time.Minute
68 defaultTimeoutBooting = time.Minute * 10
69 defaultTimeoutProbe = time.Minute * 10
70 defaultTimeoutShutdown = time.Second * 10
71 defaultTimeoutTERM = time.Minute * 2
72 defaultTimeoutSignal = time.Second * 5
74 // Time after a quota error to try again anyway, even if no
75 // instances have been shutdown.
76 quotaErrorTTL = time.Minute
78 // Time between "X failed because rate limiting" messages
79 logRateLimitErrorInterval = time.Second * 10
// duration returns conf converted to a time.Duration, falling back to
// def when conf is unset. NOTE(review): the guard branch (presumably
// "if conf == 0 { return def }") is elided in this excerpt — confirm
// against the full source.
82 func duration(conf arvados.Duration, def time.Duration) time.Duration {
84 return time.Duration(conf)
90 // NewPool creates a Pool of workers backed by instanceSet.
92 // New instances are configured and set up according to the given
93 // cluster configuration.
// Timeouts/intervals fall back to the package defaults above when the
// cluster config leaves them zero. The returned pool is fully
// initialized (setup has run) and its metrics are registered on reg.
94 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
// NOTE(review): the "wp := &Pool{" line and logger/arvClient fields
// are elided in this excerpt.
98 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
99 newExecutor: newExecutor,
100 bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
101 imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
102 instanceTypes: cluster.InstanceTypes,
103 maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
104 probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
105 syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
106 timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
107 timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
108 timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
109 timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
110 timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
111 timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
112 installPublicKey: installPublicKey,
113 stop: make(chan bool),
// Register prometheus collectors, then run one-time setup eagerly so
// background goroutines (sync/probes/metrics, started in elided
// lines) see initialized maps.
115 wp.registerMetrics(reg)
117 wp.setupOnce.Do(wp.setup)
125 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
126 // zero Pool should not be used. Call NewPool to create a new Pool.
// Fields below are split into configuration (set once in NewPool) and
// mutable state (guarded by wp.mtx, declared in lines elided here).
129 logger logrus.FieldLogger
130 arvClient *arvados.Client
131 instanceSet *throttledInstanceSet
132 newExecutor func(cloud.Instance) Executor
133 bootProbeCommand string
134 imageID cloud.ImageID
135 instanceTypes map[string]arvados.InstanceType
136 syncInterval time.Duration
137 probeInterval time.Duration
138 maxProbesPerSecond int
139 timeoutIdle time.Duration
140 timeoutBooting time.Duration
141 timeoutProbe time.Duration
142 timeoutShutdown time.Duration
143 timeoutTERM time.Duration
144 timeoutSignal time.Duration
145 installPublicKey ssh.PublicKey
// Mutable state (mutex-guarded; the mtx/setupOnce/stop declarations
// appear in elided lines).
148 subscribers map[<-chan struct{}]chan<- struct{}
149 creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
150 workers map[cloud.InstanceID]*worker
151 loaded bool // loaded list of instances from InstanceSet at least once
152 exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
153 atQuotaUntil time.Time
154 atQuotaErr cloud.QuotaError
// Rate-limit backoff state for Create and Instances calls.
159 throttleCreate throttle
160 throttleInstances throttle
// Prometheus collectors, populated by registerMetrics.
162 mContainersRunning prometheus.Gauge
163 mInstances *prometheus.GaugeVec
164 mInstancesPrice *prometheus.GaugeVec
165 mVCPUs *prometheus.GaugeVec
166 mMemory *prometheus.GaugeVec
// createCall records an in-flight (cloud.InstanceSet)Create call so
// that Unallocated() can count pending instances per type. A "time"
// field is declared in an elided line.
169 type createCall struct {
171 instanceType arvados.InstanceType
174 // Subscribe returns a buffered channel that becomes ready after any
175 // change to the pool's state that could have scheduling implications:
176 // a worker's state changes, a new worker appears, the cloud
177 // provider's API rate limiting period ends, etc.
179 // Additional events that occur while the channel is already ready
180 // will be dropped, so it is OK if the caller services the channel
// (continuation elided — presumably "...slowly".)
185 // ch := wp.Subscribe()
186 // defer wp.Unsubscribe(ch)
193 func (wp *Pool) Subscribe() <-chan struct{} {
194 wp.setupOnce.Do(wp.setup)
// Lock acquisition elided here; the unlock pairs with it.
196 defer wp.mtx.Unlock()
// Buffer size 1 implements the "coalesce events while ready" behavior
// documented above: a send into a full buffer is simply skipped.
197 ch := make(chan struct{}, 1)
198 wp.subscribers[ch] = ch
202 // Unsubscribe stops sending updates to the given channel.
// Safe to call with a channel that was already unsubscribed (map
// delete of a missing key is a no-op).
203 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
204 wp.setupOnce.Do(wp.setup)
// Lock acquisition elided here; the unlock pairs with it.
206 defer wp.mtx.Unlock()
207 delete(wp.subscribers, ch)
210 // Unallocated returns the number of unallocated (creating + booting +
211 // idle + unknown) workers for each instance type. Workers in
212 // hold/drain mode are not included.
213 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
214 wp.setupOnce.Do(wp.setup)
216 defer wp.mtx.RUnlock()
217 unalloc := map[arvados.InstanceType]int{}
218 creating := map[arvados.InstanceType]int{}
219 oldestCreate := map[arvados.InstanceType]time.Time{}
// First pass: tally pending Create calls and remember the oldest
// Create start time per type (used below to de-duplicate against
// workers that already appeared in the instance list).
220 for _, cc := range wp.creating {
221 it := cc.instanceType
223 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
224 oldestCreate[it] = cc.time
// Second pass: count workers that are expected to become available.
227 for _, wkr := range wp.workers {
228 // Skip workers that are not expected to become
229 // available soon. Note len(wkr.running)>0 is not
230 // redundant here: it can be true even in
// (comment continuation elided in this excerpt)
232 if wkr.state == StateShutdown ||
233 wkr.state == StateRunning ||
234 wkr.idleBehavior != IdleBehaviorRun ||
235 len(wkr.running) > 0 {
240 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
241 // If up to N new workers appear in
242 // Instances() while we are waiting for N
243 // Create() calls to complete, we assume we're
244 // just seeing a race between Instances() and
245 // Create() responses.
247 // The other common reason why nodes have
248 // state==Unknown is that they appeared at
249 // startup, before any Create calls. They
250 // don't match the above timing condition, so
251 // we never mistakenly attribute them to
252 // pending Create calls.
// Finally, fold any remaining (un-attributed) pending creates into
// the per-type totals.
256 for it, c := range creating {
262 // Create a new instance with the given type, and add it to the worker
263 // pool. The worker is added immediately; instance creation runs in
// (continuation elided — presumably "...the background".)
266 // Create returns false if a pre-existing error state prevents it from
267 // even attempting to create a new instance. Those errors are logged
268 // by the Pool, so the caller does not need to log anything in such
// (continuation elided — presumably "...cases".)
270 func (wp *Pool) Create(it arvados.InstanceType) bool {
271 logger := wp.logger.WithField("InstanceType", it.Name)
272 wp.setupOnce.Do(wp.setup)
274 defer wp.mtx.Unlock()
// Refuse while in quota backoff or while the Create throttle reports
// a pending rate-limit error.
275 if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
// Record the pending call, keyed by a per-instance random secret that
// also doubles as the boot-probe authentication token (written to the
// instance by the init command below).
279 secret := randomHex(instanceSecretLength)
280 wp.creating[secret] = createCall{time: now, instanceType: it}
283 tags := cloud.InstanceTags{
284 tagKeyInstanceType: it.Name,
285 tagKeyIdleBehavior: string(IdleBehaviorRun),
286 tagKeyInstanceSecret: secret,
// umask 0177 ensures the secret file is only readable by its owner.
288 initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
// NOTE(review): the surrounding goroutine start ("go func() {...") is
// elided; the code from here down runs asynchronously.
289 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
291 defer wp.mtx.Unlock()
292 // delete() is deferred so the updateWorker() call
293 // below knows to use StateBooting when adding a new
// (continuation elided — presumably "...worker".)
295 defer delete(wp.creating, secret)
297 if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
299 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
// Re-notify subscribers when the quota backoff expires so the
// scheduler retries.
300 time.AfterFunc(quotaErrorTTL, wp.notify)
302 logger.WithError(err).Error("create failed")
303 wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
306 wp.updateWorker(inst, it)
311 // AtQuota returns true if Create is not expected to work at the
// (continuation elided — presumably "...moment".)
313 func (wp *Pool) AtQuota() bool {
// Lock acquisition elided here; the unlock pairs with it.
315 defer wp.mtx.Unlock()
316 return time.Now().Before(wp.atQuotaUntil)
319 // SetIdleBehavior determines how the indicated instance will behave
320 // when it has no containers running.
// Returns an error if no worker with the given instance ID exists.
321 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
323 defer wp.mtx.Unlock()
324 wkr, ok := wp.workers[id]
326 return errors.New("requested instance does not exist")
328 wkr.setIdleBehavior(idleBehavior)
332 // Add or update worker attached to the given instance.
334 // The second return value is true if a new worker is created.
336 // A newly added instance has state=StateBooting if its tags match an
337 // entry in wp.creating, otherwise StateUnknown.
339 // Caller must have lock.
340 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
// tagVerifier wraps the instance so host keys can be checked against
// the boot secret tag (implementation elsewhere in the package).
341 inst = tagVerifier{inst}
// Existing worker: just refresh its target address and timestamp.
343 if wkr := wp.workers[id]; wkr != nil {
344 wkr.executor.SetTarget(inst)
346 wkr.updated = time.Now()
// New worker: infer initial state from pending Create calls.
351 state := StateUnknown
352 if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok {
356 // If an instance has a valid IdleBehavior tag when it first
357 // appears, initialize the new worker accordingly (this is how
358 // we restore IdleBehavior that was set by a prior dispatch
359 // process); otherwise, default to "run". After this,
360 // wkr.idleBehavior is the source of truth, and will only be
361 // changed via SetIdleBehavior().
362 idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
363 if !validIdleBehavior[idleBehavior] {
364 idleBehavior = IdleBehaviorRun
367 logger := wp.logger.WithFields(logrus.Fields{
368 "InstanceType": it.Name,
369 "Instance": inst.ID(),
370 "Address": inst.Address(),
372 logger.WithFields(logrus.Fields{
374 "IdleBehavior": idleBehavior,
375 }).Infof("instance appeared in cloud")
// Construct the worker record (several field initializers elided in
// this excerpt).
381 executor: wp.newExecutor(inst),
383 idleBehavior: idleBehavior,
390 running: make(map[string]*remoteRunner),
391 starting: make(map[string]*remoteRunner),
// probing has buffer 1 so concurrent probe triggers coalesce.
392 probing: make(chan struct{}, 1),
398 // Shutdown shuts down a worker with the given type, or returns false
399 // if all workers with the given type are busy.
400 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
401 wp.setupOnce.Do(wp.setup)
403 defer wp.mtx.Unlock()
404 logger := wp.logger.WithField("InstanceType", it.Name)
405 logger.Info("shutdown requested")
// Prefer killing a booting worker over an idle one: iterate states in
// that order and take the first eligible (non-hold) match.
406 for _, tryState := range []State{StateBooting, StateIdle} {
407 // TODO: shutdown the worker with the longest idle
408 // time (Idle) or the earliest create time (Booting)
409 for _, wkr := range wp.workers {
410 if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
411 logger.WithField("Instance", wkr.instance).Info("shutting down")
420 // CountWorkers returns the current number of workers in each state.
422 // CountWorkers blocks, if necessary, until the initial instance list
423 // has been loaded from the cloud provider.
424 func (wp *Pool) CountWorkers() map[State]int {
425 wp.setupOnce.Do(wp.setup)
// A waitUntilLoaded call and lock acquisition are elided here.
428 defer wp.mtx.Unlock()
430 for _, w := range wp.workers {
436 // Running returns the container UUIDs being prepared/run on workers.
438 // In the returned map, the time value indicates when the Pool
439 // observed that the container process had exited. A container that
440 // has not yet exited has a zero time value. The caller should use
441 // KillContainer() to garbage-collect the entries for exited
// (continuation elided — presumably "...containers".)
443 func (wp *Pool) Running() map[string]time.Time {
444 wp.setupOnce.Do(wp.setup)
446 defer wp.mtx.Unlock()
447 r := map[string]time.Time{}
// Containers still running or starting get a zero timestamp...
448 for _, wkr := range wp.workers {
449 for uuid := range wkr.running {
450 r[uuid] = time.Time{}
452 for uuid := range wkr.starting {
453 r[uuid] = time.Time{}
// ...and exited-but-not-yet-collected containers get their observed
// exit time (assignment line elided in this excerpt).
456 for uuid, exited := range wp.exited {
462 // StartContainer starts a container on an idle worker immediately if
463 // possible, otherwise returns false.
464 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
465 wp.setupOnce.Do(wp.setup)
467 defer wp.mtx.Unlock()
// Pick the most-recently-busy idle worker of the requested type
// (keeps long-idle workers idle so they can be shut down).
469 for _, w := range wp.workers {
470 if w.instType == it && w.state == StateIdle {
471 if wkr == nil || w.busy.After(wkr.busy) {
479 wkr.startContainer(ctr)
483 // KillContainer kills the crunch-run process for the given container
484 // UUID, if it's running on any worker.
486 // KillContainer returns immediately; the act of killing the container
487 // takes some time, and runs in the background.
488 func (wp *Pool) KillContainer(uuid string, reason string) {
490 defer wp.mtx.Unlock()
491 logger := wp.logger.WithFields(logrus.Fields{
492 "ContainerUUID": uuid,
// If the process already exited, there is nothing to kill — just
// drop the bookkeeping entry created when the exit was observed.
495 if _, ok := wp.exited[uuid]; ok {
496 logger.Debug("clearing placeholder for exited crunch-run process")
497 delete(wp.exited, uuid)
// Otherwise find the remoteRunner on whichever worker is running (or
// still starting) the container, and signal it (kill call elided).
500 for _, wkr := range wp.workers {
501 rr := wkr.running[uuid]
503 rr = wkr.starting[uuid]
510 logger.Debug("cannot kill: already disappeared")
513 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
515 reg = prometheus.NewRegistry()
517 wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
518 Namespace: "arvados",
519 Subsystem: "dispatchcloud",
520 Name: "containers_running",
521 Help: "Number of containers reported running by cloud VMs.",
523 reg.MustRegister(wp.mContainersRunning)
524 wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
525 Namespace: "arvados",
526 Subsystem: "dispatchcloud",
527 Name: "instances_total",
528 Help: "Number of cloud VMs.",
529 }, []string{"category"})
530 reg.MustRegister(wp.mInstances)
531 wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
532 Namespace: "arvados",
533 Subsystem: "dispatchcloud",
534 Name: "instances_price",
535 Help: "Price of cloud VMs.",
536 }, []string{"category"})
537 reg.MustRegister(wp.mInstancesPrice)
538 wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
539 Namespace: "arvados",
540 Subsystem: "dispatchcloud",
542 Help: "Total VCPUs on all cloud VMs.",
543 }, []string{"category"})
544 reg.MustRegister(wp.mVCPUs)
545 wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
546 Namespace: "arvados",
547 Subsystem: "dispatchcloud",
548 Name: "memory_bytes_total",
549 Help: "Total memory on all cloud VMs.",
550 }, []string{"category"})
551 reg.MustRegister(wp.mMemory)
554 func (wp *Pool) runMetrics() {
556 defer wp.Unsubscribe(ch)
563 func (wp *Pool) updateMetrics() {
565 defer wp.mtx.RUnlock()
567 instances := map[string]int64{}
568 price := map[string]float64{}
569 cpu := map[string]int64{}
570 mem := map[string]int64{}
572 for _, wkr := range wp.workers {
575 case len(wkr.running)+len(wkr.starting) > 0:
577 case wkr.idleBehavior == IdleBehaviorHold:
579 case wkr.state == StateBooting:
581 case wkr.state == StateUnknown:
587 price[cat] += wkr.instType.Price
588 cpu[cat] += int64(wkr.instType.VCPUs)
589 mem[cat] += int64(wkr.instType.RAM)
590 running += int64(len(wkr.running) + len(wkr.starting))
592 for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
593 wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
594 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
595 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
596 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
598 wp.mContainersRunning.Set(float64(running))
601 func (wp *Pool) runProbes() {
602 maxPPS := wp.maxProbesPerSecond
604 maxPPS = defaultMaxProbesPerSecond
606 limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
607 defer limitticker.Stop()
609 probeticker := time.NewTicker(wp.probeInterval)
610 defer probeticker.Stop()
612 workers := []cloud.InstanceID{}
613 for range probeticker.C {
614 workers = workers[:0]
616 for id, wkr := range wp.workers {
617 if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
620 workers = append(workers, id)
624 for _, id := range workers {
626 wkr, ok := wp.workers[id]
629 // Deleted while we were probing
633 go wkr.ProbeAndUpdate()
637 case <-limitticker.C:
643 func (wp *Pool) runSync() {
644 // sync once immediately, then wait syncInterval, sync again,
646 timer := time.NewTimer(1)
650 err := wp.getInstancesAndSync()
652 wp.logger.WithError(err).Warn("sync failed")
654 timer.Reset(wp.syncInterval)
656 wp.logger.Debug("worker.Pool stopped")
662 // Stop synchronizing with the InstanceSet.
// (The stop-channel close is elided in this excerpt.)
663 func (wp *Pool) Stop() {
664 wp.setupOnce.Do(wp.setup)
668 // Instances returns an InstanceView for each worker in the pool,
669 // summarizing its current state and recent activity.
670 func (wp *Pool) Instances() []InstanceView {
672 wp.setupOnce.Do(wp.setup)
// Build one snapshot per worker (lock acquisition and LastBusy field
// are elided in this excerpt).
674 for _, w := range wp.workers {
675 r = append(r, InstanceView{
676 Instance: w.instance.ID(),
677 Address: w.instance.Address(),
678 Price: w.instType.Price,
679 ArvadosInstanceType: w.instType.Name,
680 ProviderInstanceType: w.instType.ProviderType,
681 LastContainerUUID: w.lastUUID,
683 WorkerState: w.state.String(),
684 IdleBehavior: w.idleBehavior,
// Sort by instance ID for deterministic output.
688 sort.Slice(r, func(i, j int) bool {
689 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
694 // KillInstance destroys a cloud VM instance. It returns an error if
695 // the given instance does not exist.
696 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
697 wkr, ok := wp.workers[id]
699 return errors.New("instance not found")
701 wkr.logger.WithField("Reason", reason).Info("shutting down")
706 func (wp *Pool) setup() {
707 wp.creating = map[string]createCall{}
708 wp.exited = map[string]time.Time{}
709 wp.workers = map[cloud.InstanceID]*worker{}
710 wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
713 func (wp *Pool) notify() {
715 defer wp.mtx.RUnlock()
716 for _, send := range wp.subscribers {
718 case send <- struct{}{}:
724 func (wp *Pool) getInstancesAndSync() error {
725 wp.setupOnce.Do(wp.setup)
726 if err := wp.instanceSet.throttleInstances.Error(); err != nil {
729 wp.logger.Debug("getting instance list")
730 threshold := time.Now()
731 instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
733 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
736 wp.sync(threshold, instances)
737 wp.logger.Debug("sync done")
741 // Add/remove/update workers based on instances, which was obtained
742 // from the instanceSet. However, don't clobber any other updates that
743 // already happened after threshold.
744 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
746 defer wp.mtx.Unlock()
747 wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
// Pass 1: add/update a worker for every listed instance.
750 for _, inst := range instances {
751 itTag := inst.Tags()[tagKeyInstanceType]
752 it, ok := wp.instanceTypes[itTag]
754 wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
757 if wkr, isNew := wp.updateWorker(inst, it); isNew {
// A worker still listed long after we destroyed it: retry shutdown.
759 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
760 wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
// Pass 2: drop workers that no longer appear in the instance list —
// unless they were updated after threshold (e.g. just created).
765 for id, wkr := range wp.workers {
766 if wkr.updated.After(threshold) {
769 logger := wp.logger.WithFields(logrus.Fields{
770 "Instance": wkr.instance.ID(),
771 "WorkerState": wkr.state,
773 logger.Info("instance disappeared in cloud")
774 delete(wp.workers, id)
// First successful sync: mark loaded so waitUntilLoaded can return.
782 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
790 func (wp *Pool) waitUntilLoaded() {
793 defer wp.mtx.RUnlock()
801 // Return a random string of n hexadecimal digits (n*4 random bits). n
// (continuation elided — presumably "...must be even".) Note buf is
// n/2 bytes, so odd n would yield n-1 digits.
803 func randomHex(n int) string {
804 buf := make([]byte, n/2)
// crypto/rand: suitable for the instance secret used in Create().
// Error handling for rand.Read is elided in this excerpt.
805 _, err := rand.Read(buf)
809 return fmt.Sprintf("%x", buf)