// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"crypto/md5"
	"crypto/rand"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"sort"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)

const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)
}

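// A minimal sketch of how the pool is expected to drive an Executor
// (newExecutor and updatedInst are illustrative placeholders, not
// part of this package):
//
//	exec := newExecutor(inst)
//	stdout, stderr, err := exec.Execute(nil, "crunch-run --list", nil)
//	// ... later, the cloud provider reports a new address for the
//	// same instance:
//	exec.SetTarget(updatedInst) // must not block concurrent Execute calls
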
const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = time.Minute * 10
	defaultTimeoutProbe       = time.Minute * 10
	defaultTimeoutShutdown    = time.Second * 10
	defaultTimeoutTERM        = time.Minute * 2
	defaultTimeoutSignal      = time.Second * 5

	// Time after a quota error to try again anyway, even if no
	// instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
)

// duration returns conf as a time.Duration, or def if conf is zero.
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf != 0 {
		return time.Duration(conf)
	}
	return def
}

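// For example (a sketch; the config values are illustrative):
//
//	duration(arvados.Duration(0), defaultProbeInterval)           // == 10 * time.Second
//	duration(arvados.Duration(2*time.Minute), defaultSyncInterval) // == 2 * time.Minute
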
// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:                     logger,
		arvClient:                  arvClient,
		instanceSetID:              instanceSetID,
		instanceSet:                &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:                newExecutor,
		bootProbeCommand:           cluster.Containers.CloudVMs.BootProbeCommand,
		runnerSource:               cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID:                    cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes:              cluster.InstanceTypes,
		maxProbesPerSecond:         cluster.Containers.CloudVMs.MaxProbesPerSecond,
		maxConcurrentNodeCreateOps: cluster.Containers.CloudVMs.MaxConcurrentNodeCreateOps,
		probeInterval:              duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval:               duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:                duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:             duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:               duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:            duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM:                duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal:              duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		installPublicKey:           installPublicKey,
		tagKeyPrefix:               cluster.Containers.CloudVMs.TagKeyPrefix,
		stop:                       make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger                     logrus.FieldLogger
	arvClient                  *arvados.Client
	instanceSetID              cloud.InstanceSetID
	instanceSet                *throttledInstanceSet
	newExecutor                func(cloud.Instance) Executor
	bootProbeCommand           string
	runnerSource               string
	imageID                    cloud.ImageID
	instanceTypes              map[string]arvados.InstanceType
	syncInterval               time.Duration
	probeInterval              time.Duration
	maxProbesPerSecond         int
	maxConcurrentNodeCreateOps int
	timeoutIdle                time.Duration
	timeoutBooting             time.Duration
	timeoutProbe               time.Duration
	timeoutShutdown            time.Duration
	timeoutTERM                time.Duration
	timeoutSignal              time.Duration
	installPublicKey           ssh.PublicKey
	tagKeyPrefix               string

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once
	runnerData   []byte
	runnerMD5    [md5.Size]byte
	runnerCmd    string

	throttleCreate    throttle
	throttleInstances throttle

	mContainersRunning prometheus.Gauge
	mInstances         *prometheus.GaugeVec
	mInstancesPrice    *prometheus.GaugeVec
	mVCPUs             *prometheus.GaugeVec
	mMemory            *prometheus.GaugeVec
	mBootOutcomes      *prometheus.CounterVec
	mDisappearances    *prometheus.CounterVec
}

type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}

// CheckHealth reports an error if the Pool is not able to do its job
// (currently: if the configured runner binary cannot be loaded).
func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
	}
	return nil
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		tryScheduling(wp)
//		if done {
//			break
//		}
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}

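// A hypothetical scheduler-side use of Unallocated (needed and it are
// illustrative, not part of this package):
//
//	unalloc := wp.Unallocated()
//	for have := unalloc[it]; have < needed; have++ {
//		if !wp.Create(it) {
//			break // quota or rate limit; Pool already logged why
//		}
//	}
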
type RateLimitError struct{ Retry time.Time }

func (e RateLimitError) Error() string {
	return fmt.Sprintf("node creation request failed, hit maxConcurrentNodeCreateOps, wait until %s", e.Retry)
}
func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
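//
// A hedged caller sketch (it is an arvados.InstanceType chosen by the
// scheduler; the retry strategy shown is illustrative):
//
//	if !wp.AtQuota() && !wp.Create(it) {
//		// Create already logged the reason; wait for a
//		// Subscribe() notification before trying again.
//	}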
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
		return false
	}
	// The maxConcurrentNodeCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentNodeCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentNodeCreateOps {
		wp.instanceSet.throttleCreate.CheckRateLimitError(RateLimitError{Retry: time.Now().Add(5 * time.Second)}, wp.logger, "create instance", wp.notify)
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret}.InitCommand()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	return true
}

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.setIdleBehavior(idleBehavior)
	return nil
}

// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{inst, secret}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.reportBootOutcome(BootOutcomeAborted)
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.waitUntilLoaded()
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}

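// A sketch of the expected caller pattern (gracePeriod is
// illustrative; the scheduler decides when an exited entry is stale):
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() && time.Since(exited) > gracePeriod {
//			wp.ForgetContainer(uuid)
//		}
//	}
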
// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Reason":        reason,
	})
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
		if rr == nil {
			rr = wkr.starting[uuid]
		}
		if rr != nil {
			rr.Kill(reason)
			return true
		}
	}
	logger.Debug("cannot kill: already disappeared")
	return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	}
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price",
		Help:      "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "boot_outcomes",
		Help:      "Boot outcomes by type.",
	}, []string{"outcome"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	}
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_disappeared",
		Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	}
	reg.MustRegister(wp.mDisappearances)
}

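// Prometheus joins Namespace, Subsystem, and Name, so the exported
// series are named arvados_dispatchcloud_<name>; for example (label
// values are illustrative):
//
//	arvados_dispatchcloud_instances_total{category="idle",instance_type="m4.large"} 3
//	arvados_dispatchcloud_instances_price{category="inuse"} 1.25
//	arvados_dispatchcloud_boot_outcomes{outcome="success"} 42
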
func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	type entKey struct {
		cat      string
		instType string
	}
	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var running int64
	for _, wkr := range wp.workers {
		var cat string
		switch {
		case len(wkr.running)+len(wkr.starting) > 0:
			cat = "inuse"
		case wkr.idleBehavior == IdleBehaviorHold:
			cat = "hold"
		case wkr.state == StateBooting:
			cat = "booting"
		case wkr.state == StateUnknown:
			cat = "unknown"
		default:
			cat = "idle"
		}
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
	}
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for non-existing category/nodetype combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
			}
		}
	}
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	}
	wp.mContainersRunning.Set(float64(running))
}

func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()
	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()
	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()
		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing
				// others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Address:              w.instance.Address(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}

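// Given the json tags on InstanceView, each entry marshals along the
// lines of (all values are illustrative):
//
//	{"instance":"i-0123456789abcdef0","address":"10.0.0.5","price":0.24,
//	 "arvados_instance_type":"Standard","provider_instance_type":"m4.large",
//	 "last_container_uuid":"zzzzz-dz642-0123456789abcde",
//	 "last_busy":"2020-01-01T00:00:00Z",
//	 "worker_state":"idle","idle_behavior":"run"}
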
// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return nil
}

func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}

// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
		return nil
	} else if wp.runnerSource == "" {
		wp.runnerCmd = "crunch-run"
		wp.runnerData = []byte{}
		return nil
	}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
	if err != nil {
		logger.WithError(err).Error("failed to load runner program")
		return err
	}
	wp.runnerData = buf
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
	return nil
}

func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		delete(wp.workers, id)
		go wkr.Close()
		notify = true
	}

	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}

func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for !wp.loaded {
		wp.mtx.RUnlock()
		<-ch
		wp.mtx.RLock()
	}
}

// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}