15345: Add .../containers/kill management API to dispatcher.
[arvados.git] / lib / dispatchcloud / worker / pool.go
index 37add6d3d6fc83dd3edbda9e8b4de9ff750381d0..97ca7f60a2a916fd6feece03e016c7fe3673e22d 100644 (file)
@@ -5,6 +5,9 @@
 package worker
 
 import (
 package worker
 
 import (
+       "crypto/rand"
+       "errors"
+       "fmt"
        "io"
        "sort"
        "strings"
        "io"
        "sort"
        "strings"
@@ -13,30 +16,35 @@ import (
 
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
 
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/Sirupsen/logrus"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
+       "golang.org/x/crypto/ssh"
 )
 
 const (
 )
 
 const (
-       tagKeyInstanceType = "InstanceType"
-       tagKeyHold         = "Hold"
+       tagKeyInstanceType   = "InstanceType"
+       tagKeyIdleBehavior   = "IdleBehavior"
+       tagKeyInstanceSecret = "InstanceSecret"
+       tagKeyInstanceSetID  = "InstanceSetID"
 )
 
 // An InstanceView shows a worker's current state and recent activity.
 type InstanceView struct {
 )
 
 // An InstanceView shows a worker's current state and recent activity.
 type InstanceView struct {
-       Instance             string
-       Price                float64
-       ArvadosInstanceType  string
-       ProviderInstanceType string
-       LastContainerUUID    string
-       LastBusy             time.Time
-       WorkerState          string
+       Instance             cloud.InstanceID `json:"instance"`
+       Address              string           `json:"address"`
+       Price                float64          `json:"price"`
+       ArvadosInstanceType  string           `json:"arvados_instance_type"`
+       ProviderInstanceType string           `json:"provider_instance_type"`
+       LastContainerUUID    string           `json:"last_container_uuid"`
+       LastBusy             time.Time        `json:"last_busy"`
+       WorkerState          string           `json:"worker_state"`
+       IdleBehavior         IdleBehavior     `json:"idle_behavior"`
 }
 
 // An Executor executes shell commands on a remote host.
 type Executor interface {
        // Run cmd on the current target.
 }
 
 // An Executor executes shell commands on a remote host.
 type Executor interface {
        // Run cmd on the current target.
-       Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
+       Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
 
        // Use the given target for subsequent operations. The new
        // target is the same host as the previous target, but it
 
        // Use the given target for subsequent operations. The new
        // target is the same host as the previous target, but it
@@ -61,6 +69,15 @@ const (
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
        defaultTimeoutShutdown    = time.Second * 10
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
        defaultTimeoutShutdown    = time.Second * 10
+       defaultTimeoutTERM        = time.Minute * 2
+       defaultTimeoutSignal      = time.Second * 5
+
+       // Time after a quota error to try again anyway, even if no
+       // instances have been shutdown.
+       quotaErrorTTL = time.Minute
+
+       // Time between "X failed because rate limiting" messages
+       logRateLimitErrorInterval = time.Second * 10
 )
 
 func duration(conf arvados.Duration, def time.Duration) time.Duration {
 )
 
 func duration(conf arvados.Duration, def time.Duration) time.Duration {
@@ -75,21 +92,28 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration {
 //
 // New instances are configured and set up according to the given
 // cluster configuration.
 //
 // New instances are configured and set up according to the given
 // cluster configuration.
-func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:             logger,
        wp := &Pool{
                logger:             logger,
-               instanceSet:        instanceSet,
+               arvClient:          arvClient,
+               instanceSetID:      instanceSetID,
+               instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:        newExecutor,
                newExecutor:        newExecutor,
-               bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
-               imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
+               bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
+               imageID:            cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
                instanceTypes:      cluster.InstanceTypes,
                instanceTypes:      cluster.InstanceTypes,
-               maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
-               probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
-               syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
-               timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
-               timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
-               timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
-               timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+               maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
+               probeInterval:      duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
+               syncInterval:       duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
+               timeoutIdle:        duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
+               timeoutBooting:     duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
+               timeoutProbe:       duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+               timeoutShutdown:    duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+               timeoutTERM:        duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
+               timeoutSignal:      duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
+               installPublicKey:   installPublicKey,
+               tagKeyPrefix:       cluster.Containers.CloudVMs.TagKeyPrefix,
+               stop:               make(chan bool),
        }
        wp.registerMetrics(reg)
        go func() {
        }
        wp.registerMetrics(reg)
        go func() {
@@ -106,7 +130,9 @@ func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cl
 type Pool struct {
        // configuration
        logger             logrus.FieldLogger
 type Pool struct {
        // configuration
        logger             logrus.FieldLogger
-       instanceSet        cloud.InstanceSet
+       arvClient          *arvados.Client
+       instanceSetID      cloud.InstanceSetID
+       instanceSet        *throttledInstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
        imageID            cloud.ImageID
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
        imageID            cloud.ImageID
@@ -118,36 +144,54 @@ type Pool struct {
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
        timeoutShutdown    time.Duration
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
        timeoutShutdown    time.Duration
+       timeoutTERM        time.Duration
+       timeoutSignal      time.Duration
+       installPublicKey   ssh.PublicKey
+       tagKeyPrefix       string
 
        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
 
        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
-       creating     map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return
+       creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
-       exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
+       exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
        atQuotaUntil time.Time
        atQuotaErr   cloud.QuotaError
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once
 
        atQuotaUntil time.Time
        atQuotaErr   cloud.QuotaError
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once
 
-       mInstances         prometheus.Gauge
+       throttleCreate    throttle
+       throttleInstances throttle
+
        mContainersRunning prometheus.Gauge
        mContainersRunning prometheus.Gauge
-       mVCPUs             prometheus.Gauge
-       mVCPUsInuse        prometheus.Gauge
-       mMemory            prometheus.Gauge
-       mMemoryInuse       prometheus.Gauge
+       mInstances         *prometheus.GaugeVec
+       mInstancesPrice    *prometheus.GaugeVec
+       mVCPUs             *prometheus.GaugeVec
+       mMemory            *prometheus.GaugeVec
+       mDisappearances    *prometheus.CounterVec
 }
 
 }
 
-// Subscribe returns a channel that becomes ready whenever a worker's
-// state changes.
+type createCall struct {
+       time         time.Time
+       instanceType arvados.InstanceType
+}
+
+// Subscribe returns a buffered channel that becomes ready after any
+// change to the pool's state that could have scheduling implications:
+// a worker's state changes, a new worker appears, the cloud
+// provider's API rate limiting period ends, etc.
+//
+// Additional events that occur while the channel is already ready
+// will be dropped, so it is OK if the caller services the channel
+// slowly.
 //
 // Example:
 //
 //     ch := wp.Subscribe()
 //     defer wp.Unsubscribe(ch)
 //     for range ch {
 //
 // Example:
 //
 //     ch := wp.Subscribe()
 //     defer wp.Unsubscribe(ch)
 //     for range ch {
-//             // ...try scheduling some work...
+//             tryScheduling(wp)
 //             if done {
 //                     break
 //             }
 //             if done {
 //                     break
 //             }
@@ -170,58 +214,105 @@ func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
 }
 
 // Unallocated returns the number of unallocated (creating + booting +
 }
 
 // Unallocated returns the number of unallocated (creating + booting +
-// idle + unknown) workers for each instance type.
-//
-// The returned counts should be interpreted as upper bounds, rather
-// than exact counts: they are sometimes artificially high when a
-// newly created instance appears in the driver's Instances() list
-// before the Create() call returns.
+// idle + unknown) workers for each instance type.  Workers in
+// hold/drain mode are not included.
 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
-       u := map[arvados.InstanceType]int{}
-       for it, c := range wp.creating {
-               u[it] = c
+       unalloc := map[arvados.InstanceType]int{}
+       creating := map[arvados.InstanceType]int{}
+       oldestCreate := map[arvados.InstanceType]time.Time{}
+       for _, cc := range wp.creating {
+               it := cc.instanceType
+               creating[it]++
+               if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
+                       oldestCreate[it] = cc.time
+               }
        }
        for _, wkr := range wp.workers {
        }
        for _, wkr := range wp.workers {
-               if wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown {
-                       u[wkr.instType]++
+               // Skip workers that are not expected to become
+               // available soon. Note len(wkr.running)>0 is not
+               // redundant here: it can be true even in
+               // StateUnknown.
+               if wkr.state == StateShutdown ||
+                       wkr.state == StateRunning ||
+                       wkr.idleBehavior != IdleBehaviorRun ||
+                       len(wkr.running) > 0 {
+                       continue
+               }
+               it := wkr.instType
+               unalloc[it]++
+               if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
+                       // If up to N new workers appear in
+                       // Instances() while we are waiting for N
+                       // Create() calls to complete, we assume we're
+                       // just seeing a race between Instances() and
+                       // Create() responses.
+                       //
+                       // The other common reason why nodes have
+                       // state==Unknown is that they appeared at
+                       // startup, before any Create calls. They
+                       // don't match the above timing condition, so
+                       // we never mistakenly attribute them to
+                       // pending Create calls.
+                       creating[it]--
                }
        }
                }
        }
-       return u
+       for it, c := range creating {
+               unalloc[it] += c
+       }
+       return unalloc
 }
 
 // Create a new instance with the given type, and add it to the worker
 // pool. The worker is added immediately; instance creation runs in
 // the background.
 }
 
 // Create a new instance with the given type, and add it to the worker
 // pool. The worker is added immediately; instance creation runs in
 // the background.
-func (wp *Pool) Create(it arvados.InstanceType) error {
+//
+// Create returns false if a pre-existing error state prevents it from
+// even attempting to create a new instance. Those errors are logged
+// by the Pool, so the caller does not need to log anything in such
+// cases.
+func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       if time.Now().Before(wp.atQuotaUntil) {
-               return wp.atQuotaErr
+       if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
+               return false
        }
        }
-       tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
-       wp.creating[it]++
+       now := time.Now()
+       secret := randomHex(instanceSecretLength)
+       wp.creating[secret] = createCall{time: now, instanceType: it}
        go func() {
                defer wp.notify()
        go func() {
                defer wp.notify()
-               inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
+               tags := cloud.InstanceTags{
+                       wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
+                       wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
+                       wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
+                       wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
+               }
+               initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
+               inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
-               wp.creating[it]--
-               if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
-                       wp.atQuotaErr = err
-                       wp.atQuotaUntil = time.Now().Add(time.Minute)
-               }
+               // delete() is deferred so the updateWorker() call
+               // below knows to use StateBooting when adding a new
+               // worker.
+               defer delete(wp.creating, secret)
                if err != nil {
                if err != nil {
+                       if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+                               wp.atQuotaErr = err
+                               wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+                               time.AfterFunc(quotaErrorTTL, wp.notify)
+                       }
                        logger.WithError(err).Error("create failed")
                        logger.WithError(err).Error("create failed")
+                       wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
                        return
                }
                        return
                }
-               wp.updateWorker(inst, it, StateBooting)
+               wp.updateWorker(inst, it)
        }()
        }()
