Merge branch '15370-loopback-dispatchcloud'
[arvados.git] / lib / dispatchcloud / worker / pool.go
index 2c2d977d874227d8ebea67392b4665e03492a80e..66e0bfee910a236b46980f2db4b7c30850b3a759 100644 (file)
@@ -5,27 +5,38 @@
 package worker
 
 import (
+       "crypto/hmac"
+       "crypto/md5"
+       "crypto/rand"
+       "crypto/sha256"
        "errors"
+       "fmt"
        "io"
+       "io/ioutil"
+       mathrand "math/rand"
        "sort"
        "strings"
        "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/lib/cloud"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
+       "golang.org/x/crypto/ssh"
 )
 
 const (
-       tagKeyInstanceType = "InstanceType"
-       tagKeyIdleBehavior = "IdleBehavior"
+       tagKeyInstanceType   = "InstanceType"
+       tagKeyIdleBehavior   = "IdleBehavior"
+       tagKeyInstanceSecret = "InstanceSecret"
+       tagKeyInstanceSetID  = "InstanceSetID"
 )
 
 // An InstanceView shows a worker's current state and recent activity.
 type InstanceView struct {
        Instance             cloud.InstanceID `json:"instance"`
+       Address              string           `json:"address"`
        Price                float64          `json:"price"`
        ArvadosInstanceType  string           `json:"arvados_instance_type"`
        ProviderInstanceType string           `json:"provider_instance_type"`
@@ -56,48 +67,65 @@ type Executor interface {
 }
 
 const (
-       defaultSyncInterval       = time.Minute
-       defaultProbeInterval      = time.Second * 10
-       defaultMaxProbesPerSecond = 10
-       defaultTimeoutIdle        = time.Minute
-       defaultTimeoutBooting     = time.Minute * 10
-       defaultTimeoutProbe       = time.Minute * 10
-       defaultTimeoutShutdown    = time.Second * 10
+       defaultSyncInterval        = time.Minute
+       defaultProbeInterval       = time.Second * 10
+       defaultMaxProbesPerSecond  = 10
+       defaultTimeoutIdle         = time.Minute
+       defaultTimeoutBooting      = time.Minute * 10
+       defaultTimeoutProbe        = time.Minute * 10
+       defaultTimeoutShutdown     = time.Second * 10
+       defaultTimeoutTERM         = time.Minute * 2
+       defaultTimeoutSignal       = time.Second * 5
+       defaultTimeoutStaleRunLock = time.Second * 5
 
        // Time after a quota error to try again anyway, even if no
        // instances have been shutdown.
        quotaErrorTTL = time.Minute
+
+       // Time between "X failed because rate limiting" messages
+       logRateLimitErrorInterval = time.Second * 10
 )
 
 func duration(conf arvados.Duration, def time.Duration) time.Duration {
        if conf > 0 {
                return time.Duration(conf)
-       } else {
-               return def
        }
+       return def
 }
 
 // NewPool creates a Pool of workers backed by instanceSet.
 //
 // New instances are configured and set up according to the given
 // cluster configuration.
-func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
-               logger:             logger,
-               arvClient:          arvClient,
-               instanceSet:        instanceSet,
-               newExecutor:        newExecutor,
-               bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
-               imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
-               instanceTypes:      cluster.InstanceTypes,
-               maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
-               probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
-               syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
-               timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
-               timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
-               timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
-               timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
-               stop:               make(chan bool),
+               logger:                         logger,
+               arvClient:                      arvClient,
+               instanceSetID:                  instanceSetID,
+               instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
+               newExecutor:                    newExecutor,
+               cluster:                        cluster,
+               bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
+               runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
+               imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
+               instanceTypes:                  cluster.InstanceTypes,
+               maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
+               maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
+               probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
+               syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
+               timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
+               timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
+               timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+               timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+               timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
+               timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
+               timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
+               systemRootToken:                cluster.SystemRootToken,
+               installPublicKey:               installPublicKey,
+               tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
+               runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
+               runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
+               stop:                           make(chan bool),
        }
        wp.registerMetrics(reg)
        go func() {
@@ -113,51 +141,90 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
 // zero Pool should not be used. Call NewPool to create a new Pool.
 type Pool struct {
        // configuration
-       logger             logrus.FieldLogger
-       arvClient          *arvados.Client
-       instanceSet        cloud.InstanceSet
-       newExecutor        func(cloud.Instance) Executor
-       bootProbeCommand   string
-       imageID            cloud.ImageID
-       instanceTypes      map[string]arvados.InstanceType
-       syncInterval       time.Duration
-       probeInterval      time.Duration
-       maxProbesPerSecond int
-       timeoutIdle        time.Duration
-       timeoutBooting     time.Duration
-       timeoutProbe       time.Duration
-       timeoutShutdown    time.Duration
+       logger                         logrus.FieldLogger
+       arvClient                      *arvados.Client
+       instanceSetID                  cloud.InstanceSetID
+       instanceSet                    *throttledInstanceSet
+       newExecutor                    func(cloud.Instance) Executor
+       cluster                        *arvados.Cluster
+       bootProbeCommand               string
+       runnerSource                   string
+       imageID                        cloud.ImageID
+       instanceTypes                  map[string]arvados.InstanceType
+       syncInterval                   time.Duration
+       probeInterval                  time.Duration
+       maxProbesPerSecond             int
+       maxConcurrentInstanceCreateOps int
+       timeoutIdle                    time.Duration
+       timeoutBooting                 time.Duration
+       timeoutProbe                   time.Duration
+       timeoutShutdown                time.Duration
+       timeoutTERM                    time.Duration
+       timeoutSignal                  time.Duration
+       timeoutStaleRunLock            time.Duration
+       systemRootToken                string
+       installPublicKey               ssh.PublicKey
+       tagKeyPrefix                   string
+       runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
+       runnerArgs                     []string // extra args passed to crunch-run
 
        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
-       creating     map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
+       creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
-       exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
+       exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
        atQuotaUntil time.Time
        atQuotaErr   cloud.QuotaError
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once
+       runnerData   []byte
+       runnerMD5    [md5.Size]byte
+       runnerCmd    string
+
+       mContainersRunning        prometheus.Gauge
+       mInstances                *prometheus.GaugeVec
+       mInstancesPrice           *prometheus.GaugeVec
+       mVCPUs                    *prometheus.GaugeVec
+       mMemory                   *prometheus.GaugeVec
+       mBootOutcomes             *prometheus.CounterVec
+       mDisappearances           *prometheus.CounterVec
+       mTimeToSSH                prometheus.Summary
+       mTimeToReadyForContainer  prometheus.Summary
+       mTimeFromShutdownToGone   prometheus.Summary
+       mTimeFromQueueToCrunchRun prometheus.Summary
+       mRunProbeDuration         *prometheus.SummaryVec
+}
 
-       mInstances         prometheus.Gauge
-       mInstancesPrice    prometheus.Gauge
-       mContainersRunning prometheus.Gauge
-       mVCPUs             prometheus.Gauge
-       mVCPUsInuse        prometheus.Gauge
-       mMemory            prometheus.Gauge
-       mMemoryInuse       prometheus.Gauge
+type createCall struct {
+       time         time.Time
+       instanceType arvados.InstanceType
 }
 
-// Subscribe returns a channel that becomes ready whenever a worker's
-// state changes.
+func (wp *Pool) CheckHealth() error {
+       wp.setupOnce.Do(wp.setup)
+       if err := wp.loadRunnerData(); err != nil {
+               return fmt.Errorf("error loading runner binary: %s", err)
+       }
+       return nil
+}
+
+// Subscribe returns a buffered channel that becomes ready after any
+// change to the pool's state that could have scheduling implications:
+// a worker's state changes, a new worker appears, the cloud
+// provider's API rate limiting period ends, etc.
+//
+// Additional events that occur while the channel is already ready
+// will be dropped, so it is OK if the caller services the channel
+// slowly.
 //
 // Example:
 //
 //     ch := wp.Subscribe()
 //     defer wp.Unsubscribe(ch)
 //     for range ch {
-//             // ...try scheduling some work...
+//             tryScheduling(wp)
 //             if done {
 //                     break
 //             }
@@ -188,16 +255,28 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        defer wp.mtx.RUnlock()
        unalloc := map[arvados.InstanceType]int{}
        creating := map[arvados.InstanceType]int{}
-       for it, times := range wp.creating {
-               creating[it] = len(times)
+       oldestCreate := map[arvados.InstanceType]time.Time{}
+       for _, cc := range wp.creating {
+               it := cc.instanceType
+               creating[it]++
+               if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
+                       oldestCreate[it] = cc.time
+               }
        }
        for _, wkr := range wp.workers {
-               if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) || wkr.idleBehavior != IdleBehaviorRun {
+               // Skip workers that are not expected to become
+               // available soon. Note len(wkr.running)>0 is not
+               // redundant here: it can be true even in
+               // StateUnknown.
+               if wkr.state == StateShutdown ||
+                       wkr.state == StateRunning ||
+                       wkr.idleBehavior != IdleBehaviorRun ||
+                       len(wkr.running) > 0 {
                        continue
                }
                it := wkr.instType
                unalloc[it]++
-               if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
+               if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
                        // If up to N new workers appear in
                        // Instances() while we are waiting for N
                        // Create() calls to complete, we assume we're
@@ -222,44 +301,67 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 // Create a new instance with the given type, and add it to the worker
 // pool. The worker is added immediately; instance creation runs in
 // the background.
-func (wp *Pool) Create(it arvados.InstanceType) error {
+//
+// Create returns false if a pre-existing error state prevents it from
+// even attempting to create a new instance. Those errors are logged
+// by the Pool, so the caller does not need to log anything in such
+// cases.
+func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
+       if wp.loadRunnerData() != nil {
+               // Boot probe is certain to fail.
+               return false
+       }
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       if time.Now().Before(wp.atQuotaUntil) {
-               return wp.atQuotaErr
+       if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
+               return false
        }
-       tags := cloud.InstanceTags{
-               tagKeyInstanceType: it.Name,
-               tagKeyIdleBehavior: string(IdleBehaviorRun),
+       // The maxConcurrentInstanceCreateOps knob throttles the number of node create
+       // requests in flight. It was added to work around a limitation in Azure's
+       // managed disks, which support no more than 20 concurrent node creation
+       // requests from a single disk image (cf.
+       // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
+       // The code assumes that node creation, from Azure's perspective, means the
+       // period until the instance appears in the "get all instances" list.
+       if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
+               logger.Info("reached MaxConcurrentInstanceCreateOps")
+               wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
+               return false
        }
        now := time.Now()
-       wp.creating[it] = append(wp.creating[it], now)
+       secret := randomHex(instanceSecretLength)
+       wp.creating[secret] = createCall{time: now, instanceType: it}
        go func() {
                defer wp.notify()
-               inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
+               tags := cloud.InstanceTags{
+                       wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
+                       wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
+                       wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
+                       wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
+               }
+               initCmd := TagVerifier{nil, secret, nil}.InitCommand()
+               inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
-               // Remove our timestamp marker from wp.creating
-               for i, t := range wp.creating[it] {
-                       if t == now {
-                               copy(wp.creating[it][i:], wp.creating[it][i+1:])
-                               wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
-                               break
-                       }
-               }
-               if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
-                       wp.atQuotaErr = err
-                       wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
-               }
+               // delete() is deferred so the updateWorker() call
+               // below knows to use StateBooting when adding a new
+               // worker.
+               defer delete(wp.creating, secret)
                if err != nil {
+                       if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+                               wp.atQuotaErr = err
+                               wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+                               time.AfterFunc(quotaErrorTTL, wp.notify)
+                       }
                        logger.WithError(err).Error("create failed")
+                       wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
                        return
                }
-               wp.updateWorker(inst, it, StateBooting)
+               wp.updateWorker(inst, it)
        }()
-       return nil
+       return true
 }
 
 // AtQuota returns true if Create is not expected to work at the
