Merge branch 'master' into 15106-trgm-text-search
[arvados.git] / lib / dispatchcloud / worker / pool.go
index 4ddd3745ef443465666a1a339b9223de7bddd3f2..0ee36a96ff1d23d3c27e48679dba4b31007299f4 100644 (file)
@@ -5,7 +5,9 @@
 package worker
 
 import (
-       "bytes"
+       "crypto/rand"
+       "errors"
+       "fmt"
        "io"
        "sort"
        "strings"
@@ -14,30 +16,35 @@ import (
 
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/Sirupsen/logrus"
        "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
+       "golang.org/x/crypto/ssh"
 )
 
 const (
-       tagKeyInstanceType = "InstanceType"
-       tagKeyHold         = "Hold"
+       tagKeyInstanceType   = "InstanceType"
+       tagKeyIdleBehavior   = "IdleBehavior"
+       tagKeyInstanceSecret = "InstanceSecret"
+       tagKeyInstanceSetID  = "InstanceSetID"
 )
 
-// A View shows a worker's current state and recent activity.
-type View struct {
-       Instance             string
-       Price                float64
-       ArvadosInstanceType  string
-       ProviderInstanceType string
-       LastContainerUUID    string
-       Unallocated          time.Time
-       WorkerState          string
+// An InstanceView shows a worker's current state and recent activity.
+type InstanceView struct {
+       Instance             cloud.InstanceID `json:"instance"`
+       Address              string           `json:"address"`
+       Price                float64          `json:"price"`
+       ArvadosInstanceType  string           `json:"arvados_instance_type"`
+       ProviderInstanceType string           `json:"provider_instance_type"`
+       LastContainerUUID    string           `json:"last_container_uuid"`
+       LastBusy             time.Time        `json:"last_busy"`
+       WorkerState          string           `json:"worker_state"`
+       IdleBehavior         IdleBehavior     `json:"idle_behavior"`
 }
 
 // An Executor executes shell commands on a remote host.
 type Executor interface {
        // Run cmd on the current target.
-       Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
+       Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
 
        // Use the given target for subsequent operations. The new
        // target is the same host as the previous target, but it
@@ -61,6 +68,16 @@ const (
        defaultTimeoutIdle        = time.Minute
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
+       defaultTimeoutShutdown    = time.Second * 10
+       defaultTimeoutTERM        = time.Minute * 2
+       defaultTimeoutSignal      = time.Second * 5
+
+       // Time after a quota error to try again anyway, even if no
+       // instances have been shut down.
+       quotaErrorTTL = time.Minute
+
+       // Time between "X failed because of rate limiting" messages
+       logRateLimitErrorInterval = time.Second * 10
 )
 
 func duration(conf arvados.Duration, def time.Duration) time.Duration {
@@ -75,20 +92,28 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration {
 //
 // New instances are configured and set up according to the given
 // cluster configuration.
-func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:             logger,
-               instanceSet:        instanceSet,
+               arvClient:          arvClient,
+               instanceSetID:      instanceSetID,
+               instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:        newExecutor,
-               bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
-               imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
+               bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
+               imageID:            cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
                instanceTypes:      cluster.InstanceTypes,
-               maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
-               probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
-               syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
-               timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
-               timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
-               timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+               maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
+               probeInterval:      duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
+               syncInterval:       duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
+               timeoutIdle:        duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
+               timeoutBooting:     duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
+               timeoutProbe:       duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+               timeoutShutdown:    duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+               timeoutTERM:        duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
+               timeoutSignal:      duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
+               installPublicKey:   installPublicKey,
+               tagKeyPrefix:       cluster.Containers.CloudVMs.TagKeyPrefix,
+               stop:               make(chan bool),
        }
        wp.registerMetrics(reg)
        go func() {
@@ -105,7 +130,9 @@ func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cl
 type Pool struct {
        // configuration
        logger             logrus.FieldLogger
-       instanceSet        cloud.InstanceSet
+       arvClient          *arvados.Client
+       instanceSetID      cloud.InstanceSetID
+       instanceSet        *throttledInstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
        imageID            cloud.ImageID
@@ -116,53 +143,54 @@ type Pool struct {
        timeoutIdle        time.Duration
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
+       timeoutShutdown    time.Duration
+       timeoutTERM        time.Duration
+       timeoutSignal      time.Duration
+       installPublicKey   ssh.PublicKey
+       tagKeyPrefix       string
 
        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
-       creating     map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return
+       creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
        exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
        atQuotaUntil time.Time
+       atQuotaErr   cloud.QuotaError
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once
 
-       mInstances         prometheus.Gauge
+       throttleCreate    throttle
+       throttleInstances throttle
+
        mContainersRunning prometheus.Gauge
-       mVCPUs             prometheus.Gauge
-       mVCPUsInuse        prometheus.Gauge
-       mMemory            prometheus.Gauge
-       mMemoryInuse       prometheus.Gauge
-}
-
-type worker struct {
-       state       State
-       instance    cloud.Instance
-       executor    Executor
-       instType    arvados.InstanceType
-       vcpus       int64
-       memory      int64
-       booted      bool
-       probed      time.Time
-       updated     time.Time
-       busy        time.Time
-       unallocated time.Time
-       lastUUID    string
-       running     map[string]struct{}
-       starting    map[string]struct{}
-       probing     chan struct{}
-}
-
-// Subscribe returns a channel that becomes ready whenever a worker's
-// state changes.
+       mInstances         *prometheus.GaugeVec
+       mInstancesPrice    *prometheus.GaugeVec
+       mVCPUs             *prometheus.GaugeVec
+       mMemory            *prometheus.GaugeVec
+}
+
+type createCall struct {
+       time         time.Time
+       instanceType arvados.InstanceType
+}
+
+// Subscribe returns a buffered channel that becomes ready after any
+// change to the pool's state that could have scheduling implications:
+// a worker's state changes, a new worker appears, the cloud
+// provider's API rate limiting period ends, etc.
+//
+// Additional events that occur while the channel is already ready
+// will be dropped, so it is OK if the caller services the channel
+// slowly.
 //
 // Example:
 //
 //     ch := wp.Subscribe()
 //     defer wp.Unsubscribe(ch)
 //     for range ch {
-//             // ...try scheduling some work...
+//             tryScheduling(wp)
 //             if done {
 //                     break
 //             }
@@ -185,95 +213,193 @@ func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
 }
 
 // Unallocated returns the number of unallocated (creating + booting +
-// idle + unknown) workers for each instance type.
+// idle + unknown) workers for each instance type.  Workers in
+// hold/drain mode are not included.
 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
-       u := map[arvados.InstanceType]int{}
-       for it, c := range wp.creating {
-               u[it] = c
+       unalloc := map[arvados.InstanceType]int{}
+       creating := map[arvados.InstanceType]int{}
+       oldestCreate := map[arvados.InstanceType]time.Time{}
+       for _, cc := range wp.creating {
+               it := cc.instanceType
+               creating[it]++
+               if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
+                       oldestCreate[it] = cc.time
+               }
        }
        for _, wkr := range wp.workers {
-               if len(wkr.running)+len(wkr.starting) == 0 && (wkr.state == StateRunning || wkr.state == StateBooting || wkr.state == StateUnknown) {
-                       u[wkr.instType]++
+               // Skip workers that are not expected to become
+               // available soon. Note len(wkr.running)>0 is not
+               // redundant here: it can be true even in
+               // StateUnknown.
+               if wkr.state == StateShutdown ||
+                       wkr.state == StateRunning ||
+                       wkr.idleBehavior != IdleBehaviorRun ||
+                       len(wkr.running) > 0 {
+                       continue
+               }
+               it := wkr.instType
+               unalloc[it]++
+               if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
+                       // If up to N new workers appear in
+                       // Instances() while we are waiting for N
+                       // Create() calls to complete, we assume we're
+                       // just seeing a race between Instances() and
+                       // Create() responses.
+                       //
+                       // The other common reason why nodes have
+                       // state==Unknown is that they appeared at
+                       // startup, before any Create calls. They
+                       // don't match the above timing condition, so
+                       // we never mistakenly attribute them to
+                       // pending Create calls.
+                       creating[it]--
                }
        }
-       return u
+       for it, c := range creating {
+               unalloc[it] += c
+       }
+       return unalloc
 }
 
 // Create a new instance with the given type, and add it to the worker
 // pool. The worker is added immediately; instance creation runs in
 // the background.
-func (wp *Pool) Create(it arvados.InstanceType) error {
+//
+// Create returns false if a pre-existing error state prevents it from
+// even attempting to create a new instance. Those errors are logged
+// by the Pool, so the caller does not need to log anything in such
+// cases.
+func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
-       wp.creating[it]++
+       if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
+               return false
+       }
+       now := time.Now()
+       secret := randomHex(instanceSecretLength)
+       wp.creating[secret] = createCall{time: now, instanceType: it}
        go func() {
                defer wp.notify()
-               inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
+               tags := cloud.InstanceTags{
+                       wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
+                       wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
+                       wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
+                       wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
+               }
+               initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
+               inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
-               wp.creating[it]--
-               if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
-                       wp.atQuotaUntil = time.Now().Add(time.Minute)
-               }
+               // delete() is deferred so the updateWorker() call
+               // below knows to use StateBooting when adding a new
+               // worker.
+               defer delete(wp.creating, secret)
                if err != nil {
+                       if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+                               wp.atQuotaErr = err
+                               wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+                               time.AfterFunc(quotaErrorTTL, wp.notify)
+                       }
                        logger.WithError(err).Error("create failed")
+                       wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
                        return
                }
-               wp.updateWorker(inst, it, StateBooting)
+               wp.updateWorker(inst, it)
        }()
-       return nil
+       return true
 }
 
 // AtQuota returns true if Create is not expected to work at the
 // moment.
 func (wp *Pool) AtQuota() bool {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
        return time.Now().Before(wp.atQuotaUntil)
 }
 