-       return nil
+       return true
 }
 
 // AtQuota returns true if Create is not expected to work at the
 }
 
 // AtQuota returns true if Create is not expected to work at the
@@ -232,56 +323,86 @@ func (wp *Pool) AtQuota() bool {
        return time.Now().Before(wp.atQuotaUntil)
 }
 
        return time.Now().Before(wp.atQuotaUntil)
 }
 
-// Add or update worker attached to the given instance. Use
-// initialState if a new worker is created.
+// SetIdleBehavior determines how the indicated instance will behave
+// when it has no containers running.
+func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       wkr, ok := wp.workers[id]
+       if !ok {
+               return errors.New("requested instance does not exist")
+       }
+       wkr.setIdleBehavior(idleBehavior)
+       return nil
+}
+
+// Add or update worker attached to the given instance.
 //
 // The second return value is true if a new worker is created.
 //
 //
 // The second return value is true if a new worker is created.
 //
+// A newly added instance has state=StateBooting if its tags match an
+// entry in wp.creating, otherwise StateUnknown.
+//
 // Caller must have lock.
 // Caller must have lock.
-func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
+       secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
+       inst = tagVerifier{inst, secret}
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
                wkr.instance = inst
                wkr.updated = time.Now()
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
                wkr.instance = inst
                wkr.updated = time.Now()