@@ -279,48 +381,70 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
        if !ok {
                return errors.New("requested instance does not exist")
        }
-       wkr.idleBehavior = idleBehavior
-       wkr.saveTags()
-       wkr.shutdownIfIdle()
+       wkr.setIdleBehavior(idleBehavior)
        return nil
 }
 
-// Add or update worker attached to the given instance. Use
-// initialState if a new worker is created.
+// Successful connection to the SSH daemon, update the mTimeToSSH metric
+func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       wkr := wp.workers[inst.ID()]
+       if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
+               // the node is not in booting state (can happen if a-d-c is restarted) OR
+               // this is not the first SSH connection
+               return
+       }
+
+       wkr.firstSSHConnection = time.Now()
+       if wp.mTimeToSSH != nil {
+               wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
+       }
+}
+
+// Add or update worker attached to the given instance.
 //
 // The second return value is true if a new worker is created.
 //
+// A newly added instance has state=StateBooting if its tags match an
+// entry in wp.creating, otherwise StateUnknown.
+//
 // Caller must have lock.
-func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
+       secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
+       inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
                wkr.instance = inst
                wkr.updated = time.Now()
-               if initialState == StateBooting && wkr.state == StateUnknown {
-                       wkr.state = StateBooting
-               }
                wkr.saveTags()
                return wkr, false
        }
 
