X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/90bb5de474c45e0b786f387ecab2277b593e335d..49717fb59156c2b276ccc2fde0b9f2de71e812a6:/lib/dispatchcloud/worker/pool.go diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index e90935e2aa..97ca7f60a2 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -25,6 +25,7 @@ const ( tagKeyInstanceType = "InstanceType" tagKeyIdleBehavior = "IdleBehavior" tagKeyInstanceSecret = "InstanceSecret" + tagKeyInstanceSetID = "InstanceSetID" ) // An InstanceView shows a worker's current state and recent activity. @@ -68,6 +69,8 @@ const ( defaultTimeoutBooting = time.Minute * 10 defaultTimeoutProbe = time.Minute * 10 defaultTimeoutShutdown = time.Second * 10 + defaultTimeoutTERM = time.Minute * 2 + defaultTimeoutSignal = time.Second * 5 // Time after a quota error to try again anyway, even if no // instances have been shutdown. @@ -89,23 +92,27 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration { // // New instances are configured and set up according to the given // cluster configuration. -func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool { +func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool { wp := &Pool{ logger: logger, arvClient: arvClient, + instanceSetID: instanceSetID, instanceSet: &throttledInstanceSet{InstanceSet: instanceSet}, newExecutor: newExecutor, - bootProbeCommand: cluster.CloudVMs.BootProbeCommand, - imageID: cloud.ImageID(cluster.CloudVMs.ImageID), + bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand, + imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID), instanceTypes: cluster.InstanceTypes, - maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond, - probeInterval: duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval), - syncInterval: duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval), - timeoutIdle: duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle), - timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting), - timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe), - timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown), + maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond, + probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval), + syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval), + timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle), + timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting), + timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe), + timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown), + timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM), + timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal), installPublicKey: installPublicKey, + tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix, stop: make(chan bool), } wp.registerMetrics(reg) @@ -124,6 +131,7 @@ type Pool struct { // configuration logger logrus.FieldLogger arvClient *arvados.Client + instanceSetID cloud.InstanceSetID instanceSet *throttledInstanceSet newExecutor func(cloud.Instance) Executor bootProbeCommand string @@ -136,14 +144,17 @@ type Pool struct { timeoutBooting time.Duration timeoutProbe time.Duration timeoutShutdown time.Duration + timeoutTERM time.Duration + timeoutSignal time.Duration installPublicKey ssh.PublicKey + tagKeyPrefix string // private state subscribers map[<-chan struct{}]chan<- struct{} creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret) workers map[cloud.InstanceID]*worker loaded bool // loaded list of instances from InstanceSet at least once - exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called + exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called atQuotaUntil time.Time atQuotaErr cloud.QuotaError stop chan bool @@ -158,6 +169,7 @@ type Pool struct { mInstancesPrice *prometheus.GaugeVec mVCPUs *prometheus.GaugeVec mMemory *prometheus.GaugeVec + mDisappearances *prometheus.CounterVec } type createCall struct { @@ -275,9 +287,10 @@ func (wp *Pool) Create(it arvados.InstanceType) bool { go func() { defer wp.notify() tags := cloud.InstanceTags{ - tagKeyInstanceType: it.Name, - tagKeyIdleBehavior: string(IdleBehaviorRun), - tagKeyInstanceSecret: secret, + wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID), + wp.tagKeyPrefix + tagKeyInstanceType: it.Name, + wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun), + wp.tagKeyPrefix + tagKeyInstanceSecret: secret, } initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename)) inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey) @@ -319,9 +332,7 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) if !ok { return errors.New("requested instance does not exist") } - wkr.idleBehavior = idleBehavior - wkr.saveTags() - wkr.shutdownIfIdle() + wkr.setIdleBehavior(idleBehavior) return nil } @@ -334,7 +345,8 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) // // Caller must have lock. func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) { - inst = tagVerifier{inst} + secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret] + inst = tagVerifier{inst, secret} id := inst.ID() if wkr := wp.workers[id]; wkr != nil { wkr.executor.SetTarget(inst) @@ -345,7 +357,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*wor } state := StateUnknown - if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok { + if _, ok := wp.creating[secret]; ok { state = StateBooting } @@ -355,7 +367,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*wor // process); otherwise, default to "run". After this, // wkr.idleBehavior is the source of truth, and will only be // changed via SetIdleBehavior(). - idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior]) + idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior]) if !validIdleBehavior[idleBehavior] { idleBehavior = IdleBehaviorRun } @@ -383,19 +395,14 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*wor probed: now, busy: now, updated: now, - running: make(map[string]struct{}), - starting: make(map[string]struct{}), + running: make(map[string]*remoteRunner), + starting: make(map[string]*remoteRunner), probing: make(chan struct{}, 1), } wp.workers[id] = wkr return wkr, true } -// caller must have lock. -func (wp *Pool) notifyExited(uuid string, t time.Time) { - wp.exited[uuid] = t -} - // Shutdown shuts down a worker with the given type, or returns false // if all workers with the given type are busy. func (wp *Pool) Shutdown(it arvados.InstanceType) bool { @@ -439,7 +446,7 @@ func (wp *Pool) CountWorkers() map[State]int { // In the returned map, the time value indicates when the Pool // observed that the container process had exited. A container that // has not yet exited has a zero time value. The caller should use -// KillContainer() to garbage-collect the entries for exited +// ForgetContainer() to garbage-collect the entries for exited // containers. func (wp *Pool) Running() map[string]time.Time { wp.setupOnce.Do(wp.setup) @@ -486,52 +493,45 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b // // KillContainer returns immediately; the act of killing the container // takes some time, and runs in the background. -func (wp *Pool) KillContainer(uuid string) { +// +// KillContainer returns false if the container has already ended. +func (wp *Pool) KillContainer(uuid string, reason string) bool { wp.mtx.Lock() defer wp.mtx.Unlock() - if _, ok := wp.exited[uuid]; ok { - wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process") - delete(wp.exited, uuid) - return - } + logger := wp.logger.WithFields(logrus.Fields{ + "ContainerUUID": uuid, + "Reason": reason, + }) for _, wkr := range wp.workers { - if _, ok := wkr.running[uuid]; ok { - go wp.kill(wkr, uuid) - return + rr := wkr.running[uuid] + if rr == nil { + rr = wkr.starting[uuid] + } + if rr != nil { + rr.Kill(reason) + return true } } - wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared") + logger.Debug("cannot kill: already disappeared") + return false } -func (wp *Pool) kill(wkr *worker, uuid string) { - logger := wp.logger.WithFields(logrus.Fields{ - "ContainerUUID": uuid, - "Instance": wkr.instance.ID(), - }) - logger.Debug("killing process") - cmd := "crunch-run --kill 15 " + uuid - if u := wkr.instance.RemoteUser(); u != "root" { - cmd = "sudo " + cmd - } - stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil) - if err != nil { - logger.WithFields(logrus.Fields{ - "stderr": string(stderr), - "stdout": string(stdout), - "error": err, - }).Warn("kill failed") - return - } - logger.Debug("killing process succeeded") +// ForgetContainer clears the placeholder for the given exited +// container, so it isn't returned by subsequent calls to Running(). +// +// ForgetContainer has no effect if the container has not yet exited. +// +// The "container exited at time T" placeholder (which necessitates +// ForgetContainer) exists to make it easier for the caller +// (scheduler) to distinguish a container that exited without +// finalizing its state from a container that exited too recently for +// its final state to have appeared in the scheduler's queue cache. +func (wp *Pool) ForgetContainer(uuid string) { wp.mtx.Lock() defer wp.mtx.Unlock() - if _, ok := wkr.running[uuid]; ok { - delete(wkr.running, uuid) - if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 { - wkr.state = StateIdle - } - wkr.updated = time.Now() - go wp.notify() + if _, ok := wp.exited[uuid]; ok { + wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process") + delete(wp.exited, uuid) } } @@ -574,6 +574,16 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) { Help: "Total memory on all cloud VMs.", }, []string{"category"}) reg.MustRegister(wp.mMemory) + wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "arvados", + Subsystem: "dispatchcloud", + Name: "instances_disappeared", + Help: "Number of occurrences of an instance disappearing from the cloud provider's list of instances.", + }, []string{"state"}) + for _, v := range stateString { + wp.mDisappearances.WithLabelValues(v).Add(0) + } + reg.MustRegister(wp.mDisappearances) } func (wp *Pool) runMetrics() { @@ -716,6 +726,18 @@ func (wp *Pool) Instances() []InstanceView { return r } +// KillInstance destroys a cloud VM instance. It returns an error if +// the given instance does not exist. +func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error { + wkr, ok := wp.workers[id] + if !ok { + return errors.New("instance not found") + } + wkr.logger.WithField("Reason", reason).Info("shutting down") + wkr.shutdown() + return nil +} + func (wp *Pool) setup() { wp.creating = map[string]createCall{} wp.exited = map[string]time.Time{} @@ -741,7 +763,7 @@ func (wp *Pool) getInstancesAndSync() error { } wp.logger.Debug("getting instance list") threshold := time.Now() - instances, err := wp.instanceSet.Instances(cloud.InstanceTags{}) + instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)}) if err != nil { wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify) return err @@ -761,7 +783,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { notify := false for _, inst := range instances { - itTag := inst.Tags()[tagKeyInstanceType] + itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType] it, ok := wp.instanceTypes[itTag] if !ok { wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag) @@ -784,8 +806,11 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { "WorkerState": wkr.state, }) logger.Info("instance disappeared in cloud") + if wp.mDisappearances != nil { + wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc() + } delete(wp.workers, id) - go wkr.executor.Close() + go wkr.Close() notify = true }