-               if initialState == StateBooting && wkr.state == StateUnknown {
-                       wkr.state = StateBooting
-               }
+               wkr.saveTags()
                return wkr, false
        }
                return wkr, false
        }
-       if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
-               initialState = StateHold
+
+       state := StateUnknown
+       if _, ok := wp.creating[secret]; ok {
+               state = StateBooting
+       }
+
+       // If an instance has a valid IdleBehavior tag when it first
+       // appears, initialize the new worker accordingly (this is how
+       // we restore IdleBehavior that was set by a prior dispatch
+       // process); otherwise, default to "run". After this,
+       // wkr.idleBehavior is the source of truth, and will only be
+       // changed via SetIdleBehavior().
+       idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
+       if !validIdleBehavior[idleBehavior] {
+               idleBehavior = IdleBehaviorRun
        }
        }
+
        logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
        logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
-               "Instance":     inst,
+               "Instance":     inst.ID(),
+               "Address":      inst.Address(),
        })
        })
-       logger.WithField("State", initialState).Infof("instance appeared in cloud")
+       logger.WithFields(logrus.Fields{
+               "State":        state,
+               "IdleBehavior": idleBehavior,
+       }).Infof("instance appeared in cloud")
        now := time.Now()
        wkr := &worker{
        now := time.Now()
        wkr := &worker{
-               mtx:      &wp.mtx,
-               wp:       wp,
-               logger:   logger,
-               executor: wp.newExecutor(inst),
-               state:    initialState,
-               instance: inst,
-               instType: it,
-               probed:   now,
-               busy:     now,
-               updated:  now,
-               running:  make(map[string]struct{}),
-               starting: make(map[string]struct{}),
-               probing:  make(chan struct{}, 1),
+               mtx:          &wp.mtx,
+               wp:           wp,
+               logger:       logger,
+               executor:     wp.newExecutor(inst),
+               state:        state,
+               idleBehavior: idleBehavior,
+               instance:     inst,
+               instType:     it,
+               appeared:     now,
+               probed:       now,
+               busy:         now,
+               updated:      now,
+               running:      make(map[string]*remoteRunner),
+               starting:     make(map[string]*remoteRunner),
+               probing:      make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
 }
 
        }
        wp.workers[id] = wkr
        return wkr, true
 }
 
-// caller must have lock.
-func (wp *Pool) notifyExited(uuid string, t time.Time) {
-       wp.exited[uuid] = t
-}
-
 // Shutdown shuts down a worker with the given type, or returns false
 // if all workers with the given type are busy.
 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
 // Shutdown shuts down a worker with the given type, or returns false
 // if all workers with the given type are busy.
 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
@@ -294,7 +415,7 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
                // TODO: shutdown the worker with the longest idle
                // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
                // TODO: shutdown the worker with the longest idle
                // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
-                       if wkr.state == tryState && wkr.instType == it {
+                       if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
                                logger.WithField("Instance", wkr.instance).Info("shutting down")
                                wkr.shutdown()
                                return true
                                logger.WithField("Instance", wkr.instance).Info("shutting down")
                                wkr.shutdown()
                                return true
@@ -304,9 +425,13 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
        return false
 }
 
        return false
 }
 
-// Workers returns the current number of workers in each state.
-func (wp *Pool) Workers() map[State]int {
+// CountWorkers returns the current number of workers in each state.
+//
+// CountWorkers blocks, if necessary, until the initial instance list
+// has been loaded from the cloud provider.
+func (wp *Pool) CountWorkers() map[State]int {
        wp.setupOnce.Do(wp.setup)
        wp.setupOnce.Do(wp.setup)
+       wp.waitUntilLoaded()
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[State]int{}
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[State]int{}
@@ -317,6 +442,12 @@ func (wp *Pool) Workers() map[State]int {
 }
 
 // Running returns the container UUIDs being prepared/run on workers.
 }
 
 // Running returns the container UUIDs being prepared/run on workers.
+//
+// In the returned map, the time value indicates when the Pool
+// observed that the container process had exited. A container that
+// has not yet exited has a zero time value. The caller should use
+// ForgetContainer() to garbage-collect the entries for exited
+// containers.
 func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
 func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
@@ -362,48 +493,45 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 //
 // KillContainer returns immediately; the act of killing the container
 // takes some time, and runs in the background.
 //
 // KillContainer returns immediately; the act of killing the container
 // takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string) {
+//
+// KillContainer returns false if the container has already ended.
+func (wp *Pool) KillContainer(uuid string, reason string) bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       if _, ok := wp.exited[uuid]; ok {
-               wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
-               delete(wp.exited, uuid)
-               return
-       }
+       logger := wp.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "Reason":        reason,
+       })
        for _, wkr := range wp.workers {
        for _, wkr := range wp.workers {
-               if _, ok := wkr.running[uuid]; ok {
-                       go wp.kill(wkr, uuid)
-                       return
+               rr := wkr.running[uuid]
+               if rr == nil {
+                       rr = wkr.starting[uuid]
+               }
+               if rr != nil {
+                       rr.Kill(reason)
+                       return true
                }
        }
                }
        }
-       wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
+       logger.Debug("cannot kill: already disappeared")
+       return false
 }
 
 }
 
-func (wp *Pool) kill(wkr *worker, uuid string) {
-       logger := wp.logger.WithFields(logrus.Fields{
-               "ContainerUUID": uuid,
-               "Instance":      wkr.instance,
-       })
-       logger.Debug("killing process")
-       stdout, stderr, err := wkr.executor.Execute("crunch-run --kill "+uuid, nil)
-       if err != nil {
-               logger.WithFields(logrus.Fields{
-                       "stderr": string(stderr),
-                       "stdout": string(stdout),
-                       "error":  err,
-               }).Warn("kill failed")
-               return
-       }
-       logger.Debug("killing process succeeded")
+// ForgetContainer clears the placeholder for the given exited
+// container, so it isn't returned by subsequent calls to Running().
+//
+// ForgetContainer has no effect if the container has not yet exited.
+//
+// The "container exited at time T" placeholder (which necessitates
+// ForgetContainer) exists to make it easier for the caller
+// (scheduler) to distinguish a container that exited without
+// finalizing its state from a container that exited too recently for
+// its final state to have appeared in the scheduler's queue cache.
+func (wp *Pool) ForgetContainer(uuid string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       if _, ok := wkr.running[uuid]; ok {
-               delete(wkr.running, uuid)
-               if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
-                       wkr.state = StateIdle
-               }
-               wkr.updated = time.Now()
-               go wp.notify()
+       if _, ok := wp.exited[uuid]; ok {
+               wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+               delete(wp.exited, uuid)
        }
 }
 
        }
 }
 
@@ -411,13 +539,6 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
        if reg == nil {
                reg = prometheus.NewRegistry()
        }
        if reg == nil {
                reg = prometheus.NewRegistry()
        }
-       wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "dispatchcloud",
-               Name:      "instances_total",
-               Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
-       })
-       reg.MustRegister(wp.mInstances)
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
@@ -425,40 +546,50 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)
-
-       wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
+       wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "instances_total",
+               Help:      "Number of cloud VMs.",
+       }, []string{"category"})
+       reg.MustRegister(wp.mInstances)
+       wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "instances_price",
+               Help:      "Price of cloud VMs.",
+       }, []string{"category"})
+       reg.MustRegister(wp.mInstancesPrice)
+       wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_total",
                Help:      "Total VCPUs on all cloud VMs.",
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_total",
                Help:      "Total VCPUs on all cloud VMs.",