-// Add or update worker attached to the given instance. Use
-// initialState if a new worker is created. Caller must have lock.
+// SetIdleBehavior determines how the indicated instance will behave
+// when it has no containers running.
+func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       wkr, ok := wp.workers[id]
+       if !ok {
+               return errors.New("requested instance does not exist")
+       }
+       wkr.setIdleBehavior(idleBehavior)
+       return nil
+}
+
+// Add or update worker attached to the given instance.
+//
+// The second return value is true if a new worker is created.
+//
+// A newly added instance has state=StateBooting if its tags match an
+// entry in wp.creating, otherwise StateUnknown.
 //
-// Returns true when a new worker is created.
-func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) bool {
+// Caller must have lock.
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
+       secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
+       inst = tagVerifier{inst, secret}
        id := inst.ID()
-       if wp.workers[id] != nil {
-               wp.workers[id].executor.SetTarget(inst)
-               wp.workers[id].instance = inst
-               wp.workers[id].updated = time.Now()
-               if initialState == StateBooting && wp.workers[id].state == StateUnknown {
-                       wp.workers[id].state = StateBooting
-               }
-               return false
+       if wkr := wp.workers[id]; wkr != nil {
+               wkr.executor.SetTarget(inst)
+               wkr.instance = inst
+               wkr.updated = time.Now()
+               wkr.saveTags()
+               return wkr, false
+       }
+
+       state := StateUnknown
+       if _, ok := wp.creating[secret]; ok {
+               state = StateBooting
        }
-       if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
-               initialState = StateHold
+
+       // If an instance has a valid IdleBehavior tag when it first
+       // appears, initialize the new worker accordingly (this is how
+       // we restore IdleBehavior that was set by a prior dispatch
+       // process); otherwise, default to "run". After this,
+       // wkr.idleBehavior is the source of truth, and will only be
+       // changed via SetIdleBehavior().
+       idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
+       if !validIdleBehavior[idleBehavior] {
+               idleBehavior = IdleBehaviorRun
        }
-       wp.logger.WithFields(logrus.Fields{
+
+       logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
-               "Instance":     inst,
-               "State":        initialState,
+               "Instance":     inst.ID(),
+               "Address":      inst.Address(),
+       })
+       logger.WithFields(logrus.Fields{
+               "State":        state,
+               "IdleBehavior": idleBehavior,
        }).Infof("instance appeared in cloud")
        now := time.Now()
-       wp.workers[id] = &worker{
-               executor:    wp.newExecutor(inst),
-               state:       initialState,
-               instance:    inst,
-               instType:    it,
-               probed:      now,
-               busy:        now,
-               updated:     now,
-               unallocated: now,
-               running:     make(map[string]struct{}),
-               starting:    make(map[string]struct{}),
-               probing:     make(chan struct{}, 1),
-       }
-       return true
+       wkr := &worker{
+               mtx:          &wp.mtx,
+               wp:           wp,
+               logger:       logger,
+               executor:     wp.newExecutor(inst),
+               state:        state,
+               idleBehavior: idleBehavior,
+               instance:     inst,
+               instType:     it,
+               appeared:     now,
+               probed:       now,
+               busy:         now,
+               updated:      now,
+               running:      make(map[string]*remoteRunner),
+               starting:     make(map[string]*remoteRunner),
+               probing:      make(chan struct{}, 1),
+       }
+       wp.workers[id] = wkr
+       return wkr, true
 }
 
 // Shutdown shuts down a worker with the given type, or returns false
@@ -284,46 +410,27 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
        defer wp.mtx.Unlock()
        logger := wp.logger.WithField("InstanceType", it.Name)
        logger.Info("shutdown requested")
-       for _, tryState := range []State{StateBooting, StateRunning} {
+       for _, tryState := range []State{StateBooting, StateIdle} {
                // TODO: shutdown the worker with the longest idle
-               // time (Running) or the earliest create time
-               // (Booting)
+               // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
-                       if wkr.state != tryState || len(wkr.running)+len(wkr.starting) > 0 {
-                               continue
+                       if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
+                               logger.WithField("Instance", wkr.instance).Info("shutting down")
+                               wkr.shutdown()
+                               return true
                        }
-                       if wkr.instType != it {
-                               continue
-                       }
-                       logger = logger.WithField("Instance", wkr.instance)
-                       logger.Info("shutting down")
-                       wp.shutdown(wkr, logger)
-                       return true
                }
        }
        return false
 }
 
-// caller must have lock
-func (wp *Pool) shutdown(wkr *worker, logger logrus.FieldLogger) {
-       wkr.updated = time.Now()
-       wkr.state = StateShutdown
-       go func() {
-               err := wkr.instance.Destroy()
-               if err != nil {
-                       logger.WithError(err).WithField("Instance", wkr.instance).Warn("shutdown failed")
-                       return
-               }
-               wp.mtx.Lock()
-               wp.atQuotaUntil = time.Now()
-               wp.mtx.Unlock()
-               wp.notify()
-       }()
-}
-
-// Workers returns the current number of workers in each state.
-func (wp *Pool) Workers() map[State]int {
+// CountWorkers returns the current number of workers in each state.
+//
+// CountWorkers blocks, if necessary, until the initial instance list
+// has been loaded from the cloud provider.
+func (wp *Pool) CountWorkers() map[State]int {
        wp.setupOnce.Do(wp.setup)
+       wp.waitUntilLoaded()
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[State]int{}
@@ -334,6 +441,12 @@ func (wp *Pool) Workers() map[State]int {
 }
 
 // Running returns the container UUIDs being prepared/run on workers.
+//
+// In the returned map, the time value indicates when the Pool
+// observed that the container process had exited. A container that
+// has not yet exited has a zero time value. The caller should use
+// KillContainer() to garbage-collect the entries for exited
+// containers.
 func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
@@ -356,17 +469,12 @@ func (wp *Pool) Running() map[string]time.Time {
 // StartContainer starts a container on an idle worker immediately if
 // possible, otherwise returns false.
 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
-       logger := wp.logger.WithFields(logrus.Fields{
-               "InstanceType":  it.Name,
-               "ContainerUUID": ctr.UUID,
-               "Priority":      ctr.Priority,
-       })
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        var wkr *worker
        for _, w := range wp.workers {
-               if w.instType == it && w.state == StateRunning && len(w.running)+len(w.starting) == 0 {
+               if w.instType == it && w.state == StateIdle {
                        if wkr == nil || w.busy.After(wkr.busy) {
                                wkr = w
                        }
@@ -375,88 +483,44 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
        if wkr == nil {
                return false
        }
-       logger = logger.WithField("Instance", wkr.instance)
-       logger.Debug("starting container")
-       wkr.starting[ctr.UUID] = struct{}{}
-       go func() {
-               stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
-               wp.mtx.Lock()
-               defer wp.mtx.Unlock()
-               wkr.updated = time.Now()
-               delete(wkr.starting, ctr.UUID)
-               wkr.running[ctr.UUID] = struct{}{}
-               if err != nil {
-                       logger.WithField("stdout", string(stdout)).
-                               WithField("stderr", string(stderr)).
-                               WithError(err).
-                               Error("error starting crunch-run process")
-                       // Leave uuid in wkr.running, though: it's
-                       // possible the error was just a communication
-                       // failure and the process was in fact
-                       // started.  Wait for next probe to find out.
-                       return
-               }
-               logger.Info("crunch-run process started")
-               wkr.lastUUID = ctr.UUID
-       }()
+       wkr.startContainer(ctr)
        return true
 }
 
 // KillContainer kills the crunch-run process for the given container
 // UUID, if it's running on any worker.
-func (wp *Pool) KillContainer(uuid string) {
+//
+// KillContainer returns immediately; the act of killing the container
+// takes some time, and runs in the background.
+func (wp *Pool) KillContainer(uuid string, reason string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
+       logger := wp.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "Reason":        reason,
+       })
        if _, ok := wp.exited[uuid]; ok {
-               wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+               logger.Debug("clearing placeholder for exited crunch-run process")
                delete(wp.exited, uuid)
                return
        }
        for _, wkr := range wp.workers {
-               if _, ok := wkr.running[uuid]; ok {
-                       go wp.kill(wkr, uuid)
+               rr := wkr.running[uuid]
+               if rr == nil {
+                       rr = wkr.starting[uuid]
+               }
+               if rr != nil {
+                       rr.Kill(reason)
                        return
                }
        }
-       wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
-}
-
-func (wp *Pool) kill(wkr *worker, uuid string) {
-       logger := wp.logger.WithFields(logrus.Fields{
-               "ContainerUUID": uuid,
-               "Instance":      wkr.instance,
-       })
-       logger.Debug("killing process")
-       stdout, stderr, err := wkr.executor.Execute("crunch-run --kill "+uuid, nil)
-       if err != nil {
-               logger.WithFields(logrus.Fields{
-                       "stderr": string(stderr),
-                       "stdout": string(stdout),
-                       "error":  err,
-               }).Warn("kill failed")
-               return
-       }
-       logger.Debug("killing process succeeded")
-       wp.mtx.Lock()
-       defer wp.mtx.Unlock()
-       if _, ok := wkr.running[uuid]; ok {
-               delete(wkr.running, uuid)
-               wkr.updated = time.Now()
-               go wp.notify()
-       }
+       logger.Debug("cannot kill: already disappeared")
 }
 
 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
        if reg == nil {
                reg = prometheus.NewRegistry()
        }
-       wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "dispatchcloud",
-               Name:      "instances_total",
-               Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
-       })
-       reg.MustRegister(wp.mInstances)
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
@@ -464,40 +528,40 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)
-
-       wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
+       wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "instances_total",
+               Help:      "Number of cloud VMs.",
+       }, []string{"category"})
+       reg.MustRegister(wp.mInstances)
+       wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "instances_price",
+               Help:      "Price of cloud VMs.",
+       }, []string{"category"})
+       reg.MustRegister(wp.mInstancesPrice)
+       wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_total",
                Help:      "Total VCPUs on all cloud VMs.",
-       })
+       }, []string{"category"})
        reg.MustRegister(wp.mVCPUs)
-       wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "dispatchcloud",
-               Name:      "vcpus_inuse",
-               Help:      "VCPUs on cloud VMs that are running containers.",
-       })
-       reg.MustRegister(wp.mVCPUsInuse)
-       wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
+       wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_total",
                Help:      "Total memory on all cloud VMs.",
-       })
+       }, []string{"category"})
        reg.MustRegister(wp.mMemory)
-       wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "dispatchcloud",
-               Name:      "memory_bytes_inuse",
-               Help:      "Memory on cloud VMs that are running containers.",
-       })
-       reg.MustRegister(wp.mMemoryInuse)
 }
 
 func (wp *Pool) runMetrics() {
        ch := wp.Subscribe()
        defer wp.Unsubscribe(ch)
+       wp.updateMetrics()
        for range ch {
                wp.updateMetrics()
        }
@@ -507,23 +571,38 @@ func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
 
-       var alloc, cpu, cpuInuse, mem, memInuse int64
+       instances := map[string]int64{}
+       price := map[string]float64{}
+       cpu := map[string]int64{}
+       mem := map[string]int64{}
+       var running int64
        for _, wkr := range wp.workers {
-               cpu += int64(wkr.instType.VCPUs)
-               mem += int64(wkr.instType.RAM)
-               if len(wkr.running)+len(wkr.starting) == 0 {
-                       continue
+               var cat string
+               switch {
+               case len(wkr.running)+len(wkr.starting) > 0:
+                       cat = "inuse"
+               case wkr.idleBehavior == IdleBehaviorHold:
+                       cat = "hold"
+               case wkr.state == StateBooting:
+                       cat = "booting"
+               case wkr.state == StateUnknown:
+                       cat = "unknown"
+               default:
+                       cat = "idle"
                }
-               alloc += int64(len(wkr.running) + len(wkr.starting))
-               cpuInuse += int64(wkr.instType.VCPUs)
-               memInuse += int64(wkr.instType.RAM)
+               instances[cat]++
+               price[cat] += wkr.instType.Price
+               cpu[cat] += int64(wkr.instType.VCPUs)
+               mem[cat] += int64(wkr.instType.RAM)
+               running += int64(len(wkr.running) + len(wkr.starting))
+       }
+       for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
+               wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
+               wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
+               wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
+               wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
        }
-       wp.mInstances.Set(float64(len(wp.workers)))
-       wp.mContainersRunning.Set(float64(alloc))
-       wp.mVCPUs.Set(float64(cpu))
-       wp.mMemory.Set(float64(mem))
-       wp.mVCPUsInuse.Set(float64(cpuInuse))
-       wp.mMemoryInuse.Set(float64(memInuse))
+       wp.mContainersRunning.Set(float64(running))
 }
 
 func (wp *Pool) runProbes() {
@@ -542,7 +621,7 @@ func (wp *Pool) runProbes() {
                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
-                       if wkr.state == StateShutdown || wp.shutdownIfIdle(wkr) {
+                       if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
                                continue
                        }
                        workers = append(workers, id)
@@ -553,20 +632,12 @@ func (wp *Pool) runProbes() {
                        wp.mtx.Lock()
                        wkr, ok := wp.workers[id]
                        wp.mtx.Unlock()
-                       if !ok || wkr.state == StateShutdown {
-                               // Deleted/shutdown while we
-                               // were probing others
+                       if !ok {
+                               // Deleted while we were probing
+                               // others
                                continue
                        }
-                       select {
-                       case wkr.probing <- struct{}{}:
-                               go func() {
-                                       wp.probeAndUpdate(wkr)
-                                       <-wkr.probing
-                               }()
-                       default:
-                               wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
-                       }
+                       go wkr.ProbeAndUpdate()
                        select {
                        case <-wp.stop:
                                return
@@ -595,76 +666,52 @@ func (wp *Pool) runSync() {
        }
 }
 
-// caller must have lock.
-func (wp *Pool) shutdownIfBroken(wkr *worker, dur time.Duration) {
-       if wkr.state == StateHold {
-               return
-       }
-       label, threshold := "", wp.timeoutProbe
-       if wkr.state == StateBooting {
-               label, threshold = "new ", wp.timeoutBooting
-       }
-       if dur < threshold {
-               return
-       }
-       wp.logger.WithFields(logrus.Fields{
-               "Instance": wkr.instance,
-               "Duration": dur,
-               "Since":    wkr.probed,
-               "State":    wkr.state,
-       }).Warnf("%sinstance unresponsive, shutting down", label)
-       wp.shutdown(wkr, wp.logger)
-}
-
-// caller must have lock.
-func (wp *Pool) shutdownIfIdle(wkr *worker) bool {
-       if len(wkr.running)+len(wkr.starting) > 0 || wkr.state != StateRunning {
-               return false
-       }
-       age := time.Since(wkr.unallocated)
-       if age < wp.timeoutIdle {
-               return false
-       }
-       logger := wp.logger.WithFields(logrus.Fields{
-               "Age":      age,
-               "Instance": wkr.instance,
-       })
-       logger.Info("shutdown idle worker")
-       wp.shutdown(wkr, logger)
-       return true
-}
-
 // Stop synchronizing with the InstanceSet.
 func (wp *Pool) Stop() {
        wp.setupOnce.Do(wp.setup)
        close(wp.stop)
 }
 
-// View reports status information for every worker in the pool.
-func (wp *Pool) View() []View {
-       var r []View
+// Instances returns an InstanceView for each worker in the pool,
+// summarizing its current state and recent activity.
+func (wp *Pool) Instances() []InstanceView {
+       var r []InstanceView
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        for _, w := range wp.workers {
-               r = append(r, View{
-                       Instance:             w.instance.String(),
+               r = append(r, InstanceView{
+                       Instance:             w.instance.ID(),
+                       Address:              w.instance.Address(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
                        LastContainerUUID:    w.lastUUID,
-                       Unallocated:          w.unallocated,
+                       LastBusy:             w.busy,
                        WorkerState:          w.state.String(),
+                       IdleBehavior:         w.idleBehavior,
                })
        }
        wp.mtx.Unlock()
        sort.Slice(r, func(i, j int) bool {
-               return strings.Compare(r[i].Instance, r[j].Instance) < 0
+               return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
        })
        return r
 }
 
+// KillInstance destroys a cloud VM instance. It returns an error if
+// the pool has no worker with the given instance ID.
+//
+// reason is recorded in the "Reason" field of the log entry written
+// before the worker is shut down.
+func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+       wp.setupOnce.Do(wp.setup)
+       // wp.workers is read and modified under wp.mtx elsewhere in
+       // this file (runProbes, Instances), so an unlocked lookup
+       // here would race with those map updates.
+       //
+       // NOTE(review): wkr.shutdown is assumed to expect the caller
+       // to hold wp.mtx, like the wkr.shutdownIfIdle call made under
+       // the lock in runProbes -- confirm against worker.go.
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       wkr, ok := wp.workers[id]
+       if !ok {
+               return errors.New("instance not found")
+       }
+       wkr.logger.WithField("Reason", reason).Info("shutting down")
+       wkr.shutdown()
+       return nil
+}
+
 func (wp *Pool) setup() {
-       wp.creating = map[arvados.InstanceType]int{}
+       wp.creating = map[string]createCall{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
@@ -683,10 +730,14 @@ func (wp *Pool) notify() {
 
 func (wp *Pool) getInstancesAndSync() error {
        wp.setupOnce.Do(wp.setup)
+       if err := wp.instanceSet.throttleInstances.Error(); err != nil {
+               return err
+       }
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
-       instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
+       instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
        if err != nil {
+               wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
                return err
        }
        wp.sync(threshold, instances)
@@ -704,14 +755,17 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        notify := false
 
        for _, inst := range instances {
-               itTag := inst.Tags()[tagKeyInstanceType]
+               itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                        wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
-               if wp.updateWorker(inst, it, StateUnknown) {
+               if wkr, isNew := wp.updateWorker(inst, it); isNew {
                        notify = true
+               } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
+                       wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
+                       wkr.shutdown()
                }
        }
 
@@ -720,16 +774,17 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                        continue
                }
                logger := wp.logger.WithFields(logrus.Fields{
-                       "Instance":    wkr.instance,
+                       "Instance":    wkr.instance.ID(),
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
                delete(wp.workers, id)
-               go wkr.executor.Close()
+               go wkr.Close()
                notify = true
        }
 
        if !wp.loaded {
+               notify = true
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }
@@ -739,137 +794,24 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        }
 }
 
-// should be called in a new goroutine
-func (wp *Pool) probeAndUpdate(wkr *worker) {
-       logger := wp.logger.WithField("Instance", wkr.instance)
-       wp.mtx.Lock()
-       updated := wkr.updated
-       booted := wkr.booted
-       wp.mtx.Unlock()
-
-       var (
-               ctrUUIDs []string
-               ok       bool
-               stderr   []byte
-       )
-       if !booted {
-               booted, stderr = wp.probeBooted(wkr)
-               wp.mtx.Lock()
-               if booted && !wkr.booted {
-                       wkr.booted = booted
-                       logger.Info("instance booted")
-               } else {
-                       booted = wkr.booted
-               }
-               wp.mtx.Unlock()
-       }
-       if booted {
-               ctrUUIDs, ok, stderr = wp.probeRunning(wkr)
-       }
-       logger = logger.WithField("stderr", string(stderr))
-       wp.mtx.Lock()
-       defer wp.mtx.Unlock()
-       if !ok {
-               if wkr.state == StateShutdown {
-                       return
-               }
-               dur := time.Since(wkr.probed)
-               logger := logger.WithFields(logrus.Fields{
-                       "Duration": dur,
-                       "State":    wkr.state,
-               })
-               if wkr.state == StateBooting {
-                       logger.Debug("new instance not responding")
-               } else {
-                       logger.Info("instance not responding")
-               }
-               wp.shutdownIfBroken(wkr, dur)
-               return
-       }
-
-       updateTime := time.Now()
-       wkr.probed = updateTime
-       if len(ctrUUIDs) > 0 {
-               wkr.busy = updateTime
-               wkr.lastUUID = ctrUUIDs[0]
-       }
-       if wkr.state == StateShutdown || wkr.state == StateHold {
-       } else if booted {
-               if wkr.state != StateRunning {
-                       wkr.state = StateRunning
-                       go wp.notify()
-               }
-       } else {
-               wkr.state = StateBooting
-       }
-
-       if updated != wkr.updated {
-               // Worker was updated (e.g., by starting a new
-               // container) after the probe began. Avoid clobbering
-               // those changes with the probe results.
-               return
-       }
-
-       if len(ctrUUIDs) == 0 && len(wkr.running) > 0 {
-               wkr.unallocated = updateTime
-       }
-       running := map[string]struct{}{}
-       changed := false
-       for _, uuid := range ctrUUIDs {
-               running[uuid] = struct{}{}
-               if _, ok := wkr.running[uuid]; !ok {
-                       changed = true
-               }
-       }
-       for uuid := range wkr.running {
-               if _, ok := running[uuid]; !ok {
-                       logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
-                       wp.exited[uuid] = updateTime
-                       changed = true
-               }
-       }
-       if changed {
-               wkr.running = running
-               wkr.updated = updateTime
-               go wp.notify()
-       }
-}
-
-func (wp *Pool) probeRunning(wkr *worker) (running []string, ok bool, stderr []byte) {
-       cmd := "crunch-run --list"
-       stdout, stderr, err := wkr.executor.Execute(cmd, nil)
-       if err != nil {
-               wp.logger.WithFields(logrus.Fields{
-                       "Instance": wkr.instance,
-                       "Command":  cmd,
-                       "stdout":   string(stdout),
-                       "stderr":   string(stderr),
-               }).WithError(err).Warn("probe failed")
-               return nil, false, stderr
-       }
-       stdout = bytes.TrimRight(stdout, "\n")
-       if len(stdout) == 0 {
-               return nil, true, stderr
+// waitUntilLoaded blocks until wp.loaded is true, re-checking the
+// flag each time a notification arrives on a subscription channel.
+func (wp *Pool) waitUntilLoaded() {
+       ch := wp.Subscribe()
+       // Drop the subscription on return, as runMetrics does, so the
+       // channel does not stay subscribed after the wait is over.
+       // This defer is registered before the RUnlock defer below, so
+       // it runs after the read lock has been released (in case
+       // Unsubscribe needs the pool lock).
+       defer wp.Unsubscribe(ch)
+       wp.mtx.RLock()
+       defer wp.mtx.RUnlock()
+       for !wp.loaded {
+               // Release the read lock while waiting for the next
+               // notification, then re-acquire it before
+               // re-checking wp.loaded.
+               wp.mtx.RUnlock()
+               <-ch
+               wp.mtx.RLock()
+       }
-       return strings.Split(string(stdout), "\n"), true, stderr
 }
 
-func (wp *Pool) probeBooted(wkr *worker) (ok bool, stderr []byte) {
-       cmd := wp.bootProbeCommand
-       if cmd == "" {
-               cmd = "true"
-       }
-       stdout, stderr, err := wkr.executor.Execute(cmd, nil)
-       logger := wp.logger.WithFields(logrus.Fields{
-               "Instance": wkr.instance,
-               "Command":  cmd,
-               "stdout":   string(stdout),
-               "stderr":   string(stderr),
-       })
+// Return a random string of n hexadecimal digits (n*4 random bits). n
+// must be even.
+func randomHex(n int) string {
+       buf := make([]byte, n/2)
+       _, err := rand.Read(buf)
        if err != nil {
-               logger.WithError(err).Debug("boot probe failed")
-               return false, stderr
+               panic(err)
        }
-       logger.Info("boot probe succeeded")
-       return true, stderr
+       return fmt.Sprintf("%x", buf)
 }