14931: Add configurable prefix for built-in tags.
[arvados.git] / lib / dispatchcloud / worker / pool.go
index e90935e2aa9e5747d08e136475cd186c0b4bc766..0ee36a96ff1d23d3c27e48679dba4b31007299f4 100644 (file)
@@ -25,6 +25,7 @@ const (
        tagKeyInstanceType   = "InstanceType"
        tagKeyIdleBehavior   = "IdleBehavior"
        tagKeyInstanceSecret = "InstanceSecret"
+       tagKeyInstanceSetID  = "InstanceSetID"
 )
 
 // An InstanceView shows a worker's current state and recent activity.
@@ -68,6 +69,8 @@ const (
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
        defaultTimeoutShutdown    = time.Second * 10
+       defaultTimeoutTERM        = time.Minute * 2
+       defaultTimeoutSignal      = time.Second * 5
 
        // Time after a quota error to try again anyway, even if no
        // instances have been shutdown.
@@ -89,23 +92,27 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration {
 //
 // New instances are configured and set up according to the given
 // cluster configuration.
-func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:             logger,
                arvClient:          arvClient,
+               instanceSetID:      instanceSetID,
                instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:        newExecutor,
-               bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
-               imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
+               bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
+               imageID:            cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
                instanceTypes:      cluster.InstanceTypes,
-               maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
-               probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
-               syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
-               timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
-               timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
-               timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
-               timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+               maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
+               probeInterval:      duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
+               syncInterval:       duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
+               timeoutIdle:        duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
+               timeoutBooting:     duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
+               timeoutProbe:       duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+               timeoutShutdown:    duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+               timeoutTERM:        duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
+               timeoutSignal:      duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
                installPublicKey:   installPublicKey,
+               tagKeyPrefix:       cluster.Containers.CloudVMs.TagKeyPrefix,
                stop:               make(chan bool),
        }
        wp.registerMetrics(reg)
@@ -124,6 +131,7 @@ type Pool struct {
        // configuration
        logger             logrus.FieldLogger
        arvClient          *arvados.Client
+       instanceSetID      cloud.InstanceSetID
        instanceSet        *throttledInstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
@@ -136,7 +144,10 @@ type Pool struct {
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
        timeoutShutdown    time.Duration
+       timeoutTERM        time.Duration
+       timeoutSignal      time.Duration
        installPublicKey   ssh.PublicKey
+       tagKeyPrefix       string
 
        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
@@ -275,9 +286,10 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
        go func() {
                defer wp.notify()
                tags := cloud.InstanceTags{
-                       tagKeyInstanceType:   it.Name,
-                       tagKeyIdleBehavior:   string(IdleBehaviorRun),
-                       tagKeyInstanceSecret: secret,
+                       wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
+                       wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
+                       wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
+                       wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
                }
                initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
@@ -319,9 +331,7 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
        if !ok {
                return errors.New("requested instance does not exist")
        }
-       wkr.idleBehavior = idleBehavior
-       wkr.saveTags()
-       wkr.shutdownIfIdle()
+       wkr.setIdleBehavior(idleBehavior)
        return nil
 }
 
@@ -334,7 +344,8 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
 //
 // Caller must have lock.
 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
-       inst = tagVerifier{inst}
+       secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
+       inst = tagVerifier{inst, secret}
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
@@ -345,7 +356,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*wor
        }
 
        state := StateUnknown
-       if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok {
+       if _, ok := wp.creating[secret]; ok {
                state = StateBooting
        }
 
@@ -355,7 +366,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*wor
        // process); otherwise, default to "run". After this,
        // wkr.idleBehavior is the source of truth, and will only be
        // changed via SetIdleBehavior().
-       idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
+       idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
        if !validIdleBehavior[idleBehavior] {
                idleBehavior = IdleBehaviorRun
        }
@@ -383,19 +394,14 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*wor
                probed:       now,
                busy:         now,
                updated:      now,
-               running:      make(map[string]struct{}),
-               starting:     make(map[string]struct{}),
+               running:      make(map[string]*remoteRunner),
+               starting:     make(map[string]*remoteRunner),
                probing:      make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
 }
 
-// caller must have lock.
-func (wp *Pool) notifyExited(uuid string, t time.Time) {
-       wp.exited[uuid] = t
-}
-
 // Shutdown shuts down a worker with the given type, or returns false
 // if all workers with the given type are busy.
 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
@@ -486,53 +492,29 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 //
 // KillContainer returns immediately; the act of killing the container
 // takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string) {
+func (wp *Pool) KillContainer(uuid string, reason string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
+       logger := wp.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "Reason":        reason,
+       })
        if _, ok := wp.exited[uuid]; ok {
-               wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+               logger.Debug("clearing placeholder for exited crunch-run process")
                delete(wp.exited, uuid)
                return
        }
        for _, wkr := range wp.workers {
-               if _, ok := wkr.running[uuid]; ok {
-                       go wp.kill(wkr, uuid)
-                       return
+               rr := wkr.running[uuid]
+               if rr == nil {
+                       rr = wkr.starting[uuid]
                }
-       }
-       wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
-}
-
-func (wp *Pool) kill(wkr *worker, uuid string) {
-       logger := wp.logger.WithFields(logrus.Fields{
-               "ContainerUUID": uuid,
-               "Instance":      wkr.instance.ID(),
-       })
-       logger.Debug("killing process")
-       cmd := "crunch-run --kill 15 " + uuid
-       if u := wkr.instance.RemoteUser(); u != "root" {
-               cmd = "sudo " + cmd
-       }
-       stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
-       if err != nil {
-               logger.WithFields(logrus.Fields{
-                       "stderr": string(stderr),
-                       "stdout": string(stdout),
-                       "error":  err,
-               }).Warn("kill failed")
-               return
-       }
-       logger.Debug("killing process succeeded")
-       wp.mtx.Lock()
-       defer wp.mtx.Unlock()
-       if _, ok := wkr.running[uuid]; ok {
-               delete(wkr.running, uuid)
-               if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
-                       wkr.state = StateIdle
+               if rr != nil {
+                       rr.Kill(reason)
+                       return
                }
-               wkr.updated = time.Now()
-               go wp.notify()
        }
+       logger.Debug("cannot kill: already disappeared")
 }
 
 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
@@ -716,6 +698,18 @@ func (wp *Pool) Instances() []InstanceView {
        return r
 }
 
+// KillInstance destroys a cloud VM instance. It returns an error if
+// the given instance does not exist.
+func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+       wkr, ok := wp.workers[id]
+       if !ok {
+               return errors.New("instance not found")
+       }
+       wkr.logger.WithField("Reason", reason).Info("shutting down")
+       wkr.shutdown()
+       return nil
+}
+
 func (wp *Pool) setup() {
        wp.creating = map[string]createCall{}
        wp.exited = map[string]time.Time{}
@@ -741,7 +735,7 @@ func (wp *Pool) getInstancesAndSync() error {
        }
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
-       instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
+       instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
        if err != nil {
                wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
                return err
@@ -761,7 +755,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        notify := false
 
        for _, inst := range instances {
-               itTag := inst.Tags()[tagKeyInstanceType]
+               itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                        wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
@@ -785,7 +779,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                })
                logger.Info("instance disappeared in cloud")
                delete(wp.workers, id)
-               go wkr.executor.Close()
+               go wkr.Close()
                notify = true
        }