-       })
+       }, []string{"category"})
        reg.MustRegister(wp.mVCPUs)
        reg.MustRegister(wp.mVCPUs)
-       wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "dispatchcloud",
-               Name:      "vcpus_inuse",
-               Help:      "VCPUs on cloud VMs that are running containers.",
-       })
-       reg.MustRegister(wp.mVCPUsInuse)
-       wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
+       wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_total",
                Help:      "Total memory on all cloud VMs.",
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_total",
                Help:      "Total memory on all cloud VMs.",
-       })
+       }, []string{"category"})
        reg.MustRegister(wp.mMemory)
        reg.MustRegister(wp.mMemory)
-       wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
+       wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
-               Name:      "memory_bytes_inuse",
-               Help:      "Memory on cloud VMs that are running containers.",
-       })
-       reg.MustRegister(wp.mMemoryInuse)
+               Name:      "instances_disappeared",
+               Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
+       }, []string{"state"})
+       for _, v := range stateString {
+               wp.mDisappearances.WithLabelValues(v).Add(0)
+       }
+       reg.MustRegister(wp.mDisappearances)
 }
 
 func (wp *Pool) runMetrics() {
        ch := wp.Subscribe()
        defer wp.Unsubscribe(ch)
 }
 
 func (wp *Pool) runMetrics() {
        ch := wp.Subscribe()
        defer wp.Unsubscribe(ch)
+       wp.updateMetrics()
        for range ch {
                wp.updateMetrics()
        }
        for range ch {
                wp.updateMetrics()
        }
@@ -468,23 +599,38 @@ func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
 
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
 
-       var alloc, cpu, cpuInuse, mem, memInuse int64
+       instances := map[string]int64{}
+       price := map[string]float64{}
+       cpu := map[string]int64{}
+       mem := map[string]int64{}
+       var running int64
        for _, wkr := range wp.workers {
        for _, wkr := range wp.workers {
-               cpu += int64(wkr.instType.VCPUs)
-               mem += int64(wkr.instType.RAM)
-               if len(wkr.running)+len(wkr.starting) == 0 {
-                       continue
+               var cat string
+               switch {
+               case len(wkr.running)+len(wkr.starting) > 0:
+                       cat = "inuse"
+               case wkr.idleBehavior == IdleBehaviorHold:
+                       cat = "hold"
+               case wkr.state == StateBooting:
+                       cat = "booting"
+               case wkr.state == StateUnknown:
+                       cat = "unknown"
+               default:
+                       cat = "idle"
                }
                }
-               alloc += int64(len(wkr.running) + len(wkr.starting))
-               cpuInuse += int64(wkr.instType.VCPUs)
-               memInuse += int64(wkr.instType.RAM)
+               instances[cat]++
+               price[cat] += wkr.instType.Price
+               cpu[cat] += int64(wkr.instType.VCPUs)
+               mem[cat] += int64(wkr.instType.RAM)
+               running += int64(len(wkr.running) + len(wkr.starting))
+       }
+       for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
+               wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
+               wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
+               wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
+               wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
        }
        }
