// private state
subscribers map[<-chan struct{}]chan<- struct{}
- creating map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
+ creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
workers map[cloud.InstanceID]*worker
loaded bool // loaded list of instances from InstanceSet at least once
exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
mMemory *prometheus.GaugeVec
}
+// createCall records one unfinished (cloud.InstanceSet)Create call.
+// Entries are stored in Pool.creating, keyed by the instance secret
+// generated for the call, and are removed when the Create goroutine
+// finishes.
+type createCall struct {
+ time time.Time // when the Create call was started
+ instanceType arvados.InstanceType // instance type requested from the cloud
+}
+
// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
defer wp.mtx.RUnlock()
unalloc := map[arvados.InstanceType]int{}
creating := map[arvados.InstanceType]int{}
- for it, times := range wp.creating {
- creating[it] = len(times)
+ oldestCreate := map[arvados.InstanceType]time.Time{}
+ for _, cc := range wp.creating {
+ it := cc.instanceType
+ creating[it]++
+ if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
+ oldestCreate[it] = cc.time
+ }
}
for _, wkr := range wp.workers {
// Skip workers that are not expected to become
}
it := wkr.instType
unalloc[it]++
- if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
+ if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
// If up to N new workers appear in
// Instances() while we are waiting for N
// Create() calls to complete, we assume we're
return false
}
now := time.Now()
- wp.creating[it] = append(wp.creating[it], now)
+ secret := randomHex(instanceSecretLength)
+ wp.creating[secret] = createCall{time: now, instanceType: it}
go func() {
defer wp.notify()
- secret := randomHex(instanceSecretLength)
tags := cloud.InstanceTags{
tagKeyInstanceType: it.Name,
tagKeyIdleBehavior: string(IdleBehaviorRun),
inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
wp.mtx.Lock()
defer wp.mtx.Unlock()
- // Remove our timestamp marker from wp.creating
- for i, t := range wp.creating[it] {
- if t == now {
- copy(wp.creating[it][i:], wp.creating[it][i+1:])
- wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
- break
- }
- }
+ // delete() is deferred so the updateWorker() call
+ // below knows to use StateBooting when adding a new
+ // worker.
+ defer delete(wp.creating, secret)
if err != nil {
if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
wp.atQuotaErr = err
wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
return
}
- wp.updateWorker(inst, it, StateBooting)
+ wp.updateWorker(inst, it)
}()
return true
}
return nil
}
-// Add or update worker attached to the given instance. Use
-// initialState if a new worker is created.
+// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
+// A newly added instance has state=StateBooting if its tags match an
+// entry in wp.creating, otherwise StateUnknown.
+//
// Caller must have lock.
-func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
inst = tagVerifier{inst}
id := inst.ID()
if wkr := wp.workers[id]; wkr != nil {
wkr.executor.SetTarget(inst)
wkr.instance = inst
wkr.updated = time.Now()
- if initialState == StateBooting && wkr.state == StateUnknown {
- wkr.state = StateBooting
- }
wkr.saveTags()
return wkr, false
}
+ state := StateUnknown
+ if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok {
+ state = StateBooting
+ }
+
// If an instance has a valid IdleBehavior tag when it first
// appears, initialize the new worker accordingly (this is how
// we restore IdleBehavior that was set by a prior dispatch
"Address": inst.Address(),
})
logger.WithFields(logrus.Fields{
- "State": initialState,
+ "State": state,
"IdleBehavior": idleBehavior,
}).Infof("instance appeared in cloud")
now := time.Now()
wp: wp,
logger: logger,
executor: wp.newExecutor(inst),
- state: initialState,
+ state: state,
idleBehavior: idleBehavior,
instance: inst,
instType: it,
}
// CountWorkers returns the current number of workers in each state.
+//
+// CountWorkers blocks, if necessary, until the initial instance list
+// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
wp.setupOnce.Do(wp.setup)
+ wp.waitUntilLoaded()
wp.mtx.Lock()
defer wp.mtx.Unlock()
r := map[State]int{}
+// runMetrics refreshes the pool's metrics once immediately, then again
+// after each notification received on its subscription channel.
func (wp *Pool) runMetrics() {
ch := wp.Subscribe()
defer wp.Unsubscribe(ch)
+	// Publish an initial snapshot before waiting, so metrics are
+	// populated even if no state-change notification ever arrives.
+ wp.updateMetrics()
for range ch {
wp.updateMetrics()
}
}
func (wp *Pool) setup() {
- wp.creating = map[arvados.InstanceType][]time.Time{}
+ wp.creating = map[string]createCall{}
wp.exited = map[string]time.Time{}
wp.workers = map[cloud.InstanceID]*worker{}
wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
continue
}
- if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
+ if wkr, isNew := wp.updateWorker(inst, it); isNew {
notify = true
} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
}
if !wp.loaded {
+ notify = true
wp.loaded = true
wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
}
}
}
+// waitUntilLoaded blocks until the initial instance list has been
+// loaded from the cloud provider, i.e., until wp.loaded is true.
+//
+// It acquires wp.mtx itself, so the caller must not hold it.
+func (wp *Pool) waitUntilLoaded() {
+	ch := wp.Subscribe()
+	// Release the subscription when done; otherwise every call
+	// (e.g. each CountWorkers call) leaves a stale entry in
+	// wp.subscribers. This defer is registered before the
+	// RLock/RUnlock pair so that, defers being LIFO, it runs only
+	// after the read lock is released -- Unsubscribe presumably
+	// takes wp.mtx to modify wp.subscribers.
+	defer wp.Unsubscribe(ch)
+	wp.mtx.RLock()
+	defer wp.mtx.RUnlock()
+	for !wp.loaded {
+		// Drop the lock while waiting so the goroutine that loads
+		// the instance list can acquire it. ch is buffered and was
+		// subscribed before the first check, so a notification
+		// sent after that check is not lost.
+		wp.mtx.RUnlock()
+		<-ch
+		wp.mtx.RLock()
+	}
+}
+
// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {