defaultTimeoutBooting = time.Minute * 10
defaultTimeoutProbe = time.Minute * 10
defaultTimeoutShutdown = time.Second * 10
+ defaultTimeoutTERM = time.Minute * 2
+ defaultTimeoutSignal = time.Second * 5
// Time after a quota error to try again anyway, even if no
// instances have been shutdown.
timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+ timeoutTERM: duration(cluster.Dispatch.TimeoutTERM, defaultTimeoutTERM),
+ timeoutSignal: duration(cluster.Dispatch.TimeoutSignal, defaultTimeoutSignal),
installPublicKey: installPublicKey,
stop: make(chan bool),
}
timeoutBooting time.Duration
timeoutProbe time.Duration
timeoutShutdown time.Duration
+ timeoutTERM time.Duration
+ timeoutSignal time.Duration
installPublicKey ssh.PublicKey
// private state
if !ok {
return errors.New("requested instance does not exist")
}
- wkr.idleBehavior = idleBehavior
- wkr.saveTags()
- wkr.shutdownIfIdle()
+ wkr.setIdleBehavior(idleBehavior)
return nil
}
probed: now,
busy: now,
updated: now,
- running: make(map[string]struct{}),
- starting: make(map[string]struct{}),
+ running: make(map[string]*remoteRunner),
+ starting: make(map[string]*remoteRunner),
probing: make(chan struct{}, 1),
}
wp.workers[id] = wkr
return wkr, true
}
-// caller must have lock.
-func (wp *Pool) notifyExited(uuid string, t time.Time) {
- wp.exited[uuid] = t
-}
-
// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
}
// CountWorkers returns the current number of workers in each state.
+//
+// CountWorkers blocks, if necessary, until the initial instance list
+// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
wp.setupOnce.Do(wp.setup)
+ wp.waitUntilLoaded()
wp.mtx.Lock()
defer wp.mtx.Unlock()
r := map[State]int{}
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string) {
+func (wp *Pool) KillContainer(uuid string, reason string) {
wp.mtx.Lock()
defer wp.mtx.Unlock()
+ logger := wp.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "Reason": reason,
+ })
if _, ok := wp.exited[uuid]; ok {
- wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+ logger.Debug("clearing placeholder for exited crunch-run process")
delete(wp.exited, uuid)
return
}
for _, wkr := range wp.workers {
- if _, ok := wkr.running[uuid]; ok {
- go wp.kill(wkr, uuid)
- return
+ rr := wkr.running[uuid]
+ if rr == nil {
+ rr = wkr.starting[uuid]
}
- }
- wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
-}
-
-func (wp *Pool) kill(wkr *worker, uuid string) {
- logger := wp.logger.WithFields(logrus.Fields{
- "ContainerUUID": uuid,
- "Instance": wkr.instance.ID(),
- })
- logger.Debug("killing process")
- cmd := "crunch-run --kill 15 " + uuid
- if u := wkr.instance.RemoteUser(); u != "root" {
- cmd = "sudo " + cmd
- }
- stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
- if err != nil {
- logger.WithFields(logrus.Fields{
- "stderr": string(stderr),
- "stdout": string(stdout),
- "error": err,
- }).Warn("kill failed")
- return
- }
- logger.Debug("killing process succeeded")
- wp.mtx.Lock()
- defer wp.mtx.Unlock()
- if _, ok := wkr.running[uuid]; ok {
- delete(wkr.running, uuid)
- if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
- wkr.state = StateIdle
+ if rr != nil {
+ rr.Kill(reason)
+ return
}
- wkr.updated = time.Now()
- go wp.notify()
}
+ logger.Debug("cannot kill: already disappeared")
}
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
return r
}
+// KillInstance destroys a cloud VM instance. It returns an error if
+// the given instance does not exist.
+func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+ wkr, ok := wp.workers[id]
+ if !ok {
+ return errors.New("instance not found")
+ }
+ wkr.logger.WithField("Reason", reason).Info("shutting down")
+ wkr.shutdown()
+ return nil
+}
+
func (wp *Pool) setup() {
wp.creating = map[string]createCall{}
wp.exited = map[string]time.Time{}
})
logger.Info("instance disappeared in cloud")
delete(wp.workers, id)
- go wkr.executor.Close()
+ go wkr.Close()
notify = true
}
if !wp.loaded {
+ notify = true
wp.loaded = true
wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
}
}
}
+func (wp *Pool) waitUntilLoaded() {
+ ch := wp.Subscribe()
+ wp.mtx.RLock()
+ defer wp.mtx.RUnlock()
+ for !wp.loaded {
+ wp.mtx.RUnlock()
+ <-ch
+ wp.mtx.RLock()
+ }
+}
+
// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {