1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
19 "git.arvados.org/arvados.git/lib/cloud"
20 "git.arvados.org/arvados.git/sdk/go/arvados"
21 "github.com/prometheus/client_golang/prometheus"
22 "github.com/sirupsen/logrus"
23 "golang.org/x/crypto/ssh"
27 tagKeyInstanceType = "InstanceType"
28 tagKeyIdleBehavior = "IdleBehavior"
29 tagKeyInstanceSecret = "InstanceSecret"
30 tagKeyInstanceSetID = "InstanceSetID"
33 // An InstanceView shows a worker's current state and recent activity.
34 type InstanceView struct {
35 Instance cloud.InstanceID `json:"instance"`
36 Address string `json:"address"`
37 Price float64 `json:"price"`
38 ArvadosInstanceType string `json:"arvados_instance_type"`
39 ProviderInstanceType string `json:"provider_instance_type"`
40 LastContainerUUID string `json:"last_container_uuid"`
41 LastBusy time.Time `json:"last_busy"`
42 WorkerState string `json:"worker_state"`
43 IdleBehavior IdleBehavior `json:"idle_behavior"`
46 // An Executor executes shell commands on a remote host.
47 type Executor interface {
48 // Run cmd on the current target.
49 Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
51 // Use the given target for subsequent operations. The new
52 // target is the same host as the previous target, but it
53 // might return a different address and verify a different
56 // SetTarget is called frequently, and in most cases the new
57 // target will behave exactly the same as the old one. An
58 // implementation should optimize accordingly.
60 // SetTarget must not block on concurrent Execute calls.
61 SetTarget(cloud.ExecutorTarget)
67 defaultSyncInterval = time.Minute
68 defaultProbeInterval = time.Second * 10
69 defaultMaxProbesPerSecond = 10
70 defaultTimeoutIdle = time.Minute
71 defaultTimeoutBooting = time.Minute * 10
72 defaultTimeoutProbe = time.Minute * 10
73 defaultTimeoutShutdown = time.Second * 10
74 defaultTimeoutTERM = time.Minute * 2
75 defaultTimeoutSignal = time.Second * 5
77 // Time after a quota error to try again anyway, even if no
78 // instances have been shut down.
79 quotaErrorTTL = time.Minute
81 // Time between "X failed because rate limiting" messages
82 logRateLimitErrorInterval = time.Second * 10
85 func duration(conf arvados.Duration, def time.Duration) time.Duration {
87 return time.Duration(conf)
93 // NewPool creates a Pool of workers backed by instanceSet.
95 // New instances are configured and set up according to the given
96 // cluster configuration.
97 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
100 arvClient: arvClient,
101 instanceSetID: instanceSetID,
102 instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
103 newExecutor: newExecutor,
104 bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
105 runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
106 imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
107 instanceTypes: cluster.InstanceTypes,
108 maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
109 maxConcurrentNodeCreateOps: cluster.Containers.CloudVMs.MaxConcurrentNodeCreateOps,
110 probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
111 syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
112 timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
113 timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
114 timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
115 timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
116 timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
117 timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
118 installPublicKey: installPublicKey,
119 tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
120 stop: make(chan bool),
122 wp.registerMetrics(reg)
124 wp.setupOnce.Do(wp.setup)
132 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
133 // zero Pool should not be used. Call NewPool to create a new Pool.
136 logger logrus.FieldLogger
137 arvClient *arvados.Client
138 instanceSetID cloud.InstanceSetID
139 instanceSet *throttledInstanceSet
140 newExecutor func(cloud.Instance) Executor
141 bootProbeCommand string
143 imageID cloud.ImageID
144 instanceTypes map[string]arvados.InstanceType
145 syncInterval time.Duration
146 probeInterval time.Duration
147 maxProbesPerSecond int
148 maxConcurrentNodeCreateOps int
149 timeoutIdle time.Duration
150 timeoutBooting time.Duration
151 timeoutProbe time.Duration
152 timeoutShutdown time.Duration
153 timeoutTERM time.Duration
154 timeoutSignal time.Duration
155 installPublicKey ssh.PublicKey
159 subscribers map[<-chan struct{}]chan<- struct{}
160 creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
161 workers map[cloud.InstanceID]*worker
162 loaded bool // loaded list of instances from InstanceSet at least once
163 exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
164 atQuotaUntil time.Time
165 atQuotaErr cloud.QuotaError
170 runnerMD5 [md5.Size]byte
173 mContainersRunning prometheus.Gauge
174 mInstances *prometheus.GaugeVec
175 mInstancesPrice *prometheus.GaugeVec
176 mVCPUs *prometheus.GaugeVec
177 mMemory *prometheus.GaugeVec
178 mBootOutcomes *prometheus.CounterVec
179 mDisappearances *prometheus.CounterVec
182 type createCall struct {
184 instanceType arvados.InstanceType
187 func (wp *Pool) CheckHealth() error {
188 wp.setupOnce.Do(wp.setup)
189 if err := wp.loadRunnerData(); err != nil {
190 return fmt.Errorf("error loading runner binary: %s", err)
195 // Subscribe returns a buffered channel that becomes ready after any
196 // change to the pool's state that could have scheduling implications:
197 // a worker's state changes, a new worker appears, the cloud
198 // provider's API rate limiting period ends, etc.
200 // Additional events that occur while the channel is already ready
201 // will be dropped, so it is OK if the caller services the channel
206 // ch := wp.Subscribe()
207 // defer wp.Unsubscribe(ch)
214 func (wp *Pool) Subscribe() <-chan struct{} {
215 wp.setupOnce.Do(wp.setup)
217 defer wp.mtx.Unlock()
218 ch := make(chan struct{}, 1)
219 wp.subscribers[ch] = ch
223 // Unsubscribe stops sending updates to the given channel.
224 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
225 wp.setupOnce.Do(wp.setup)
227 defer wp.mtx.Unlock()
228 delete(wp.subscribers, ch)
231 // Unallocated returns the number of unallocated (creating + booting +
232 // idle + unknown) workers for each instance type. Workers in
233 // hold/drain mode are not included.
234 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
235 wp.setupOnce.Do(wp.setup)
237 defer wp.mtx.RUnlock()
238 unalloc := map[arvados.InstanceType]int{}
239 creating := map[arvados.InstanceType]int{}
240 oldestCreate := map[arvados.InstanceType]time.Time{}
241 for _, cc := range wp.creating {
242 it := cc.instanceType
244 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
245 oldestCreate[it] = cc.time
248 for _, wkr := range wp.workers {
249 // Skip workers that are not expected to become
250 // available soon. Note len(wkr.running)>0 is not
251 // redundant here: it can be true even in
253 if wkr.state == StateShutdown ||
254 wkr.state == StateRunning ||
255 wkr.idleBehavior != IdleBehaviorRun ||
256 len(wkr.running) > 0 {
261 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
262 // If up to N new workers appear in
263 // Instances() while we are waiting for N
264 // Create() calls to complete, we assume we're
265 // just seeing a race between Instances() and
266 // Create() responses.
268 // The other common reason why nodes have
269 // state==Unknown is that they appeared at
270 // startup, before any Create calls. They
271 // don't match the above timing condition, so
272 // we never mistakenly attribute them to
273 // pending Create calls.
277 for it, c := range creating {
// RateLimitError is the error Pool uses to refuse a Create call when
// the number of in-flight create requests has already reached the
// configured maxConcurrentNodeCreateOps limit. Retry holds the time at
// which the caller may try again.
type RateLimitError struct {
	Retry time.Time
}

// Error implements the error interface, naming the limit that was hit
// and the time after which another attempt may be made.
func (rle RateLimitError) Error() string {
	const format = "node creation request failed, hit maxConcurrentNodeCreateOps, wait until %s"
	return fmt.Sprintf(format, rle.Retry)
}

// EarliestRetry returns Retry. Presumably this lets the create
// throttle (CheckRateLimitError) treat this value as a rate-limit
// error with a known retry time — confirm against the throttle code.
func (rle RateLimitError) EarliestRetry() time.Time {
	return rle.Retry
}
290 // Create a new instance with the given type, and add it to the worker
291 // pool. The worker is added immediately; instance creation runs in
294 // Create returns false if a pre-existing error state prevents it from
295 // even attempting to create a new instance. Those errors are logged
296 // by the Pool, so the caller does not need to log anything in such
298 func (wp *Pool) Create(it arvados.InstanceType) bool {
299 logger := wp.logger.WithField("InstanceType", it.Name)
300 wp.setupOnce.Do(wp.setup)
301 if wp.loadRunnerData() != nil {
302 // Boot probe is certain to fail.
306 defer wp.mtx.Unlock()
307 if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
310 // The maxConcurrentNodeCreateOps knob throttles the number of node create
311 // requests in flight. It was added to work around a limitation in Azure's
312 // managed disks, which support no more than 20 concurrent node creation
313 // requests from a single disk image (cf.
314 // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
315 // The code assumes that node creation, from Azure's perspective, means the
316 // period until the instance appears in the "get all instances" list.
317 if wp.maxConcurrentNodeCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentNodeCreateOps {
318 wp.instanceSet.throttleCreate.CheckRateLimitError(RateLimitError{Retry: time.Now().Add(5 * time.Second)}, wp.logger, "create instance", wp.notify)
322 secret := randomHex(instanceSecretLength)
323 wp.creating[secret] = createCall{time: now, instanceType: it}
326 tags := cloud.InstanceTags{
327 wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID),
328 wp.tagKeyPrefix + tagKeyInstanceType: it.Name,
329 wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
330 wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
332 initCmd := TagVerifier{nil, secret}.InitCommand()
333 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
335 defer wp.mtx.Unlock()
336 // delete() is deferred so the updateWorker() call
337 // below knows to use StateBooting when adding a new
339 defer delete(wp.creating, secret)
341 if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
343 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
344 time.AfterFunc(quotaErrorTTL, wp.notify)
346 logger.WithError(err).Error("create failed")
347 wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
350 wp.updateWorker(inst, it)
355 // AtQuota returns true if Create is not expected to work at the
357 func (wp *Pool) AtQuota() bool {
359 defer wp.mtx.Unlock()
360 return time.Now().Before(wp.atQuotaUntil)
363 // SetIdleBehavior determines how the indicated instance will behave
364 // when it has no containers running.
365 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
367 defer wp.mtx.Unlock()
368 wkr, ok := wp.workers[id]
370 return errors.New("requested instance does not exist")
372 wkr.setIdleBehavior(idleBehavior)
376 // Add or update worker attached to the given instance.
378 // The second return value is true if a new worker is created.
380 // A newly added instance has state=StateBooting if its tags match an
381 // entry in wp.creating, otherwise StateUnknown.
383 // Caller must have lock.
384 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
385 secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
386 inst = TagVerifier{inst, secret}
388 if wkr := wp.workers[id]; wkr != nil {
389 wkr.executor.SetTarget(inst)
391 wkr.updated = time.Now()
396 state := StateUnknown
397 if _, ok := wp.creating[secret]; ok {
401 // If an instance has a valid IdleBehavior tag when it first
402 // appears, initialize the new worker accordingly (this is how
403 // we restore IdleBehavior that was set by a prior dispatch
404 // process); otherwise, default to "run". After this,
405 // wkr.idleBehavior is the source of truth, and will only be
406 // changed via SetIdleBehavior().
407 idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
408 if !validIdleBehavior[idleBehavior] {
409 idleBehavior = IdleBehaviorRun
412 logger := wp.logger.WithFields(logrus.Fields{
413 "InstanceType": it.Name,
414 "Instance": inst.ID(),
415 "Address": inst.Address(),
417 logger.WithFields(logrus.Fields{
419 "IdleBehavior": idleBehavior,
420 }).Infof("instance appeared in cloud")
426 executor: wp.newExecutor(inst),
428 idleBehavior: idleBehavior,
435 running: make(map[string]*remoteRunner),
436 starting: make(map[string]*remoteRunner),
437 probing: make(chan struct{}, 1),
443 // Shutdown shuts down a worker with the given type, or returns false
444 // if all workers with the given type are busy.
445 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
446 wp.setupOnce.Do(wp.setup)
448 defer wp.mtx.Unlock()
449 logger := wp.logger.WithField("InstanceType", it.Name)
450 logger.Info("shutdown requested")
451 for _, tryState := range []State{StateBooting, StateIdle} {
452 // TODO: shutdown the worker with the longest idle
453 // time (Idle) or the earliest create time (Booting)
454 for _, wkr := range wp.workers {
455 if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
456 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
457 wkr.reportBootOutcome(BootOutcomeAborted)
466 // CountWorkers returns the current number of workers in each state.
468 // CountWorkers blocks, if necessary, until the initial instance list
469 // has been loaded from the cloud provider.
470 func (wp *Pool) CountWorkers() map[State]int {
471 wp.setupOnce.Do(wp.setup)
474 defer wp.mtx.Unlock()
476 for _, w := range wp.workers {
482 // Running returns the container UUIDs being prepared/run on workers.
484 // In the returned map, the time value indicates when the Pool
485 // observed that the container process had exited. A container that
486 // has not yet exited has a zero time value. The caller should use
487 // ForgetContainer() to garbage-collect the entries for exited
489 func (wp *Pool) Running() map[string]time.Time {
490 wp.setupOnce.Do(wp.setup)
492 defer wp.mtx.Unlock()
493 r := map[string]time.Time{}
494 for _, wkr := range wp.workers {
495 for uuid := range wkr.running {
496 r[uuid] = time.Time{}
498 for uuid := range wkr.starting {
499 r[uuid] = time.Time{}
502 for uuid, exited := range wp.exited {
508 // StartContainer starts a container on an idle worker immediately if
509 // possible, otherwise returns false.
510 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
511 wp.setupOnce.Do(wp.setup)
513 defer wp.mtx.Unlock()
515 for _, w := range wp.workers {
516 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
517 if wkr == nil || w.busy.After(wkr.busy) {
525 wkr.startContainer(ctr)
529 // KillContainer kills the crunch-run process for the given container
530 // UUID, if it's running on any worker.
532 // KillContainer returns immediately; the act of killing the container
533 // takes some time, and runs in the background.
535 // KillContainer returns false if the container has already ended.
536 func (wp *Pool) KillContainer(uuid string, reason string) bool {
538 defer wp.mtx.Unlock()
539 logger := wp.logger.WithFields(logrus.Fields{
540 "ContainerUUID": uuid,
543 for _, wkr := range wp.workers {
544 rr := wkr.running[uuid]
546 rr = wkr.starting[uuid]
553 logger.Debug("cannot kill: already disappeared")
557 // ForgetContainer clears the placeholder for the given exited
558 // container, so it isn't returned by subsequent calls to Running().
560 // ForgetContainer has no effect if the container has not yet exited.
562 // The "container exited at time T" placeholder (which necessitates
563 // ForgetContainer) exists to make it easier for the caller
564 // (scheduler) to distinguish a container that exited without
565 // finalizing its state from a container that exited too recently for
566 // its final state to have appeared in the scheduler's queue cache.
567 func (wp *Pool) ForgetContainer(uuid string) {
569 defer wp.mtx.Unlock()
570 if _, ok := wp.exited[uuid]; ok {
571 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
572 delete(wp.exited, uuid)
576 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
578 reg = prometheus.NewRegistry()
580 wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
581 Namespace: "arvados",
582 Subsystem: "dispatchcloud",
583 Name: "containers_running",
584 Help: "Number of containers reported running by cloud VMs.",
586 reg.MustRegister(wp.mContainersRunning)
587 wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
588 Namespace: "arvados",
589 Subsystem: "dispatchcloud",
590 Name: "instances_total",
591 Help: "Number of cloud VMs.",
592 }, []string{"category", "instance_type"})
593 reg.MustRegister(wp.mInstances)
594 wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
595 Namespace: "arvados",
596 Subsystem: "dispatchcloud",
597 Name: "instances_price",
598 Help: "Price of cloud VMs.",
599 }, []string{"category"})
600 reg.MustRegister(wp.mInstancesPrice)
601 wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
602 Namespace: "arvados",
603 Subsystem: "dispatchcloud",
605 Help: "Total VCPUs on all cloud VMs.",
606 }, []string{"category"})
607 reg.MustRegister(wp.mVCPUs)
608 wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
609 Namespace: "arvados",
610 Subsystem: "dispatchcloud",
611 Name: "memory_bytes_total",
612 Help: "Total memory on all cloud VMs.",
613 }, []string{"category"})
614 reg.MustRegister(wp.mMemory)
615 wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
616 Namespace: "arvados",
617 Subsystem: "dispatchcloud",
618 Name: "boot_outcomes",
619 Help: "Boot outcomes by type.",
620 }, []string{"outcome"})
621 for k := range validBootOutcomes {
622 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
624 reg.MustRegister(wp.mBootOutcomes)
625 wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
626 Namespace: "arvados",
627 Subsystem: "dispatchcloud",
628 Name: "instances_disappeared",
629 Help: "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
630 }, []string{"state"})
631 for _, v := range stateString {
632 wp.mDisappearances.WithLabelValues(v).Add(0)
634 reg.MustRegister(wp.mDisappearances)
637 func (wp *Pool) runMetrics() {
639 defer wp.Unsubscribe(ch)
646 func (wp *Pool) updateMetrics() {
648 defer wp.mtx.RUnlock()
654 instances := map[entKey]int64{}
655 price := map[string]float64{}
656 cpu := map[string]int64{}
657 mem := map[string]int64{}
659 for _, wkr := range wp.workers {
662 case len(wkr.running)+len(wkr.starting) > 0:
664 case wkr.idleBehavior == IdleBehaviorHold:
666 case wkr.state == StateBooting:
668 case wkr.state == StateUnknown:
673 instances[entKey{cat, wkr.instType.Name}]++
674 price[cat] += wkr.instType.Price
675 cpu[cat] += int64(wkr.instType.VCPUs)
676 mem[cat] += int64(wkr.instType.RAM)
677 running += int64(len(wkr.running) + len(wkr.starting))
679 for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
680 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
681 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
682 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
683 // make sure to reset gauges for non-existing category/nodetype combinations
684 for _, it := range wp.instanceTypes {
685 if _, ok := instances[entKey{cat, it.Name}]; !ok {
686 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
690 for k, v := range instances {
691 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
693 wp.mContainersRunning.Set(float64(running))
696 func (wp *Pool) runProbes() {
697 maxPPS := wp.maxProbesPerSecond
699 maxPPS = defaultMaxProbesPerSecond
701 limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
702 defer limitticker.Stop()
704 probeticker := time.NewTicker(wp.probeInterval)
705 defer probeticker.Stop()
707 workers := []cloud.InstanceID{}
708 for range probeticker.C {
709 workers = workers[:0]
711 for id, wkr := range wp.workers {
712 if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
715 workers = append(workers, id)
719 for _, id := range workers {
721 wkr, ok := wp.workers[id]
724 // Deleted while we were probing
728 go wkr.ProbeAndUpdate()
732 case <-limitticker.C:
738 func (wp *Pool) runSync() {
739 // sync once immediately, then wait syncInterval, sync again,
741 timer := time.NewTimer(1)
745 err := wp.getInstancesAndSync()
747 wp.logger.WithError(err).Warn("sync failed")
749 timer.Reset(wp.syncInterval)
751 wp.logger.Debug("worker.Pool stopped")
757 // Stop synchronizing with the InstanceSet.
758 func (wp *Pool) Stop() {
759 wp.setupOnce.Do(wp.setup)
763 // Instances returns an InstanceView for each worker in the pool,
764 // summarizing its current state and recent activity.
765 func (wp *Pool) Instances() []InstanceView {
767 wp.setupOnce.Do(wp.setup)
769 for _, w := range wp.workers {
770 r = append(r, InstanceView{
771 Instance: w.instance.ID(),
772 Address: w.instance.Address(),
773 Price: w.instType.Price,
774 ArvadosInstanceType: w.instType.Name,
775 ProviderInstanceType: w.instType.ProviderType,
776 LastContainerUUID: w.lastUUID,
778 WorkerState: w.state.String(),
779 IdleBehavior: w.idleBehavior,
783 sort.Slice(r, func(i, j int) bool {
784 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
789 // KillInstance destroys a cloud VM instance. It returns an error if
790 // the given instance does not exist.
//
// NOTE(review): no wp.mtx locking is visible in this method, whereas
// other methods that access wp.workers (Shutdown, SetIdleBehavior,
// sync) hold wp.mtx, and sync() deletes entries from wp.workers. The
// map read below may therefore race with sync(); confirm, and
// consider acquiring wp.mtx (and calling wp.setupOnce.Do(wp.setup))
// before touching wp.workers.
791 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
792 wkr, ok := wp.workers[id]
// Unknown instance ID: report it to the caller instead of acting.
794 return errors.New("instance not found")
// Log the caller-supplied reason on the worker's own logger.
796 wkr.logger.WithField("Reason", reason).Info("shutting down")
// Count this as an aborted boot, as Shutdown() does for the workers
// it stops.
797 wkr.reportBootOutcome(BootOutcomeAborted)
802 func (wp *Pool) setup() {
803 wp.creating = map[string]createCall{}
804 wp.exited = map[string]time.Time{}
805 wp.workers = map[cloud.InstanceID]*worker{}
806 wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
810 // Load the runner program to be deployed on worker nodes into
811 // wp.runnerData, if necessary. Errors are logged.
813 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
815 // Caller must not have lock.
816 func (wp *Pool) loadRunnerData() error {
818 defer wp.mtx.Unlock()
819 if wp.runnerData != nil {
821 } else if wp.runnerSource == "" {
822 wp.runnerCmd = "crunch-run"
823 wp.runnerData = []byte{}
826 logger := wp.logger.WithField("source", wp.runnerSource)
827 logger.Debug("loading runner")
828 buf, err := ioutil.ReadFile(wp.runnerSource)
830 logger.WithError(err).Error("failed to load runner program")
834 wp.runnerMD5 = md5.Sum(buf)
835 wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
839 func (wp *Pool) notify() {
841 defer wp.mtx.RUnlock()
842 for _, send := range wp.subscribers {
844 case send <- struct{}{}:
850 func (wp *Pool) getInstancesAndSync() error {
851 wp.setupOnce.Do(wp.setup)
852 if err := wp.instanceSet.throttleInstances.Error(); err != nil {
855 wp.logger.Debug("getting instance list")
856 threshold := time.Now()
857 instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
859 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
862 wp.sync(threshold, instances)
863 wp.logger.Debug("sync done")
867 // Add/remove/update workers based on instances, which was obtained
868 // from the instanceSet. However, don't clobber any other updates that
869 // already happened after threshold.
870 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
872 defer wp.mtx.Unlock()
873 wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
876 for _, inst := range instances {
877 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
878 it, ok := wp.instanceTypes[itTag]
880 wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
883 if wkr, isNew := wp.updateWorker(inst, it); isNew {
885 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
886 wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
891 for id, wkr := range wp.workers {
892 if wkr.updated.After(threshold) {
895 logger := wp.logger.WithFields(logrus.Fields{
896 "Instance": wkr.instance.ID(),
897 "WorkerState": wkr.state,
899 logger.Info("instance disappeared in cloud")
900 wkr.reportBootOutcome(BootOutcomeDisappeared)
901 if wp.mDisappearances != nil {
902 wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
904 delete(wp.workers, id)
912 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
920 func (wp *Pool) waitUntilLoaded() {
923 defer wp.mtx.RUnlock()
931 // Return a random string of n hexadecimal digits (n*4 random bits). n
933 func randomHex(n int) string {
934 buf := make([]byte, n/2)
935 _, err := rand.Read(buf)
939 return fmt.Sprintf("%x", buf)