+       state := StateUnknown
+       if _, ok := wp.creating[secret]; ok {
+               state = StateBooting
+       }
+
        // If an instance has a valid IdleBehavior tag when it first
        // appears, initialize the new worker accordingly (this is how
        // we restore IdleBehavior that was set by a prior dispatch
        // process); otherwise, default to "run". After this,
        // wkr.idleBehavior is the source of truth, and will only be
        // changed via SetIdleBehavior().
-       idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
+       idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
        if !validIdleBehavior[idleBehavior] {
                idleBehavior = IdleBehaviorRun
        }
 
        logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
-               "Instance":     inst,
+               "Instance":     inst.ID(),
+               "Address":      inst.Address(),
        })
        logger.WithFields(logrus.Fields{
-               "State":        initialState,
+               "State":        state,
                "IdleBehavior": idleBehavior,
        }).Infof("instance appeared in cloud")
        now := time.Now()
@@ -329,7 +453,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
                wp:           wp,
                logger:       logger,
                executor:     wp.newExecutor(inst),
-               state:        initialState,
+               state:        state,
                idleBehavior: idleBehavior,
                instance:     inst,
                instType:     it,
@@ -337,19 +461,14 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
                probed:       now,
                busy:         now,
                updated:      now,
-               running:      make(map[string]struct{}),
-               starting:     make(map[string]struct{}),
+               running:      make(map[string]*remoteRunner),
+               starting:     make(map[string]*remoteRunner),
                probing:      make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
 }
 
