1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.curoverse.com/arvados.git/lib/cloud"
18 "git.curoverse.com/arvados.git/sdk/go/arvados"
19 "github.com/prometheus/client_golang/prometheus"
20 "github.com/sirupsen/logrus"
21 "golang.org/x/crypto/ssh"
// Tag-key suffixes stored on cloud instances. Each is combined with
// wp.tagKeyPrefix when tags are written (Create) or read
// (updateWorker, getInstancesAndSync).
25 tagKeyInstanceType = "InstanceType"
26 tagKeyIdleBehavior = "IdleBehavior"
27 tagKeyInstanceSecret = "InstanceSecret"
28 tagKeyInstanceSetID = "InstanceSetID"
31 // An InstanceView shows a worker's current state and recent activity.
// InstanceView is the JSON-serializable snapshot returned by
// Pool.Instances(), one entry per worker.
32 type InstanceView struct {
33 Instance cloud.InstanceID `json:"instance"`
34 Address string `json:"address"`
35 Price float64 `json:"price"`
36 ArvadosInstanceType string `json:"arvados_instance_type"`
37 ProviderInstanceType string `json:"provider_instance_type"`
38 LastContainerUUID string `json:"last_container_uuid"`
39 LastBusy time.Time `json:"last_busy"`
40 WorkerState string `json:"worker_state"`
41 IdleBehavior IdleBehavior `json:"idle_behavior"`
44 // An Executor executes shell commands on a remote host.
// Pool obtains one Executor per worker via the newExecutor factory
// passed to NewPool.
45 type Executor interface {
46 // Run cmd on the current target.
47 Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
49 // Use the given target for subsequent operations. The new
50 // target is the same host as the previous target, but it
51 // might return a different address and verify a different
54 // SetTarget is called frequently, and in most cases the new
55 // target will behave exactly the same as the old one. An
56 // implementation should optimize accordingly.
58 // SetTarget must not block on concurrent Execute calls.
59 SetTarget(cloud.ExecutorTarget)
// Defaults applied by NewPool (via duration()) when the corresponding
// cluster configuration value is unset.
65 defaultSyncInterval = time.Minute
66 defaultProbeInterval = time.Second * 10
67 defaultMaxProbesPerSecond = 10
68 defaultTimeoutIdle = time.Minute
69 defaultTimeoutBooting = time.Minute * 10
70 defaultTimeoutProbe = time.Minute * 10
71 defaultTimeoutShutdown = time.Second * 10
72 defaultTimeoutTERM = time.Minute * 2
73 defaultTimeoutSignal = time.Second * 5
75 // Time after a quota error to try again anyway, even if no
76 // instances have been shut down.
77 quotaErrorTTL = time.Minute
79 // Time between "X failed because rate limiting" messages
80 logRateLimitErrorInterval = time.Second * 10
// duration converts a configured arvados.Duration to time.Duration;
// presumably it returns def when conf is zero/unset (the selecting
// condition is on a line not shown here) — confirm against full file.
83 func duration(conf arvados.Duration, def time.Duration) time.Duration {
85 return time.Duration(conf)
91 // NewPool creates a Pool of workers backed by instanceSet.
93 // New instances are configured and set up according to the given
94 // cluster configuration.
// Timeouts/intervals fall back to the package defaults above when the
// cluster config leaves them unset (see duration()).
95 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
99 instanceSetID: instanceSetID,
// instanceSet is wrapped so rate-limit errors from the cloud provider
// can be throttled (see throttleCreate/throttleInstances usage below).
100 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
101 newExecutor: newExecutor,
102 bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
103 imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
104 instanceTypes: cluster.InstanceTypes,
105 maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
106 probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
107 syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
108 timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
109 timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
110 timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
111 timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
112 timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
113 timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
114 installPublicKey: installPublicKey,
115 tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
116 stop: make(chan bool),
118 wp.registerMetrics(reg)
// Initialize maps/goroutines once; later entry points repeat this
// setupOnce call so a zero-but-constructed Pool is usable immediately.
120 wp.setupOnce.Do(wp.setup)
128 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
129 // zero Pool should not be used. Call NewPool to create a new Pool.
// Configuration (set once by NewPool):
132 logger logrus.FieldLogger
133 arvClient *arvados.Client
134 instanceSetID cloud.InstanceSetID
135 instanceSet *throttledInstanceSet
136 newExecutor func(cloud.Instance) Executor
137 bootProbeCommand string
138 imageID cloud.ImageID
139 instanceTypes map[string]arvados.InstanceType
140 syncInterval time.Duration
141 probeInterval time.Duration
142 maxProbesPerSecond int
143 timeoutIdle time.Duration
144 timeoutBooting time.Duration
145 timeoutProbe time.Duration
146 timeoutShutdown time.Duration
147 timeoutTERM time.Duration
148 timeoutSignal time.Duration
149 installPublicKey ssh.PublicKey
// Mutable state (guarded by wp.mtx; see Lock/RLock calls throughout):
153 subscribers map[<-chan struct{}]chan<- struct{}
154 creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
155 workers map[cloud.InstanceID]*worker
156 loaded bool // loaded list of instances from InstanceSet at least once
157 exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
158 atQuotaUntil time.Time
159 atQuotaErr cloud.QuotaError
164 throttleCreate throttle
165 throttleInstances throttle
// Prometheus metrics, registered by registerMetrics and refreshed by
// updateMetrics:
167 mContainersRunning prometheus.Gauge
168 mInstances *prometheus.GaugeVec
169 mInstancesPrice *prometheus.GaugeVec
170 mVCPUs *prometheus.GaugeVec
171 mMemory *prometheus.GaugeVec
// createCall records one in-flight InstanceSet.Create request; entries
// live in wp.creating keyed by the instance secret (see Pool.Create).
174 type createCall struct {
176 instanceType arvados.InstanceType
179 // Subscribe returns a buffered channel that becomes ready after any
180 // change to the pool's state that could have scheduling implications:
181 // a worker's state changes, a new worker appears, the cloud
182 // provider's API rate limiting period ends, etc.
184 // Additional events that occur while the channel is already ready
185 // will be dropped, so it is OK if the caller services the channel
190 // ch := wp.Subscribe()
191 // defer wp.Unsubscribe(ch)
198 func (wp *Pool) Subscribe() <-chan struct{} {
199 wp.setupOnce.Do(wp.setup)
201 defer wp.mtx.Unlock()
// Buffer of 1 lets notify() do a non-blocking send: at most one
// pending event per subscriber, extras coalesce.
202 ch := make(chan struct{}, 1)
203 wp.subscribers[ch] = ch
207 // Unsubscribe stops sending updates to the given channel.
// The channel itself is not closed; it is simply removed from the
// subscriber map so notify() no longer sends to it.
208 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
209 wp.setupOnce.Do(wp.setup)
211 defer wp.mtx.Unlock()
212 delete(wp.subscribers, ch)
215 // Unallocated returns the number of unallocated (creating + booting +
216 // idle + unknown) workers for each instance type. Workers in
217 // hold/drain mode are not included.
218 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
219 wp.setupOnce.Do(wp.setup)
221 defer wp.mtx.RUnlock()
222 unalloc := map[arvados.InstanceType]int{}
223 creating := map[arvados.InstanceType]int{}
// oldestCreate tracks the earliest pending Create() per type, used
// below to decide whether an Unknown worker is really one of ours.
224 oldestCreate := map[arvados.InstanceType]time.Time{}
225 for _, cc := range wp.creating {
226 it := cc.instanceType
228 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
229 oldestCreate[it] = cc.time
232 for _, wkr := range wp.workers {
233 // Skip workers that are not expected to become
234 // available soon. Note len(wkr.running)>0 is not
235 // redundant here: it can be true even in
237 if wkr.state == StateShutdown ||
238 wkr.state == StateRunning ||
239 wkr.idleBehavior != IdleBehaviorRun ||
240 len(wkr.running) > 0 {
245 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
246 // If up to N new workers appear in
247 // Instances() while we are waiting for N
248 // Create() calls to complete, we assume we're
249 // just seeing a race between Instances() and
250 // Create() responses.
252 // The other common reason why nodes have
253 // state==Unknown is that they appeared at
254 // startup, before any Create calls. They
255 // don't match the above timing condition, so
256 // we never mistakenly attribute them to
257 // pending Create calls.
261 for it, c := range creating {
267 // Create a new instance with the given type, and add it to the worker
268 // pool. The worker is added immediately; instance creation runs in
271 // Create returns false if a pre-existing error state prevents it from
272 // even attempting to create a new instance. Those errors are logged
273 // by the Pool, so the caller does not need to log anything in such
275 func (wp *Pool) Create(it arvados.InstanceType) bool {
276 logger := wp.logger.WithField("InstanceType", it.Name)
277 wp.setupOnce.Do(wp.setup)
279 defer wp.mtx.Unlock()
// Refuse while a quota error is still fresh (atQuotaUntil) or the
// provider's create API is rate-limiting us.
280 if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
// The secret is written to a file on the new VM at boot (initCmd
// below) and stored in the instance tags; tagVerifier in updateWorker
// uses it to confirm we are talking to the instance we created.
284 secret := randomHex(instanceSecretLength)
285 wp.creating[secret] = createCall{time: now, instanceType: it}
288 tags := cloud.InstanceTags{
289 wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID),
290 wp.tagKeyPrefix + tagKeyInstanceType: it.Name,
291 wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
292 wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
294 initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
295 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
297 defer wp.mtx.Unlock()
298 // delete() is deferred so the updateWorker() call
299 // below knows to use StateBooting when adding a new
301 defer delete(wp.creating, secret)
// Quota errors back off all Create calls for quotaErrorTTL, then
// notify subscribers so the scheduler retries.
303 if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
305 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
306 time.AfterFunc(quotaErrorTTL, wp.notify)
308 logger.WithError(err).Error("create failed")
309 wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
312 wp.updateWorker(inst, it)
317 // AtQuota returns true if Create is not expected to work at the
// moment, i.e. a recent quota error is still within its quotaErrorTTL
// back-off window (atQuotaUntil is set in Create).
319 func (wp *Pool) AtQuota() bool {
321 defer wp.mtx.Unlock()
322 return time.Now().Before(wp.atQuotaUntil)
325 // SetIdleBehavior determines how the indicated instance will behave
326 // when it has no containers running.
// Returns an error if no worker with the given instance ID exists.
327 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
329 defer wp.mtx.Unlock()
330 wkr, ok := wp.workers[id]
332 return errors.New("requested instance does not exist")
334 wkr.setIdleBehavior(idleBehavior)
338 // Add or update worker attached to the given instance.
340 // The second return value is true if a new worker is created.
342 // A newly added instance has state=StateBooting if its tags match an
343 // entry in wp.creating, otherwise StateUnknown.
345 // Caller must have lock.
346 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
// Wrap the instance so SSH host verification checks the per-instance
// secret that Create() wrote to the VM.
347 secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
348 inst = tagVerifier{inst, secret}
// Existing worker: just point its executor at the (possibly updated)
// instance and bump the updated timestamp.
350 if wkr := wp.workers[id]; wkr != nil {
351 wkr.executor.SetTarget(inst)
353 wkr.updated = time.Now()
358 state := StateUnknown
359 if _, ok := wp.creating[secret]; ok {
363 // If an instance has a valid IdleBehavior tag when it first
364 // appears, initialize the new worker accordingly (this is how
365 // we restore IdleBehavior that was set by a prior dispatch
366 // process); otherwise, default to "run". After this,
367 // wkr.idleBehavior is the source of truth, and will only be
368 // changed via SetIdleBehavior().
369 idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
370 if !validIdleBehavior[idleBehavior] {
371 idleBehavior = IdleBehaviorRun
374 logger := wp.logger.WithFields(logrus.Fields{
375 "InstanceType": it.Name,
376 "Instance": inst.ID(),
377 "Address": inst.Address(),
379 logger.WithFields(logrus.Fields{
381 "IdleBehavior": idleBehavior,
382 }).Infof("instance appeared in cloud")
388 executor: wp.newExecutor(inst),
390 idleBehavior: idleBehavior,
397 running: make(map[string]*remoteRunner),
398 starting: make(map[string]*remoteRunner),
// probing has capacity 1 so at most one probe is queued per worker.
399 probing: make(chan struct{}, 1),
405 // Shutdown shuts down a worker with the given type, or returns false
406 // if all workers with the given type are busy.
407 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
408 wp.setupOnce.Do(wp.setup)
410 defer wp.mtx.Unlock()
411 logger := wp.logger.WithField("InstanceType", it.Name)
412 logger.Info("shutdown requested")
// Prefer shutting down a still-booting worker over an idle one:
// StateBooting is tried first.
413 for _, tryState := range []State{StateBooting, StateIdle} {
414 // TODO: shutdown the worker with the longest idle
415 // time (Idle) or the earliest create time (Booting)
416 for _, wkr := range wp.workers {
// Workers in "hold" mode are never shut down here.
417 if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
418 logger.WithField("Instance", wkr.instance).Info("shutting down")
427 // CountWorkers returns the current number of workers in each state.
429 // CountWorkers blocks, if necessary, until the initial instance list
430 // has been loaded from the cloud provider.
431 func (wp *Pool) CountWorkers() map[State]int {
432 wp.setupOnce.Do(wp.setup)
435 defer wp.mtx.Unlock()
437 for _, w := range wp.workers {
443 // Running returns the container UUIDs being prepared/run on workers.
445 // In the returned map, the time value indicates when the Pool
446 // observed that the container process had exited. A container that
447 // has not yet exited has a zero time value. The caller should use
448 // KillContainer() to garbage-collect the entries for exited
450 func (wp *Pool) Running() map[string]time.Time {
451 wp.setupOnce.Do(wp.setup)
453 defer wp.mtx.Unlock()
454 r := map[string]time.Time{}
455 for _, wkr := range wp.workers {
// Both running and starting containers get a zero time value
// (still alive as far as the Pool knows).
456 for uuid := range wkr.running {
457 r[uuid] = time.Time{}
459 for uuid := range wkr.starting {
460 r[uuid] = time.Time{}
// Exited-but-not-yet-killed containers overwrite/add their exit time.
463 for uuid, exited := range wp.exited {
469 // StartContainer starts a container on an idle worker immediately if
470 // possible, otherwise returns false.
471 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
472 wp.setupOnce.Do(wp.setup)
474 defer wp.mtx.Unlock()
// Choose the most-recently-busy idle worker of the requested type
// (w.busy.After comparison), letting longer-idle workers time out.
476 for _, w := range wp.workers {
477 if w.instType == it && w.state == StateIdle {
478 if wkr == nil || w.busy.After(wkr.busy) {
486 wkr.startContainer(ctr)
490 // KillContainer kills the crunch-run process for the given container
491 // UUID, if it's running on any worker.
493 // KillContainer returns immediately; the act of killing the container
494 // takes some time, and runs in the background.
495 func (wp *Pool) KillContainer(uuid string, reason string) {
497 defer wp.mtx.Unlock()
498 logger := wp.logger.WithFields(logrus.Fields{
499 "ContainerUUID": uuid,
// If the process already exited, this call just garbage-collects the
// placeholder recorded in wp.exited (see Running()).
502 if _, ok := wp.exited[uuid]; ok {
503 logger.Debug("clearing placeholder for exited crunch-run process")
504 delete(wp.exited, uuid)
// Otherwise find the remoteRunner on whichever worker is running or
// starting this container.
507 for _, wkr := range wp.workers {
508 rr := wkr.running[uuid]
510 rr = wkr.starting[uuid]
517 logger.Debug("cannot kill: already disappeared")
// registerMetrics creates the Pool's Prometheus collectors and
// registers them on reg; updateMetrics() keeps them current.
520 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
// reg is replaced with a throwaway registry here (presumably when the
// caller passed nil — the guarding condition is not shown) so the
// MustRegister calls below always have a valid target.
522 reg = prometheus.NewRegistry()
524 wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
525 Namespace: "arvados",
526 Subsystem: "dispatchcloud",
527 Name: "containers_running",
528 Help: "Number of containers reported running by cloud VMs.",
530 reg.MustRegister(wp.mContainersRunning)
// The remaining gauges are partitioned by worker "category"
// (inuse/hold/booting/unknown/idle — see updateMetrics).
531 wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
532 Namespace: "arvados",
533 Subsystem: "dispatchcloud",
534 Name: "instances_total",
535 Help: "Number of cloud VMs.",
536 }, []string{"category"})
537 reg.MustRegister(wp.mInstances)
538 wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
539 Namespace: "arvados",
540 Subsystem: "dispatchcloud",
541 Name: "instances_price",
542 Help: "Price of cloud VMs.",
543 }, []string{"category"})
544 reg.MustRegister(wp.mInstancesPrice)
545 wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
546 Namespace: "arvados",
547 Subsystem: "dispatchcloud",
549 Help: "Total VCPUs on all cloud VMs.",
550 }, []string{"category"})
551 reg.MustRegister(wp.mVCPUs)
552 wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
553 Namespace: "arvados",
554 Subsystem: "dispatchcloud",
555 Name: "memory_bytes_total",
556 Help: "Total memory on all cloud VMs.",
557 }, []string{"category"})
558 reg.MustRegister(wp.mMemory)
// runMetrics subscribes to pool state changes and (presumably, in the
// loop not shown here) refreshes the Prometheus gauges on each event.
561 func (wp *Pool) runMetrics() {
563 defer wp.Unsubscribe(ch)
// updateMetrics recomputes per-category instance counts, prices,
// VCPUs, and memory from the current worker set and pushes them to
// the gauges registered in registerMetrics. Caller-safe: takes RLock.
570 func (wp *Pool) updateMetrics() {
572 defer wp.mtx.RUnlock()
574 instances := map[string]int64{}
575 price := map[string]float64{}
576 cpu := map[string]int64{}
577 mem := map[string]int64{}
// Classify each worker into exactly one category; the case order
// means a busy worker counts as "inuse" even if on hold, etc.
579 for _, wkr := range wp.workers {
582 case len(wkr.running)+len(wkr.starting) > 0:
584 case wkr.idleBehavior == IdleBehaviorHold:
586 case wkr.state == StateBooting:
588 case wkr.state == StateUnknown:
594 price[cat] += wkr.instType.Price
595 cpu[cat] += int64(wkr.instType.VCPUs)
596 mem[cat] += int64(wkr.instType.RAM)
597 running += int64(len(wkr.running) + len(wkr.starting))
// Set every category explicitly (including zeroes) so gauges for
// emptied categories reset rather than go stale.
599 for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
600 wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
601 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
602 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
603 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
605 wp.mContainersRunning.Set(float64(running))
// runProbes periodically probes every worker (every probeInterval),
// pacing individual probes with a second ticker so the aggregate rate
// stays at or below maxProbesPerSecond.
608 func (wp *Pool) runProbes() {
609 maxPPS := wp.maxProbesPerSecond
611 maxPPS = defaultMaxProbesPerSecond
613 limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
614 defer limitticker.Stop()
616 probeticker := time.NewTicker(wp.probeInterval)
617 defer probeticker.Stop()
// workers is reused across cycles (workers[:0]) to avoid reallocating
// the ID list every probe interval.
619 workers := []cloud.InstanceID{}
620 for range probeticker.C {
621 workers = workers[:0]
622 for id, wkr := range wp.workers {
// Skip workers already shutting down, or idle past their
// timeout (shutdownIfIdle initiates shutdown as a side effect).
624 if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
627 workers = append(workers, id)
631 for _, id := range workers {
633 wkr, ok := wp.workers[id]
636 // Deleted while we were probing
// Probe runs in its own goroutine; the select below waits on
// limitticker to enforce the max probe rate.
640 go wkr.ProbeAndUpdate()
644 case <-limitticker.C:
// runSync drives the periodic reconciliation loop between the cloud
// provider's instance list and wp.workers.
650 func (wp *Pool) runSync() {
651 // sync once immediately, then wait syncInterval, sync again,
// Timer with a 1ns delay makes the first sync fire immediately while
// still using the same timer/Reset machinery for subsequent rounds.
653 timer := time.NewTimer(1)
657 err := wp.getInstancesAndSync()
659 wp.logger.WithError(err).Warn("sync failed")
661 timer.Reset(wp.syncInterval)
663 wp.logger.Debug("worker.Pool stopped")
669 // Stop synchronizing with the InstanceSet.
// Signals the background loops via the wp.stop channel (created in
// NewPool); does not shut down any cloud instances.
670 func (wp *Pool) Stop() {
671 wp.setupOnce.Do(wp.setup)
675 // Instances returns an InstanceView for each worker in the pool,
676 // summarizing its current state and recent activity.
// The result is sorted by instance ID for stable output.
677 func (wp *Pool) Instances() []InstanceView {
679 wp.setupOnce.Do(wp.setup)
681 for _, w := range wp.workers {
682 r = append(r, InstanceView{
683 Instance: w.instance.ID(),
684 Address: w.instance.Address(),
685 Price: w.instType.Price,
686 ArvadosInstanceType: w.instType.Name,
687 ProviderInstanceType: w.instType.ProviderType,
688 LastContainerUUID: w.lastUUID,
690 WorkerState: w.state.String(),
691 IdleBehavior: w.idleBehavior,
695 sort.Slice(r, func(i, j int) bool {
696 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
701 // KillInstance destroys a cloud VM instance. It returns an error if
702 // the given instance does not exist.
703 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
704 wkr, ok := wp.workers[id]
706 return errors.New("instance not found")
708 wkr.logger.WithField("Reason", reason).Info("shutting down")
// setup initializes the Pool's maps. It is invoked exactly once via
// wp.setupOnce.Do(wp.setup) at the top of every public entry point.
713 func (wp *Pool) setup() {
714 wp.creating = map[string]createCall{}
715 wp.exited = map[string]time.Time{}
716 wp.workers = map[cloud.InstanceID]*worker{}
717 wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
// notify wakes all subscribers with a non-blocking send; events are
// dropped for subscribers whose 1-slot buffer is already full (they
// already have a pending wakeup — see Subscribe).
720 func (wp *Pool) notify() {
722 defer wp.mtx.RUnlock()
723 for _, send := range wp.subscribers {
725 case send <- struct{}{}:
// getInstancesAndSync fetches the tagged instance list from the cloud
// provider and reconciles wp.workers against it via sync().
731 func (wp *Pool) getInstancesAndSync() error {
732 wp.setupOnce.Do(wp.setup)
// Respect any active rate-limit back-off before hitting the API.
733 if err := wp.instanceSet.throttleInstances.Error(); err != nil {
736 wp.logger.Debug("getting instance list")
// threshold is taken before the provider call: sync() must not remove
// workers updated after this moment (their state is newer than the
// list we are about to apply).
737 threshold := time.Now()
738 instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
740 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
743 wp.sync(threshold, instances)
744 wp.logger.Debug("sync done")
748 // Add/remove/update workers based on instances, which was obtained
749 // from the instanceSet. However, don't clobber any other updates that
750 // already happened after threshold.
751 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
753 defer wp.mtx.Unlock()
754 wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
757 for _, inst := range instances {
// The InstanceType tag (written by Create) maps the instance back to
// its Arvados instance type; unknown tags are logged and skipped.
758 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
759 it, ok := wp.instanceTypes[itTag]
761 wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
764 if wkr, isNew := wp.updateWorker(inst, it); isNew {
// A worker still listed by the provider long after we asked it to
// shut down gets another shutdown attempt.
766 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
767 wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
// Remove workers that no longer appear in the provider's list —
// unless they were updated after threshold (newer local state wins).
772 for id, wkr := range wp.workers {
773 if wkr.updated.After(threshold) {
776 logger := wp.logger.WithFields(logrus.Fields{
777 "Instance": wkr.instance.ID(),
778 "WorkerState": wkr.state,
780 logger.Info("instance disappeared in cloud")
781 delete(wp.workers, id)
789 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
// waitUntilLoaded blocks until the initial instance list has been
// fetched from the provider (wp.loaded, set by sync) — used by
// CountWorkers.
797 func (wp *Pool) waitUntilLoaded() {
800 defer wp.mtx.RUnlock()
808 // Return a random string of n hexadecimal digits (n*4 random bits). n
// Uses crypto/rand, so the result is suitable for the instance secret
// generated in Create(). Note n/2 bytes yields n hex digits only for
// even n.
810 func randomHex(n int) string {
811 buf := make([]byte, n/2)
812 _, err := rand.Read(buf)
816 return fmt.Sprintf("%x", buf)