package worker
import (
+ "crypto/rand"
"errors"
+ "fmt"
"io"
"sort"
"strings"
"sync"
"time"
- "git.curoverse.com/arvados.git/lib/cloud"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
+ "golang.org/x/crypto/ssh"
)
// Keys of the cloud instance tags the pool reads and writes. At each
// use site they are combined with the configured tag key prefix
// (wp.tagKeyPrefix + key).
const (
	// Name of the Arvados instance type the VM was created as.
	tagKeyInstanceType = "InstanceType"
	// Saved IdleBehavior, restored when the instance reappears.
	tagKeyIdleBehavior = "IdleBehavior"
	// Random per-instance secret used to match instances to
	// in-flight Create calls and passed to the tag verifier.
	tagKeyInstanceSecret = "InstanceSecret"
	// ID of the instance set; used to filter the instance list.
	tagKeyInstanceSetID = "InstanceSetID"
)
// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
Instance cloud.InstanceID `json:"instance"`
+ Address string `json:"address"`
Price float64 `json:"price"`
ArvadosInstanceType string `json:"arvados_instance_type"`
ProviderInstanceType string `json:"provider_instance_type"`
defaultTimeoutBooting = time.Minute * 10
defaultTimeoutProbe = time.Minute * 10
defaultTimeoutShutdown = time.Second * 10
+ defaultTimeoutTERM = time.Minute * 2
+ defaultTimeoutSignal = time.Second * 5
// Time after a quota error to try again anyway, even if no
// instances have been shutdown.
//
// New instances are configured and set up according to the given
// cluster configuration.
-func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
wp := &Pool{
logger: logger,
arvClient: arvClient,
+ instanceSetID: instanceSetID,
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
newExecutor: newExecutor,
- bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
- imageID: cloud.ImageID(cluster.CloudVMs.ImageID),
+ bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
+ imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
instanceTypes: cluster.InstanceTypes,
- maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
- probeInterval: duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
- syncInterval: duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
- timeoutIdle: duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
- timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
- timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
- timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+ maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
+ probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
+ syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
+ timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
+ timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
+ timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+ timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+ timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
+ timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
+ installPublicKey: installPublicKey,
+ tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
stop: make(chan bool),
}
wp.registerMetrics(reg)
// configuration
logger logrus.FieldLogger
arvClient *arvados.Client
+ instanceSetID cloud.InstanceSetID
instanceSet *throttledInstanceSet
newExecutor func(cloud.Instance) Executor
bootProbeCommand string
timeoutBooting time.Duration
timeoutProbe time.Duration
timeoutShutdown time.Duration
+ timeoutTERM time.Duration
+ timeoutSignal time.Duration
+ installPublicKey ssh.PublicKey
+ tagKeyPrefix string
// private state
subscribers map[<-chan struct{}]chan<- struct{}
- creating map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
+ creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
workers map[cloud.InstanceID]*worker
loaded bool // loaded list of instances from InstanceSet at least once
- exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
+ exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
atQuotaUntil time.Time
atQuotaErr cloud.QuotaError
stop chan bool
throttleCreate throttle
throttleInstances throttle
- mInstances prometheus.Gauge
- mInstancesPrice prometheus.Gauge
mContainersRunning prometheus.Gauge
- mVCPUs prometheus.Gauge
- mVCPUsInuse prometheus.Gauge
- mMemory prometheus.Gauge
- mMemoryInuse prometheus.Gauge
+ mInstances *prometheus.GaugeVec
+ mInstancesPrice *prometheus.GaugeVec
+ mVCPUs *prometheus.GaugeVec
+ mMemory *prometheus.GaugeVec
+ mDisappearances *prometheus.CounterVec
+}
+
+type createCall struct {
+ time time.Time
+ instanceType arvados.InstanceType
}
// Subscribe returns a buffered channel that becomes ready after any
defer wp.mtx.RUnlock()
unalloc := map[arvados.InstanceType]int{}
creating := map[arvados.InstanceType]int{}
- for it, times := range wp.creating {
- creating[it] = len(times)
+ oldestCreate := map[arvados.InstanceType]time.Time{}
+ for _, cc := range wp.creating {
+ it := cc.instanceType
+ creating[it]++
+ if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
+ oldestCreate[it] = cc.time
+ }
}
for _, wkr := range wp.workers {
- if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) || wkr.idleBehavior != IdleBehaviorRun {
+ // Skip workers that are not expected to become
+ // available soon. Note len(wkr.running)>0 is not
+ // redundant here: it can be true even in
+ // StateUnknown.
+ if wkr.state == StateShutdown ||
+ wkr.state == StateRunning ||
+ wkr.idleBehavior != IdleBehaviorRun ||
+ len(wkr.running) > 0 {
continue
}
it := wkr.instType
unalloc[it]++
- if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
+ if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
// If up to N new workers appear in
// Instances() while we are waiting for N
// Create() calls to complete, we assume we're
if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
return false
}
- tags := cloud.InstanceTags{
- tagKeyInstanceType: it.Name,
- tagKeyIdleBehavior: string(IdleBehaviorRun),
- }
now := time.Now()
- wp.creating[it] = append(wp.creating[it], now)
+ secret := randomHex(instanceSecretLength)
+ wp.creating[secret] = createCall{time: now, instanceType: it}
go func() {
defer wp.notify()
- inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
+ tags := cloud.InstanceTags{
+ wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID),
+ wp.tagKeyPrefix + tagKeyInstanceType: it.Name,
+ wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
+ wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
+ }
+ initCmd := TagVerifier{nil, secret}.InitCommand()
+ inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
wp.mtx.Lock()
defer wp.mtx.Unlock()
- // Remove our timestamp marker from wp.creating
- for i, t := range wp.creating[it] {
- if t == now {
- copy(wp.creating[it][i:], wp.creating[it][i+1:])
- wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
- break
- }
- }
+ // delete() is deferred so the updateWorker() call
+ // below knows to use StateBooting when adding a new
+ // worker.
+ defer delete(wp.creating, secret)
if err != nil {
if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
wp.atQuotaErr = err
wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
return
}
- wp.updateWorker(inst, it, StateBooting)
+ wp.updateWorker(inst, it)
}()
return true
}
if !ok {
return errors.New("requested instance does not exist")
}
- wkr.idleBehavior = idleBehavior
- wkr.saveTags()
- wkr.shutdownIfIdle()
+ wkr.setIdleBehavior(idleBehavior)
return nil
}
-// Add or update worker attached to the given instance. Use
-// initialState if a new worker is created.
+// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
+// A newly added instance has state=StateBooting if its tags match an
+// entry in wp.creating, otherwise StateUnknown.
+//
// Caller must have lock.
-func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
+ secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
+ inst = TagVerifier{inst, secret}
id := inst.ID()
if wkr := wp.workers[id]; wkr != nil {
wkr.executor.SetTarget(inst)
wkr.instance = inst
wkr.updated = time.Now()
- if initialState == StateBooting && wkr.state == StateUnknown {
- wkr.state = StateBooting
- }
wkr.saveTags()
return wkr, false
}
+ state := StateUnknown
+ if _, ok := wp.creating[secret]; ok {
+ state = StateBooting
+ }
+
// If an instance has a valid IdleBehavior tag when it first
// appears, initialize the new worker accordingly (this is how
// we restore IdleBehavior that was set by a prior dispatch
// process); otherwise, default to "run". After this,
// wkr.idleBehavior is the source of truth, and will only be
// changed via SetIdleBehavior().
- idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
+ idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
if !validIdleBehavior[idleBehavior] {
idleBehavior = IdleBehaviorRun
}
logger := wp.logger.WithFields(logrus.Fields{
"InstanceType": it.Name,
- "Instance": inst,
+ "Instance": inst.ID(),
+ "Address": inst.Address(),
})
logger.WithFields(logrus.Fields{
- "State": initialState,
+ "State": state,
"IdleBehavior": idleBehavior,
}).Infof("instance appeared in cloud")
now := time.Now()
wp: wp,
logger: logger,
executor: wp.newExecutor(inst),
- state: initialState,
+ state: state,
idleBehavior: idleBehavior,
instance: inst,
instType: it,
probed: now,
busy: now,
updated: now,
- running: make(map[string]struct{}),
- starting: make(map[string]struct{}),
+ running: make(map[string]*remoteRunner),
+ starting: make(map[string]*remoteRunner),
probing: make(chan struct{}, 1),
}
wp.workers[id] = wkr
return wkr, true
}
-// caller must have lock.
-func (wp *Pool) notifyExited(uuid string, t time.Time) {
- wp.exited[uuid] = t
-}
-
// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
}
// CountWorkers returns the current number of workers in each state.
+//
+// CountWorkers blocks, if necessary, until the initial instance list
+// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
wp.setupOnce.Do(wp.setup)
+ wp.waitUntilLoaded()
wp.mtx.Lock()
defer wp.mtx.Unlock()
r := map[State]int{}
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
-// KillContainer() to garbage-collect the entries for exited
+// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
wp.setupOnce.Do(wp.setup)
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string) {
+//
+// KillContainer returns false if the container has already ended.
+func (wp *Pool) KillContainer(uuid string, reason string) bool {
wp.mtx.Lock()
defer wp.mtx.Unlock()
- if _, ok := wp.exited[uuid]; ok {
- wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
- delete(wp.exited, uuid)
- return
- }
+ logger := wp.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "Reason": reason,
+ })
for _, wkr := range wp.workers {
- if _, ok := wkr.running[uuid]; ok {
- go wp.kill(wkr, uuid)
- return
+ rr := wkr.running[uuid]
+ if rr == nil {
+ rr = wkr.starting[uuid]
+ }
+ if rr != nil {
+ rr.Kill(reason)
+ return true
}
}
- wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
+ logger.Debug("cannot kill: already disappeared")
+ return false
}
-func (wp *Pool) kill(wkr *worker, uuid string) {
- logger := wp.logger.WithFields(logrus.Fields{
- "ContainerUUID": uuid,
- "Instance": wkr.instance,
- })
- logger.Debug("killing process")
- stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
- if err != nil {
- logger.WithFields(logrus.Fields{
- "stderr": string(stderr),
- "stdout": string(stdout),
- "error": err,
- }).Warn("kill failed")
- return
- }
- logger.Debug("killing process succeeded")
+// ForgetContainer clears the placeholder for the given exited
+// container, so it isn't returned by subsequent calls to Running().
+//
+// ForgetContainer has no effect if the container has not yet exited.
+//
+// The "container exited at time T" placeholder (which necessitates
+// ForgetContainer) exists to make it easier for the caller
+// (scheduler) to distinguish a container that exited without
+// finalizing its state from a container that exited too recently for
+// its final state to have appeared in the scheduler's queue cache.
+func (wp *Pool) ForgetContainer(uuid string) {
wp.mtx.Lock()
defer wp.mtx.Unlock()
- if _, ok := wkr.running[uuid]; ok {
- delete(wkr.running, uuid)
- if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
- wkr.state = StateIdle
- }
- wkr.updated = time.Now()
- go wp.notify()
+ if _, ok := wp.exited[uuid]; ok {
+ wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+ delete(wp.exited, uuid)
}
}
if reg == nil {
reg = prometheus.NewRegistry()
}
- wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "instances_total",
- Help: "Number of cloud VMs including pending, booting, running, held, and shutting down.",
- })
- reg.MustRegister(wp.mInstances)
- wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "instances_price_total",
- Help: "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
- })
- reg.MustRegister(wp.mInstancesPrice)
wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Help: "Number of containers reported running by cloud VMs.",
})
reg.MustRegister(wp.mContainersRunning)
-
- wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
+ wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_total",
+ Help: "Number of cloud VMs.",
+ }, []string{"category"})
+ reg.MustRegister(wp.mInstances)
+ wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_price",
+ Help: "Price of cloud VMs.",
+ }, []string{"category"})
+ reg.MustRegister(wp.mInstancesPrice)
+ wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "vcpus_total",
Help: "Total VCPUs on all cloud VMs.",
- })
+ }, []string{"category"})
reg.MustRegister(wp.mVCPUs)
- wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "vcpus_inuse",
- Help: "VCPUs on cloud VMs that are running containers.",
- })
- reg.MustRegister(wp.mVCPUsInuse)
- wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
+ wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "memory_bytes_total",
Help: "Total memory on all cloud VMs.",
- })
+ }, []string{"category"})
reg.MustRegister(wp.mMemory)
- wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
+ wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
- Name: "memory_bytes_inuse",
- Help: "Memory on cloud VMs that are running containers.",
- })
- reg.MustRegister(wp.mMemoryInuse)
+ Name: "instances_disappeared",
+ Help: "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
+ }, []string{"state"})
+ for _, v := range stateString {
+ wp.mDisappearances.WithLabelValues(v).Add(0)
+ }
+ reg.MustRegister(wp.mDisappearances)
}
func (wp *Pool) runMetrics() {
ch := wp.Subscribe()
defer wp.Unsubscribe(ch)
+ wp.updateMetrics()
for range ch {
wp.updateMetrics()
}
wp.mtx.RLock()
defer wp.mtx.RUnlock()
- var price float64
- var alloc, cpu, cpuInuse, mem, memInuse int64
+ instances := map[string]int64{}
+ price := map[string]float64{}
+ cpu := map[string]int64{}
+ mem := map[string]int64{}
+ var running int64
for _, wkr := range wp.workers {
- price += wkr.instType.Price
- cpu += int64(wkr.instType.VCPUs)
- mem += int64(wkr.instType.RAM)
- if len(wkr.running)+len(wkr.starting) == 0 {
- continue
+ var cat string
+ switch {
+ case len(wkr.running)+len(wkr.starting) > 0:
+ cat = "inuse"
+ case wkr.idleBehavior == IdleBehaviorHold:
+ cat = "hold"
+ case wkr.state == StateBooting:
+ cat = "booting"
+ case wkr.state == StateUnknown:
+ cat = "unknown"
+ default:
+ cat = "idle"
}
- alloc += int64(len(wkr.running) + len(wkr.starting))
- cpuInuse += int64(wkr.instType.VCPUs)
- memInuse += int64(wkr.instType.RAM)
- }
- wp.mInstances.Set(float64(len(wp.workers)))
- wp.mInstancesPrice.Set(price)
- wp.mContainersRunning.Set(float64(alloc))
- wp.mVCPUs.Set(float64(cpu))
- wp.mMemory.Set(float64(mem))
- wp.mVCPUsInuse.Set(float64(cpuInuse))
- wp.mMemoryInuse.Set(float64(memInuse))
+ instances[cat]++
+ price[cat] += wkr.instType.Price
+ cpu[cat] += int64(wkr.instType.VCPUs)
+ mem[cat] += int64(wkr.instType.RAM)
+ running += int64(len(wkr.running) + len(wkr.starting))
+ }
+ for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
+ wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
+ wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
+ wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
+ wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
+ }
+ wp.mContainersRunning.Set(float64(running))
}
func (wp *Pool) runProbes() {
for _, w := range wp.workers {
r = append(r, InstanceView{
Instance: w.instance.ID(),
+ Address: w.instance.Address(),
Price: w.instType.Price,
ArvadosInstanceType: w.instType.Name,
ProviderInstanceType: w.instType.ProviderType,
return r
}
// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
//
// reason is logged alongside the shutdown message.
//
// KillInstance acquires the pool lock itself. The worker map is
// modified concurrently by the instance-sync loop, so it must not be
// read (or a worker shut down) without holding wp.mtx.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	// Make sure wp.workers etc. are initialized even if this is
	// the first call into the pool.
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	// NOTE(review): shutdown() presumably expects the caller to
	// hold wp.mtx, like other worker-state mutations -- confirm.
	wkr.shutdown()
	return nil
}
+
func (wp *Pool) setup() {
- wp.creating = map[arvados.InstanceType][]time.Time{}
+ wp.creating = map[string]createCall{}
wp.exited = map[string]time.Time{}
wp.workers = map[cloud.InstanceID]*worker{}
wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}
wp.logger.Debug("getting instance list")
threshold := time.Now()
- instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
+ instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
if err != nil {
wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
return err
notify := false
for _, inst := range instances {
- itTag := inst.Tags()[tagKeyInstanceType]
+ itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
it, ok := wp.instanceTypes[itTag]
if !ok {
wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
continue
}
- if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
+ if wkr, isNew := wp.updateWorker(inst, it); isNew {
notify = true
} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
continue
}
logger := wp.logger.WithFields(logrus.Fields{
- "Instance": wkr.instance,
+ "Instance": wkr.instance.ID(),
"WorkerState": wkr.state,
})
logger.Info("instance disappeared in cloud")
+ if wp.mDisappearances != nil {
+ wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
+ }
delete(wp.workers, id)
- go wkr.executor.Close()
+ go wkr.Close()
notify = true
}
if !wp.loaded {
+ notify = true
wp.loaded = true
wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
}
go wp.notify()
}
}
+
+func (wp *Pool) waitUntilLoaded() {
+ ch := wp.Subscribe()
+ wp.mtx.RLock()
+ defer wp.mtx.RUnlock()
+ for !wp.loaded {
+ wp.mtx.RUnlock()
+ <-ch
+ wp.mtx.RLock()
+ }
+}
+
// randomHex returns a string of exactly n random lowercase
// hexadecimal digits (n*4 random bits) read from crypto/rand. It
// returns "" if n <= 0.
//
// Odd n is supported: one extra random byte is read and its final
// hex digit is discarded.
//
// randomHex panics if the system's secure random source fails; there
// is no safe fallback for generating instance secrets.
func randomHex(n int) string {
	if n <= 0 {
		return ""
	}
	buf := make([]byte, (n+1)/2)
	if _, err := rand.Read(buf); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)[:n]
}