-// caller must have lock.
-func (wp *Pool) notifyExited(uuid string, t time.Time) {
-       wp.exited[uuid] = t
-}
-
 // Shutdown shuts down a worker with the given type, or returns false
 // if all workers with the given type are busy.
 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
@@ -363,7 +482,8 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
                // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
                        if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
-                               logger.WithField("Instance", wkr.instance).Info("shutting down")
+                               logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
+                               wkr.reportBootOutcome(BootOutcomeAborted)
                                wkr.shutdown()
                                return true
                        }
@@ -373,8 +493,12 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
 }
 
 // CountWorkers returns the current number of workers in each state.
+//
+// CountWorkers blocks, if necessary, until the initial instance list
+// has been loaded from the cloud provider.
 func (wp *Pool) CountWorkers() map[State]int {
        wp.setupOnce.Do(wp.setup)
+       wp.waitUntilLoaded()
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[State]int{}
@@ -385,6 +509,12 @@ func (wp *Pool) CountWorkers() map[State]int {
 }
 
 // Running returns the container UUIDs being prepared/run on workers.
+//
+// In the returned map, the time value indicates when the Pool
+// observed that the container process had exited. A container that
+// has not yet exited has a zero time value. The caller should use
+// ForgetContainer() to garbage-collect the entries for exited
+// containers.
 func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
@@ -412,7 +542,7 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
        defer wp.mtx.Unlock()
        var wkr *worker
        for _, w := range wp.workers {
-               if w.instType == it && w.state == StateIdle {
+               if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
                        if wkr == nil || w.busy.After(wkr.busy) {
                                wkr = w
                        }
@@ -430,48 +560,45 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 //
 // KillContainer returns immediately; the act of killing the container
 // takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string) {
+//
+// KillContainer returns false if the container has already ended.
+func (wp *Pool) KillContainer(uuid string, reason string) bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       if _, ok := wp.exited[uuid]; ok {
-               wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
-               delete(wp.exited, uuid)
-               return
-       }
+       logger := wp.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "Reason":        reason,
+       })
        for _, wkr := range wp.workers {
-               if _, ok := wkr.running[uuid]; ok {
-                       go wp.kill(wkr, uuid)
-                       return
+               rr := wkr.running[uuid]
+               if rr == nil {
+                       rr = wkr.starting[uuid]
+               }
+               if rr != nil {
+                       rr.Kill(reason)
+                       return true
                }
        }
-       wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
+       logger.Debug("cannot kill: already disappeared")
+       return false
 }
 
-func (wp *Pool) kill(wkr *worker, uuid string) {
-       logger := wp.logger.WithFields(logrus.Fields{
-               "ContainerUUID": uuid,
-               "Instance":      wkr.instance,
-       })
-       logger.Debug("killing process")
-       stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
-       if err != nil {
-               logger.WithFields(logrus.Fields{
-                       "stderr": string(stderr),
-                       "stdout": string(stdout),
-                       "error":  err,
-               }).Warn("kill failed")
-               return
-       }
-       logger.Debug("killing process succeeded")
+// ForgetContainer clears the placeholder for the given exited
+// container, so it isn't returned by subsequent calls to Running().
+//
+// ForgetContainer has no effect if the container has not yet exited.
+//
+// The "container exited at time T" placeholder (which necessitates
+// ForgetContainer) exists to make it easier for the caller
+// (scheduler) to distinguish a container that exited without
+// finalizing its state from a container that exited too recently for
+// its final state to have appeared in the scheduler's queue cache.
+func (wp *Pool) ForgetContainer(uuid string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       if _, ok := wkr.running[uuid]; ok {
-               delete(wkr.running, uuid)
-               if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
-                       wkr.state = StateIdle
-               }
-               wkr.updated = time.Now()
-               go wp.notify()
+       if _, ok := wp.exited[uuid]; ok {
+               wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+               delete(wp.exited, uuid)
        }
 }
 
@@ -479,20 +606,6 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
        if reg == nil {
                reg = prometheus.NewRegistry()
        }
-       wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "dispatchcloud",
-               Name:      "instances_total",
-               Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
-       })
-       reg.MustRegister(wp.mInstances)
-       wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "dispatchcloud",
-               Name:      "instances_price_total",
-               Help:      "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
-       })
-       reg.MustRegister(wp.mInstancesPrice)
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
@@ -500,40 +613,100 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)
-
-       wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
+       wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "instances_total",
+               Help:      "Number of cloud VMs.",
+       }, []string{"category", "instance_type"})
+       reg.MustRegister(wp.mInstances)
+       wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "instances_price",
+               Help:      "Price of cloud VMs.",
+       }, []string{"category"})
+       reg.MustRegister(wp.mInstancesPrice)
+       wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_total",
                Help:      "Total VCPUs on all cloud VMs.",
