"sync"
"time"
- "git.curoverse.com/arvados.git/lib/cloud"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
tagKeyInstanceType = "InstanceType"
tagKeyIdleBehavior = "IdleBehavior"
tagKeyInstanceSecret = "InstanceSecret"
+ tagKeyInstanceSetID = "InstanceSetID"
)
// An InstanceView shows a worker's current state and recent activity.
defaultTimeoutBooting = time.Minute * 10
defaultTimeoutProbe = time.Minute * 10
defaultTimeoutShutdown = time.Second * 10
+ defaultTimeoutTERM = time.Minute * 2
+ defaultTimeoutSignal = time.Second * 5
// Time after a quota error to try again anyway, even if no
// instances have been shutdown.
//
// New instances are configured and set up according to the given
// cluster configuration.
-func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
wp := &Pool{
logger: logger,
arvClient: arvClient,
+ instanceSetID: instanceSetID,
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
newExecutor: newExecutor,
- bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
- imageID: cloud.ImageID(cluster.CloudVMs.ImageID),
+ bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
+ imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
instanceTypes: cluster.InstanceTypes,
- maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
- probeInterval: duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
- syncInterval: duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
- timeoutIdle: duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
- timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
- timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
- timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+ maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
+ probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
+ syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
+ timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
+ timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
+ timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+ timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+ timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
+ timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
installPublicKey: installPublicKey,
+ tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
stop: make(chan bool),
}
wp.registerMetrics(reg)
// configuration
logger logrus.FieldLogger
arvClient *arvados.Client
+ instanceSetID cloud.InstanceSetID
instanceSet *throttledInstanceSet
newExecutor func(cloud.Instance) Executor
bootProbeCommand string
timeoutBooting time.Duration
timeoutProbe time.Duration
timeoutShutdown time.Duration
+ timeoutTERM time.Duration
+ timeoutSignal time.Duration
installPublicKey ssh.PublicKey
+ tagKeyPrefix string
// private state
subscribers map[<-chan struct{}]chan<- struct{}
creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
workers map[cloud.InstanceID]*worker
loaded bool // loaded list of instances from InstanceSet at least once
- exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
+ exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
atQuotaUntil time.Time
atQuotaErr cloud.QuotaError
stop chan bool
mInstancesPrice *prometheus.GaugeVec
mVCPUs *prometheus.GaugeVec
mMemory *prometheus.GaugeVec
+ mDisappearances *prometheus.CounterVec
}
type createCall struct {
go func() {
defer wp.notify()
tags := cloud.InstanceTags{
- tagKeyInstanceType: it.Name,
- tagKeyIdleBehavior: string(IdleBehaviorRun),
- tagKeyInstanceSecret: secret,
+ wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID),
+ wp.tagKeyPrefix + tagKeyInstanceType: it.Name,
+ wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
+ wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
}
- initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
+ initCmd := TagVerifier{nil, secret}.InitCommand()
inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
wp.mtx.Lock()
defer wp.mtx.Unlock()
if !ok {
return errors.New("requested instance does not exist")
}
- wkr.idleBehavior = idleBehavior
- wkr.saveTags()
- wkr.shutdownIfIdle()
+ wkr.setIdleBehavior(idleBehavior)
return nil
}
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
- inst = tagVerifier{inst}
+ secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
+ inst = TagVerifier{inst, secret}
id := inst.ID()
if wkr := wp.workers[id]; wkr != nil {
wkr.executor.SetTarget(inst)
}
state := StateUnknown
- if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok {
+ if _, ok := wp.creating[secret]; ok {
state = StateBooting
}
// process); otherwise, default to "run". After this,
// wkr.idleBehavior is the source of truth, and will only be
// changed via SetIdleBehavior().
- idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
+ idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
if !validIdleBehavior[idleBehavior] {
idleBehavior = IdleBehaviorRun
}
probed: now,
busy: now,
updated: now,
- running: make(map[string]struct{}),
- starting: make(map[string]struct{}),
+ running: make(map[string]*remoteRunner),
+ starting: make(map[string]*remoteRunner),
probing: make(chan struct{}, 1),
}
wp.workers[id] = wkr
return wkr, true
}
-// caller must have lock.
-func (wp *Pool) notifyExited(uuid string, t time.Time) {
- wp.exited[uuid] = t
-}
-
// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
-// KillContainer() to garbage-collect the entries for exited
+// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
wp.setupOnce.Do(wp.setup)
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string) {
+//
+// KillContainer returns false if the container has already ended.
+func (wp *Pool) KillContainer(uuid string, reason string) bool {
wp.mtx.Lock()
defer wp.mtx.Unlock()
- if _, ok := wp.exited[uuid]; ok {
- wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
- delete(wp.exited, uuid)
- return
- }
+ logger := wp.logger.WithFields(logrus.Fields{ // only used for the "cannot kill" message at the end
+ "ContainerUUID": uuid,
+ "Reason": reason,
+ })
for _, wkr := range wp.workers {
- if _, ok := wkr.running[uuid]; ok {
- go wp.kill(wkr, uuid)
- return
+ rr := wkr.running[uuid] // nil when this worker has no running entry for uuid
+ if rr == nil {
+ rr = wkr.starting[uuid] // fall back to the worker's starting entries
+ }
+ if rr != nil {
+ rr.Kill(reason) // NOTE(review): presumably does not wait for the process to exit -- confirm against remoteRunner
+ return true // stop at the first worker that knows this container
}
}
- wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
+ logger.Debug("cannot kill: already disappeared")
+ return false // no worker lists uuid as running or starting
}
-func (wp *Pool) kill(wkr *worker, uuid string) {
- logger := wp.logger.WithFields(logrus.Fields{
- "ContainerUUID": uuid,
- "Instance": wkr.instance.ID(),
- })
- logger.Debug("killing process")
- cmd := "crunch-run --kill 15 " + uuid
- if u := wkr.instance.RemoteUser(); u != "root" {
- cmd = "sudo " + cmd
- }
- stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
- if err != nil {
- logger.WithFields(logrus.Fields{
- "stderr": string(stderr),
- "stdout": string(stdout),
- "error": err,
- }).Warn("kill failed")
- return
- }
- logger.Debug("killing process succeeded")
+// ForgetContainer clears the placeholder for the given exited
+// container, so it isn't returned by subsequent calls to Running().
+//
+// ForgetContainer has no effect if the container has not yet exited.
+//
+// The "container exited at time T" placeholder (which necessitates
+// ForgetContainer) exists to make it easier for the caller
+// (scheduler) to distinguish a container that exited without
+// finalizing its state from a container that exited too recently for
+// its final state to have appeared in the scheduler's queue cache.
+func (wp *Pool) ForgetContainer(uuid string) {
wp.mtx.Lock()
defer wp.mtx.Unlock()
- if _, ok := wkr.running[uuid]; ok {
- delete(wkr.running, uuid)
- if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
- wkr.state = StateIdle
- }
- wkr.updated = time.Now()
- go wp.notify()
+ if _, ok := wp.exited[uuid]; ok { // wp.exited is keyed by container UUID; absent means nothing to forget
+ wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+ delete(wp.exited, uuid) // drop the "exited at time T" placeholder
}
}
Help: "Total memory on all cloud VMs.",
}, []string{"category"})
reg.MustRegister(wp.mMemory)
+ wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_disappeared",
+ Help: "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
+ }, []string{"state"})
+ for _, v := range stateString {
+ wp.mDisappearances.WithLabelValues(v).Add(0)
+ }
+ reg.MustRegister(wp.mDisappearances)
}
func (wp *Pool) runMetrics() {
return r
}
+// KillInstance destroys a cloud VM instance. It returns an error if
+// the given instance does not exist.
+//
+// The reason is recorded in the worker's log before shutdown starts.
+// The pool lock is held for the whole call, because wp.workers is
+// read and written under wp.mtx by the other Pool methods.
+func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ wkr, ok := wp.workers[id]
+ if !ok {
+ return errors.New("instance not found")
+ }
+ wkr.logger.WithField("Reason", reason).Info("shutting down")
+ wkr.shutdown() // NOTE(review): assumes shutdown() expects the caller to hold wp.mtx and does not lock it itself -- confirm in worker.go
+ return nil
+}
+
func (wp *Pool) setup() {
wp.creating = map[string]createCall{}
wp.exited = map[string]time.Time{}
}
wp.logger.Debug("getting instance list")
threshold := time.Now()
- instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
+ instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
if err != nil {
wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
return err
notify := false
for _, inst := range instances {
- itTag := inst.Tags()[tagKeyInstanceType]
+ itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
it, ok := wp.instanceTypes[itTag]
if !ok {
wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
"WorkerState": wkr.state,
})
logger.Info("instance disappeared in cloud")
+ if wp.mDisappearances != nil {
+ wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
+ }
delete(wp.workers, id)
- go wkr.executor.Close()
+ go wkr.Close()
notify = true
}