// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0
19 "git.arvados.org/arvados.git/lib/cloud"
20 "git.arvados.org/arvados.git/sdk/go/arvados"
21 "github.com/prometheus/client_golang/prometheus"
22 "github.com/sirupsen/logrus"
23 "golang.org/x/crypto/ssh"

const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
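//
// Example (illustrative; exec is any Executor implementation):
//
//	stdout, stderr, err := exec.Execute(nil, "uptime", nil)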
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)
}

const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = time.Minute * 10
	defaultTimeoutProbe       = time.Minute * 10
	defaultTimeoutShutdown    = time.Second * 10
	defaultTimeoutTERM        = time.Minute * 2
	defaultTimeoutSignal      = time.Second * 5

	// Time after a quota error to try again anyway, even if no
	// instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because of rate limiting" messages.
	logRateLimitErrorInterval = time.Second * 10
)
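
// duration returns conf as a time.Duration if conf is nonzero,
// otherwise the given default. For example, duration(0,
// defaultProbeInterval) returns defaultProbeInterval.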
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf != 0 {
		return time.Duration(conf)
	}
	return def
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
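//
// Typical call (illustrative; instanceSet, newExecutor, and the SSH
// key come from the caller's cloud driver setup):
//
//	wp := NewPool(logger, arvClient, prometheus.NewRegistry(),
//		instanceSetID, instanceSet, newExecutor, installPublicKey, cluster)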
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:             logger,
		arvClient:          arvClient,
		instanceSetID:      instanceSetID,
		instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:        newExecutor,
		bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
		runnerSource:       cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID:            cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes:      cluster.InstanceTypes,
		maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
		probeInterval:      duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval:       duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:        duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:     duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:       duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:    duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM:        duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal:      duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		installPublicKey:   installPublicKey,
		tagKeyPrefix:       cluster.Containers.CloudVMs.TagKeyPrefix,
		stop:               make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger             logrus.FieldLogger
	arvClient          *arvados.Client
	instanceSetID      cloud.InstanceSetID
	instanceSet        *throttledInstanceSet
	newExecutor        func(cloud.Instance) Executor
	bootProbeCommand   string
	runnerSource       string
	imageID            cloud.ImageID
	instanceTypes      map[string]arvados.InstanceType
	syncInterval       time.Duration
	probeInterval      time.Duration
	maxProbesPerSecond int
	timeoutIdle        time.Duration
	timeoutBooting     time.Duration
	timeoutProbe       time.Duration
	timeoutShutdown    time.Duration
	timeoutTERM        time.Duration
	timeoutSignal      time.Duration
	installPublicKey   ssh.PublicKey
	tagKeyPrefix       string

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once
	runnerData   []byte
	runnerMD5    [md5.Size]byte
	runnerCmd    string

	throttleCreate    throttle
	throttleInstances throttle

	mContainersRunning prometheus.Gauge
	mInstances         *prometheus.GaugeVec
	mInstancesPrice    *prometheus.GaugeVec
	mVCPUs             *prometheus.GaugeVec
	mMemory            *prometheus.GaugeVec
	mBootOutcomes      *prometheus.CounterVec
	mDisappearances    *prometheus.CounterVec
}

type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}
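
// CheckHealth reports an error if the Pool cannot start new
// instances, e.g., the crunch-run binary to deploy on new instances
// cannot be loaded.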
func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
	}
	return nil
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		// ...try scheduling work...
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
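//
// Example (illustrative):
//
//	for it, n := range wp.Unallocated() {
//		logger.Infof("%s: %d unallocated", it.Name, n)
//	}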
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
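//
// Example (illustrative):
//
//	if !wp.Create(it) {
//		// At quota or rate-limited; wait for a Subscribe()
//		// notification before retrying.
//	}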
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret}.InitCommand()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	return true
}

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
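//
// Example (illustrative): drain an instance so it shuts down as soon
// as its current work finishes:
//
//	err := wp.SetIdleBehavior(id, IdleBehaviorDrain)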
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.setIdleBehavior(idleBehavior)
	return nil
}

// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{inst, secret}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.waitUntilLoaded()
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
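//
// Example (illustrative):
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() {
//			wp.ForgetContainer(uuid)
//		}
//	}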
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
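//
// Example (illustrative):
//
//	if !wp.KillContainer(uuid, "deadline exceeded") {
//		// container already ended on its own
//	}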
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Reason":        reason,
	})
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
		if rr == nil {
			rr = wkr.starting[uuid]
		}
		if rr != nil {
			rr.Kill(reason)
			return true
		}
	}
	logger.Debug("cannot kill: already disappeared")
	return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	}
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price",
		Help:      "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "boot_outcomes",
		Help:      "Boot outcomes by type.",
	}, []string{"state"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	}
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_disappeared",
		Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	}
	reg.MustRegister(wp.mDisappearances)
}

func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	type entKey struct {
		cat      string
		instType string
	}
	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var running int64
	for _, wkr := range wp.workers {
		var cat string
		switch {
		case len(wkr.running)+len(wkr.starting) > 0:
			cat = "inuse"
		case wkr.idleBehavior == IdleBehaviorHold:
			cat = "hold"
		case wkr.state == StateBooting:
			cat = "booting"
		case wkr.state == StateUnknown:
			cat = "unknown"
		default:
			cat = "idle"
		}
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
	}
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for non-existing category/nodetype combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
			}
		}
	}
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	}
	wp.mContainersRunning.Set(float64(running))
}

func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Address:              w.instance.Address(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.shutdown()
	return nil
}

func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}

// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
		return nil
	} else if wp.runnerSource == "" {
		wp.runnerCmd = "crunch-run"
		wp.runnerData = []byte{}
		return nil
	}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
	if err != nil {
		logger.WithError(err).Error("failed to load runner program")
		return err
	}
	wp.runnerData = buf
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
	return nil
}

func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		delete(wp.workers, id)
		notify = true
	}

	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}

func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for !wp.loaded {
		wp.mtx.RUnlock()
		<-ch
		wp.mtx.RLock()
	}
}

// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}