-       })
+       }, []string{"category"})
        reg.MustRegister(wp.mVCPUs)
-       wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "dispatchcloud",
-               Name:      "vcpus_inuse",
-               Help:      "VCPUs on cloud VMs that are running containers.",
-       })
-       reg.MustRegister(wp.mVCPUsInuse)
-       wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
+       wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_total",
                Help:      "Total memory on all cloud VMs.",
-       })
+       }, []string{"category"})
        reg.MustRegister(wp.mMemory)
-       wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
+       wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
-               Name:      "memory_bytes_inuse",
-               Help:      "Memory on cloud VMs that are running containers.",
+               Name:      "boot_outcomes",
+               Help:      "Boot outcomes by type.",
+       }, []string{"outcome"})
+       for k := range validBootOutcomes {
+               wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
+       }
+       reg.MustRegister(wp.mBootOutcomes)
+       wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "instances_disappeared",
+               Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
+       }, []string{"state"})
+       for _, v := range stateString {
+               wp.mDisappearances.WithLabelValues(v).Add(0)
+       }
+       reg.MustRegister(wp.mDisappearances)
+       wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
+               Namespace:  "arvados",
+               Subsystem:  "dispatchcloud",
+               Name:       "instances_time_to_ssh_seconds",
+               Help:       "Number of seconds between instance creation and the first successful SSH connection.",
+               Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
        })
