tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
wp.creating[it]++
go func() {
+ defer wp.notify()
inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
wp.mtx.Lock()
defer wp.mtx.Unlock()
}
if err != nil {
logger.WithError(err).Error("create failed")
- go wp.notify()
return
}
wp.updateWorker(inst, it, StateBooting)
// updateWorker adds or updates the worker attached to the given
// instance. It uses initialState if a new worker is created. Caller
// must hold wp.mtx.
-func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) {
+//
+// Returns true when a new worker is created.
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) bool {
id := inst.ID()
if wp.workers[id] != nil {
wp.workers[id].executor.SetTarget(inst)
if initialState == StateBooting && wp.workers[id].state == StateUnknown {
wp.workers[id].state = StateBooting
}
- return
+ return false
}
if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
initialState = StateHold
starting: make(map[string]struct{}),
probing: make(chan struct{}, 1),
}
- go wp.notify()
+ return true
}
// Shutdown shuts down a worker with the given type, or returns false
wp.mtx.Lock()
defer wp.mtx.Unlock()
wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
+ notify := false
for _, inst := range instances {
itTag := inst.Tags()[tagKeyInstanceType]
wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
continue
}
- wp.updateWorker(inst, it, StateUnknown)
+ if wp.updateWorker(inst, it, StateUnknown) {
+ notify = true
+ }
}
for id, wkr := range wp.workers {
logger.Info("instance disappeared in cloud")
delete(wp.workers, id)
go wkr.executor.Close()
- go wp.notify()
+ notify = true
}
if !wp.loaded {
wp.loaded = true
wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
}
+
+ if notify {
+ go wp.notify()
+ }
}
// should be called in a new goroutine