-       wp.mInstances.Set(float64(len(wp.workers)))
-       wp.mContainersRunning.Set(float64(alloc))
-       wp.mVCPUs.Set(float64(cpu))
-       wp.mMemory.Set(float64(mem))
-       wp.mVCPUsInuse.Set(float64(cpuInuse))
-       wp.mMemoryInuse.Set(float64(memInuse))
+       wp.mContainersRunning.Set(float64(running))
 }
 
 func (wp *Pool) runProbes() {
 }
 
 func (wp *Pool) runProbes() {
@@ -562,24 +708,38 @@ func (wp *Pool) Instances() []InstanceView {
        wp.mtx.Lock()
        for _, w := range wp.workers {
                r = append(r, InstanceView{
        wp.mtx.Lock()
        for _, w := range wp.workers {
                r = append(r, InstanceView{
-                       Instance:             w.instance.String(),
+                       Instance:             w.instance.ID(),
+                       Address:              w.instance.Address(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
                        LastContainerUUID:    w.lastUUID,
                        LastBusy:             w.busy,
                        WorkerState:          w.state.String(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
                        LastContainerUUID:    w.lastUUID,
                        LastBusy:             w.busy,
                        WorkerState:          w.state.String(),
+                       IdleBehavior:         w.idleBehavior,
                })
        }
        wp.mtx.Unlock()
        sort.Slice(r, func(i, j int) bool {
                })
        }
        wp.mtx.Unlock()
        sort.Slice(r, func(i, j int) bool {
-               return strings.Compare(r[i].Instance, r[j].Instance) < 0
+               return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
        })
        return r
 }
 
        })
        return r
 }
 
+// KillInstance destroys a cloud VM instance. It returns an error if
+// the given instance does not exist.
+func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+       wkr, ok := wp.workers[id]
+       if !ok {
+               return errors.New("instance not found")
+       }
+       wkr.logger.WithField("Reason", reason).Info("shutting down")
+       wkr.shutdown()
+       return nil
+}
+
 func (wp *Pool) setup() {
 func (wp *Pool) setup() {
-       wp.creating = map[arvados.InstanceType]int{}
+       wp.creating = map[string]createCall{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
@@ -598,10 +758,14 @@ func (wp *Pool) notify() {
 
 func (wp *Pool) getInstancesAndSync() error {
        wp.setupOnce.Do(wp.setup)
 
 func (wp *Pool) getInstancesAndSync() error {
        wp.setupOnce.Do(wp.setup)
+       if err := wp.instanceSet.throttleInstances.Error(); err != nil {
+               return err
+       }
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
-       instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
+       instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
        if err != nil {
        if err != nil {
+               wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
                return err
        }
        wp.sync(threshold, instances)
                return err
        }
        wp.sync(threshold, instances)
@@ -619,13 +783,13 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        notify := false
 
        for _, inst := range instances {
        notify := false
 
        for _, inst := range instances {
-               itTag := inst.Tags()[tagKeyInstanceType]
+               itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                        wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                        wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
-               if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
+               if wkr, isNew := wp.updateWorker(inst, it); isNew {
                        notify = true
                } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
                        wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
                        notify = true
                } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
                        wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
@@ -638,16 +802,20 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                        continue
                }
                logger := wp.logger.WithFields(logrus.Fields{
                        continue
                }
                logger := wp.logger.WithFields(logrus.Fields{
-                       "Instance":    wkr.instance,
+                       "Instance":    wkr.instance.ID(),
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
+               if wp.mDisappearances != nil {
+                       wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
+               }
                delete(wp.workers, id)
                delete(wp.workers, id)
-               go wkr.executor.Close()
+               go wkr.Close()
                notify = true
        }
 
        if !wp.loaded {
                notify = true
        }
 
        if !wp.loaded {
+               notify = true
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }
@@ -656,3 +824,25 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                go wp.notify()
        }
 }
                go wp.notify()
        }
 }
+
+func (wp *Pool) waitUntilLoaded() {
+       ch := wp.Subscribe()
+       wp.mtx.RLock()
+       defer wp.mtx.RUnlock()
+       for !wp.loaded {
+               wp.mtx.RUnlock()
+               <-ch
+               wp.mtx.RLock()
+       }
+}
+
// randomHex returns a string of n random lowercase hexadecimal digits
// (n*4 random bits), drawn from crypto/rand.
//
// Odd n is supported: one extra random byte is read and the final
// digit is dropped. It returns "" when n <= 0, and panics if the
// secure random source returns an error.
func randomHex(n int) string {
	if n <= 0 {
		return ""
	}
	// Each byte encodes to two hex digits; round up for odd n.
	buf := make([]byte, (n+1)/2)
	if _, err := rand.Read(buf); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)[:n]
}