-       reg.MustRegister(wp.mMemoryInuse)
+       reg.MustRegister(wp.mTimeToSSH)
+       wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
+               Namespace:  "arvados",
+               Subsystem:  "dispatchcloud",
+               Name:       "instances_time_to_ready_for_container_seconds",
+               Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
+               Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+       })
+       reg.MustRegister(wp.mTimeToReadyForContainer)
+       wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
+               Namespace:  "arvados",
+               Subsystem:  "dispatchcloud",
+               Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
+               Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
+               Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+       })
+       reg.MustRegister(wp.mTimeFromShutdownToGone)
+       wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
+               Namespace:  "arvados",
+               Subsystem:  "dispatchcloud",
+               Name:       "containers_time_from_queue_to_crunch_run_seconds",
+               Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
+               Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+       })
+       reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
+       wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
+               Namespace:  "arvados",
+               Subsystem:  "dispatchcloud",
+               Name:       "instances_run_probe_duration_seconds",
+               Help:       "Number of seconds per runProbe call.",
+               Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+       }, []string{"outcome"})
+       reg.MustRegister(wp.mRunProbeDuration)
 }
 
 func (wp *Pool) runMetrics() {
        ch := wp.Subscribe()
        defer wp.Unsubscribe(ch)
+       wp.updateMetrics()
        for range ch {
                wp.updateMetrics()
        }
@@ -543,26 +716,50 @@ func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
 
-       var price float64
-       var alloc, cpu, cpuInuse, mem, memInuse int64
+       type entKey struct {
+               cat      string
+               instType string
+       }
+       instances := map[entKey]int64{}
+       price := map[string]float64{}
+       cpu := map[string]int64{}
+       mem := map[string]int64{}
+       var running int64
        for _, wkr := range wp.workers {
-               price += wkr.instType.Price
-               cpu += int64(wkr.instType.VCPUs)
-               mem += int64(wkr.instType.RAM)
-               if len(wkr.running)+len(wkr.starting) == 0 {
-                       continue
+               var cat string
+               switch {
+               case len(wkr.running)+len(wkr.starting) > 0:
+                       cat = "inuse"
+               case wkr.idleBehavior == IdleBehaviorHold:
+                       cat = "hold"
+               case wkr.state == StateBooting:
+                       cat = "booting"
+               case wkr.state == StateUnknown:
+                       cat = "unknown"
+               default:
+                       cat = "idle"
+               }
+               instances[entKey{cat, wkr.instType.Name}]++
+               price[cat] += wkr.instType.Price
+               cpu[cat] += int64(wkr.instType.VCPUs)
+               mem[cat] += int64(wkr.instType.RAM)
+               running += int64(len(wkr.running) + len(wkr.starting))
+       }
+       for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
+               wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
+               wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
+               wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
+               // make sure to reset gauges for non-existing category/nodetype combinations
+               for _, it := range wp.instanceTypes {
+                       if _, ok := instances[entKey{cat, it.Name}]; !ok {
+                               wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
+                       }
                }
-               alloc += int64(len(wkr.running) + len(wkr.starting))
-               cpuInuse += int64(wkr.instType.VCPUs)
-               memInuse += int64(wkr.instType.RAM)
        }
-       wp.mInstances.Set(float64(len(wp.workers)))
-       wp.mInstancesPrice.Set(price)
-       wp.mContainersRunning.Set(float64(alloc))
-       wp.mVCPUs.Set(float64(cpu))
-       wp.mMemory.Set(float64(mem))
-       wp.mVCPUsInuse.Set(float64(cpuInuse))
-       wp.mMemoryInuse.Set(float64(memInuse))
+       for k, v := range instances {
+               wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
+       }
+       wp.mContainersRunning.Set(float64(running))
 }
 
 func (wp *Pool) runProbes() {
@@ -578,6 +775,13 @@ func (wp *Pool) runProbes() {
 
        workers := []cloud.InstanceID{}
        for range probeticker.C {
+               // Add some jitter. Without this, if probeInterval is
+               // a multiple of syncInterval and sync is
+               // instantaneous (as with the loopback driver), the
+               // first few probes race with sync operations and
+               // don't update the workers.
+               time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))
+
                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
@@ -641,6 +845,7 @@ func (wp *Pool) Instances() []InstanceView {
        for _, w := range wp.workers {
                r = append(r, InstanceView{
                        Instance:             w.instance.ID(),
+                       Address:              w.instance.Address(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
@@ -657,11 +862,54 @@ func (wp *Pool) Instances() []InstanceView {
        return r
 }
 
+// KillInstance destroys a cloud VM instance. It returns an error if
+// the given instance does not exist.
+func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+       wkr, ok := wp.workers[id]
+       if !ok {
+               return errors.New("instance not found")
+       }
+       wkr.logger.WithField("Reason", reason).Info("shutting down")
+       wkr.reportBootOutcome(BootOutcomeAborted)
+       wkr.shutdown()
+       return nil
+}
+
 func (wp *Pool) setup() {
-       wp.creating = map[arvados.InstanceType][]time.Time{}
+       wp.creating = map[string]createCall{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
+       wp.loadRunnerData()
+}
+
+// Load the runner program to be deployed on worker nodes into
+// wp.runnerData, if necessary. Errors are logged.
+//
+// If auto-deploy is disabled, len(wp.runnerData) will be 0.
+//
+// Caller must not have lock.
+func (wp *Pool) loadRunnerData() error {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       if wp.runnerData != nil {
+               return nil
+       } else if wp.runnerSource == "" {
+               wp.runnerCmd = wp.runnerCmdDefault
+               wp.runnerData = []byte{}
+               return nil
+       }
+       logger := wp.logger.WithField("source", wp.runnerSource)
+       logger.Debug("loading runner")
+       buf, err := ioutil.ReadFile(wp.runnerSource)
+       if err != nil {
+               logger.WithError(err).Error("failed to load runner program")
+               return err
+       }
+       wp.runnerData = buf
+       wp.runnerMD5 = md5.Sum(buf)
+       wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
+       return nil
 }
 
 func (wp *Pool) notify() {
@@ -677,10 +925,14 @@ func (wp *Pool) notify() {
 
 func (wp *Pool) getInstancesAndSync() error {
        wp.setupOnce.Do(wp.setup)
+       if err := wp.instanceSet.throttleInstances.Error(); err != nil {
+               return err
+       }
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
-       instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
+       instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
        if err != nil {
+               wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
                return err
        }
        wp.sync(threshold, instances)
@@ -698,16 +950,16 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        notify := false
 
        for _, inst := range instances {
-               itTag := inst.Tags()[tagKeyInstanceType]
+               itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
-                       wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
+                       wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
-               if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
+               if wkr, isNew := wp.updateWorker(inst, it); isNew {
                        notify = true
                } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
-                       wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
+                       wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
                        wkr.shutdown()
                }
        }
@@ -717,16 +969,25 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                        continue
                }
                logger := wp.logger.WithFields(logrus.Fields{
-                       "Instance":    wkr.instance,
+                       "Instance":    wkr.instance.ID(),
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
+               wkr.reportBootOutcome(BootOutcomeDisappeared)
+               if wp.mDisappearances != nil {
+                       wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
+               }
+               // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
+               if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
+                       wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
+               }
                delete(wp.workers, id)
-               go wkr.executor.Close()
+               go wkr.Close()
                notify = true
        }
 
        if !wp.loaded {
+               notify = true
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }
@@ -735,3 +996,31 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                go wp.notify()
        }
 }
+
+func (wp *Pool) waitUntilLoaded() {
+       ch := wp.Subscribe()
+       wp.mtx.RLock()
+       defer wp.mtx.RUnlock()
+       for !wp.loaded {
+               wp.mtx.RUnlock()
+               <-ch
+               wp.mtx.RLock()
+       }
+}
+
+func (wp *Pool) gatewayAuthSecret(uuid string) string {
+       h := hmac.New(sha256.New, []byte(wp.systemRootToken))
+       fmt.Fprint(h, uuid)
+       return fmt.Sprintf("%x", h.Sum(nil))
+}
+
+// Return a random string of n hexadecimal digits (n*4 random bits). n
+// must be even.
+func randomHex(n int) string {
+       buf := make([]byte, n/2)
+       _, err := rand.Read(buf)
+       if err != nil {
+               panic(err)
+       }
+       return fmt.Sprintf("%x", buf)
+}