// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"crypto/md5"
	"crypto/rand"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"sort"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)
const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
)
// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}
// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)
}
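// A minimal usage sketch for an Executor (hypothetical caller code;
// the "uptime" probe command is illustrative, not part of this
// package's contract):
//
//	exr := newExecutor(inst)
//	stdout, stderr, err := exr.Execute(nil, "uptime", nil)
//	if err != nil {
//		logger.WithError(err).Warnf("probe failed, stderr %q", stderr)
//	}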
const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = time.Minute * 10
	defaultTimeoutProbe       = time.Minute * 10
	defaultTimeoutShutdown    = time.Second * 10
	defaultTimeoutTERM        = time.Minute * 2
	defaultTimeoutSignal      = time.Second * 5

	// Time after a quota error to try again anyway, even if no
	// instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because of rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
)
// duration returns conf as a time.Duration, or def if conf is zero.
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}
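// Illustrative call: a zero (unset) config value falls back to the
// package default, so with SyncInterval unset,
//
//	duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval)
//
// returns defaultSyncInterval (one minute).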
// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:                         logger,
		arvClient:                      arvClient,
		instanceSetID:                  instanceSetID,
		instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:                    newExecutor,
		bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
		runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes:                  cluster.InstanceTypes,
		maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
		maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
		probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		installPublicKey:               installPublicKey,
		tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
		stop:                           make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}
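// A minimal construction sketch (hypothetical wiring; the instance
// set, executor factory, SSH key, and cluster config come from the
// dispatcher that embeds this pool):
//
//	wp := NewPool(logger, arvClient, prometheus.NewRegistry(),
//		instanceSetID, instanceSet, newExecutor, installPublicKey, cluster)
//	defer wp.Stop()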
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	logger                         logrus.FieldLogger
	arvClient                      *arvados.Client
	instanceSetID                  cloud.InstanceSetID
	instanceSet                    *throttledInstanceSet
	newExecutor                    func(cloud.Instance) Executor
	bootProbeCommand               string
	runnerSource                   string
	imageID                        cloud.ImageID
	instanceTypes                  map[string]arvados.InstanceType
	syncInterval                   time.Duration
	probeInterval                  time.Duration
	maxProbesPerSecond             int
	maxConcurrentInstanceCreateOps int
	timeoutIdle                    time.Duration
	timeoutBooting                 time.Duration
	timeoutProbe                   time.Duration
	timeoutShutdown                time.Duration
	timeoutTERM                    time.Duration
	timeoutSignal                  time.Duration
	installPublicKey               ssh.PublicKey
	tagKeyPrefix                   string

	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once
	runnerData   []byte
	runnerMD5    [md5.Size]byte
	runnerCmd    string

	mContainersRunning prometheus.Gauge
	mInstances         *prometheus.GaugeVec
	mInstancesPrice    *prometheus.GaugeVec
	mVCPUs             *prometheus.GaugeVec
	mMemory            *prometheus.GaugeVec
	mBootOutcomes      *prometheus.CounterVec
	mDisappearances    *prometheus.CounterVec
}

type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}
// CheckHealth returns an error if the runner binary cannot be loaded.
func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
	}
	return nil
}
// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		tryScheduling(wp)
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}
// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}
// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}
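// Illustrative example (hypothetical numbers): with two unfinished
// Create calls for an instance type plus one idle worker of that
// type, Unallocated() reports 3 for that type; a worker in hold or
// drain mode would not be counted at all.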
// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
		return false
	}
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret}.InitCommand()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	return true
}
// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
}
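// A hedged sketch of the intended calling pattern (hypothetical
// scheduler code): consult AtQuota before asking for new instances,
// and rely on the Pool's own logging when Create declines.
//
//	if !wp.AtQuota() {
//		if ok := wp.Create(it); !ok {
//			// reason already logged; wait for a Subscribe event
//		}
//	}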
// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.setIdleBehavior(idleBehavior)
	return nil
}
// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{inst, secret}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}
// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.reportBootOutcome(BootOutcomeAborted)
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}
// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.waitUntilLoaded()
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}
// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}
// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}
// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Reason":        reason,
	})
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
		if rr == nil {
			rr = wkr.starting[uuid]
		}
		if rr != nil {
			go rr.Kill(reason)
			return true
		}
	}
	logger.Debug("cannot kill: already disappeared")
	return false
}
// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	}
}
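// A hedged garbage-collection sketch (hypothetical scheduler code;
// seenInQueueCache is illustrative): clear placeholders once the
// container's final state has been observed.
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() && seenInQueueCache(uuid) {
//			wp.ForgetContainer(uuid)
//		}
//	}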
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price",
		Help:      "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "boot_outcomes",
		Help:      "Boot outcomes by type.",
	}, []string{"outcome"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	}
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_disappeared",
		Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	}
	reg.MustRegister(wp.mDisappearances)
}
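// Given the "arvados" namespace and "dispatchcloud" subsystem used
// above, Prometheus exposes these series under fully qualified names
// such as arvados_dispatchcloud_containers_running and
// arvados_dispatchcloud_instances_total{category="...",instance_type="..."}.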
func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	type entKey struct {
		cat      string
		instType string
	}
	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var running int64
	for _, wkr := range wp.workers {
		var cat string
		switch {
		case len(wkr.running)+len(wkr.starting) > 0:
			cat = "inuse"
		case wkr.idleBehavior == IdleBehaviorHold:
			cat = "hold"
		case wkr.state == StateBooting:
			cat = "booting"
		case wkr.state == StateUnknown:
			cat = "unknown"
		default:
			cat = "idle"
		}
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
	}
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for non-existent category/nodetype combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
			}
		}
	}
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	}
	wp.mContainersRunning.Set(float64(running))
}
func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	// Pace probe launches: with the default of 10, one every 100ms.
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}
func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}
// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}
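// Note: closing wp.stop ends the runProbes and runSync loops; Stop
// does not terminate existing workers or in-flight Create calls.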
// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Address:              w.instance.Address(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}
// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return nil
}
func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}
// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
		return nil
	} else if wp.runnerSource == "" {
		wp.runnerCmd = "crunch-run"
		wp.runnerData = []byte{}
		return nil
	}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
	if err != nil {
		logger.WithError(err).Error("failed to load runner program")
		return err
	}
	wp.runnerData = buf
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
	return nil
}
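// notify does a non-blocking send on every subscriber channel: if a
// subscriber's one-slot buffer is already full, the event is dropped,
// matching the contract documented on Subscribe.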
func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}
func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}
// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false
	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}
	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		delete(wp.workers, id)
		go wkr.Close()
		notify = true
	}
	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}
	if notify {
		go wp.notify()
	}
}
func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for !wp.loaded {
		wp.mtx.RUnlock()
		<-ch
		wp.mtx.RLock()
	}
}
// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}
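// For example, randomHex(6) reads 3 random bytes and formats them as
// 6 hex digits, e.g. "4af3c1" (illustrative output).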