Merge branch '15781-multi-value-property-search'
[arvados.git] / lib/dispatchcloud/worker/pool.go
index 14f6a3efced3815f11b19b6e08612ead4326e4f6..0636fcee897fee906db1c03ae2c7b6a4e4e7c811 100644
@@ -5,17 +5,19 @@
 package worker
 
 import (
+       "crypto/md5"
        "crypto/rand"
        "errors"
        "fmt"
        "io"
+       "io/ioutil"
        "sort"
        "strings"
        "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/lib/cloud"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
@@ -25,6 +27,7 @@ const (
        tagKeyInstanceType   = "InstanceType"
        tagKeyIdleBehavior   = "IdleBehavior"
        tagKeyInstanceSecret = "InstanceSecret"
+       tagKeyInstanceSetID  = "InstanceSetID"
 )
 
 // An InstanceView shows a worker's current state and recent activity.
@@ -68,6 +71,8 @@ const (
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
        defaultTimeoutShutdown    = time.Second * 10
+       defaultTimeoutTERM        = time.Minute * 2
+       defaultTimeoutSignal      = time.Second * 5
 
        // Time after a quota error to try again anyway, even if no
        // instances have been shutdown.
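
The two new timeouts are consumed by the remoteRunner type introduced elsewhere in this change. One plausible reading (an assumption, not stated in this file) is that timeoutSignal is the retry interval between kill signals sent to a crunch-run process, while timeoutTERM is the overall deadline after which the worker is treated as stuck. A scaled-down sketch of that kind of loop, with made-up intervals and a stubbed exited() check:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // Stand-ins for timeoutSignal (default 5s) and timeoutTERM (default 2m),
        // shrunk so the sketch finishes quickly.
        signalInterval := 100 * time.Millisecond
        termDeadline := time.Now().Add(500 * time.Millisecond)

        // Pretend the remote crunch-run process never goes away.
        exited := func() bool { return false }

        for !exited() {
            if time.Now().After(termDeadline) {
                fmt.Println("TERM deadline passed; give up and treat the worker as broken")
                break
            }
            fmt.Println("send TERM to crunch-run, wait, re-check")
            time.Sleep(signalInterval)
        }
    }
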
@@ -89,23 +94,28 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration {
 //
 // New instances are configured and set up according to the given
 // cluster configuration.
-func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:             logger,
                arvClient:          arvClient,
+               instanceSetID:      instanceSetID,
                instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:        newExecutor,
-               bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
-               imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
+               bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
+               runnerSource:       cluster.Containers.CloudVMs.DeployRunnerBinary,
+               imageID:            cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
                instanceTypes:      cluster.InstanceTypes,
-               maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
-               probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
-               syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
-               timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
-               timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
-               timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
-               timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+               maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
+               probeInterval:      duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
+               syncInterval:       duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
+               timeoutIdle:        duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
+               timeoutBooting:     duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
+               timeoutProbe:       duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+               timeoutShutdown:    duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+               timeoutTERM:        duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
+               timeoutSignal:      duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
                installPublicKey:   installPublicKey,
+               tagKeyPrefix:       cluster.Containers.CloudVMs.TagKeyPrefix,
                stop:               make(chan bool),
        }
        wp.registerMetrics(reg)
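
Every timeout above falls back to a package default when the cluster config leaves it unset; that is the job of the duration() helper named in the hunk header below. A minimal sketch of that fallback pattern (the helper's body is an assumption, and arvados.Duration is modeled here by a local type):

    package main

    import (
        "fmt"
        "time"
    )

    // Duration stands in for arvados.Duration, a config value backed by time.Duration.
    type Duration time.Duration

    // duration returns the configured value if it is set, otherwise the default.
    func duration(conf Duration, def time.Duration) time.Duration {
        if conf > 0 {
            return time.Duration(conf)
        }
        return def
    }

    func main() {
        fmt.Println(duration(0, 10*time.Minute))                        // unset -> default
        fmt.Println(duration(Duration(30*time.Second), 10*time.Minute)) // configured -> 30s
    }
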
@@ -124,9 +134,11 @@ type Pool struct {
        // configuration
        logger             logrus.FieldLogger
        arvClient          *arvados.Client
+       instanceSetID      cloud.InstanceSetID
        instanceSet        *throttledInstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
+       runnerSource       string
        imageID            cloud.ImageID
        instanceTypes      map[string]arvados.InstanceType
        syncInterval       time.Duration
@@ -136,19 +148,25 @@ type Pool struct {
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
        timeoutShutdown    time.Duration
+       timeoutTERM        time.Duration
+       timeoutSignal      time.Duration
        installPublicKey   ssh.PublicKey
+       tagKeyPrefix       string
 
        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
-       creating     map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
+       creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
-       exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
+       exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
        atQuotaUntil time.Time
        atQuotaErr   cloud.QuotaError
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once
+       runnerData   []byte
+       runnerMD5    [md5.Size]byte
+       runnerCmd    string
 
        throttleCreate    throttle
        throttleInstances throttle
@@ -158,6 +176,20 @@ type Pool struct {
        mInstancesPrice    *prometheus.GaugeVec
        mVCPUs             *prometheus.GaugeVec
        mMemory            *prometheus.GaugeVec
+       mDisappearances    *prometheus.CounterVec
+}
+
+type createCall struct {
+       time         time.Time
+       instanceType arvados.InstanceType
+}
+
+func (wp *Pool) CheckHealth() error {
+       wp.setupOnce.Do(wp.setup)
+       if err := wp.loadRunnerData(); err != nil {
+               return fmt.Errorf("error loading runner binary: %s", err)
+       }
+       return nil
 }
 
 // Subscribe returns a buffered channel that becomes ready after any
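
CheckHealth (above) gives the dispatcher a way to surface a bad DeployRunnerBinary path at startup instead of on the first Create() call. A hypothetical caller-side sketch; the fakePool type and the error text are illustrative only:

    package main

    import (
        "errors"
        "log"
    )

    // healthChecker is the one method of *worker.Pool this sketch relies on.
    type healthChecker interface {
        CheckHealth() error
    }

    // fakePool stands in for *worker.Pool.
    type fakePool struct{ runnerErr error }

    func (p fakePool) CheckHealth() error { return p.runnerErr }

    func main() {
        var wp healthChecker = fakePool{runnerErr: errors.New("error loading runner binary: no such file")}
        // A dispatcher could gate startup on this check so a misconfigured
        // runner binary is reported immediately.
        if err := wp.CheckHealth(); err != nil {
            log.Print("worker pool unhealthy: ", err)
        }
    }
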
@@ -205,8 +237,13 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        defer wp.mtx.RUnlock()
        unalloc := map[arvados.InstanceType]int{}
        creating := map[arvados.InstanceType]int{}
-       for it, times := range wp.creating {
-               creating[it] = len(times)
+       oldestCreate := map[arvados.InstanceType]time.Time{}
+       for _, cc := range wp.creating {
+               it := cc.instanceType
+               creating[it]++
+               if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
+                       oldestCreate[it] = cc.time
+               }
        }
        for _, wkr := range wp.workers {
                // Skip workers that are not expected to become
@@ -221,7 +258,7 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
                }
                it := wkr.instType
                unalloc[it]++
-               if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
+               if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
                        // If up to N new workers appear in
                        // Instances() while we are waiting for N
                        // Create() calls to complete, we assume we're
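
The per-type bookkeeping above replaces the old slice of start times: Unallocated() now records the oldest unfinished Create() per instance type, and a still-unrecognized instance that appeared after that point is assumed to belong to one of the pending creates rather than being counted twice. A self-contained sketch of that bookkeeping with made-up data:

    package main

    import (
        "fmt"
        "time"
    )

    type createCall struct {
        time         time.Time
        instanceType string
    }

    func main() {
        now := time.Now()
        // Two unfinished Create() calls for the same type, keyed by instance secret.
        creating := map[string]createCall{
            "secret1": {time: now.Add(-90 * time.Second), instanceType: "t3.large"},
            "secret2": {time: now.Add(-10 * time.Second), instanceType: "t3.large"},
        }
        // Track the earliest pending Create() per type, as Unallocated() does.
        oldest := map[string]time.Time{}
        for _, cc := range creating {
            if t, ok := oldest[cc.instanceType]; !ok || t.After(cc.time) {
                oldest[cc.instanceType] = cc.time
            }
        }
        // A worker in StateUnknown that appeared after the oldest pending
        // Create() is counted against the pending creates.
        appeared := now.Add(-30 * time.Second)
        fmt.Println("count against pending creates:", appeared.After(oldest["t3.large"]))
    }
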
@@ -254,33 +291,34 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
+       if wp.loadRunnerData() != nil {
+               // Boot probe is certain to fail.
+               return false
+       }
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
                return false
        }
        now := time.Now()
-       wp.creating[it] = append(wp.creating[it], now)
+       secret := randomHex(instanceSecretLength)
+       wp.creating[secret] = createCall{time: now, instanceType: it}
        go func() {
                defer wp.notify()
-               secret := randomHex(instanceSecretLength)
                tags := cloud.InstanceTags{
-                       tagKeyInstanceType:   it.Name,
-                       tagKeyIdleBehavior:   string(IdleBehaviorRun),
-                       tagKeyInstanceSecret: secret,
+                       wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
+                       wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
+                       wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
+                       wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
                }
-               initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
+               initCmd := TagVerifier{nil, secret}.InitCommand()
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
-               // Remove our timestamp marker from wp.creating
-               for i, t := range wp.creating[it] {
-                       if t == now {
-                               copy(wp.creating[it][i:], wp.creating[it][i+1:])
-                               wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
-                               break
-                       }
-               }
+               // delete() is deferred so the updateWorker() call
+               // below knows to use StateBooting when adding a new
+               // worker.
+               defer delete(wp.creating, secret)
                if err != nil {
                        if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
                                wp.atQuotaErr = err
@@ -291,7 +329,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
                        wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
                        return
                }
-               wp.updateWorker(inst, it, StateBooting)
+               wp.updateWorker(inst, it)
        }()
        return true
 }
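
With a non-empty TagKeyPrefix, every tag written at creation time is namespaced, which keeps dispatcher-managed tags from colliding with tags applied by other tooling in the same cloud account. An illustrative sketch; the prefix and all values below are invented, not taken from this change:

    package main

    import "fmt"

    func main() {
        tagKeyPrefix := "Arvados" // e.g. Containers.CloudVMs.TagKeyPrefix
        tags := map[string]string{
            tagKeyPrefix + "InstanceSetID":  "example-dispatcher-id",
            tagKeyPrefix + "InstanceType":   "t3.large",
            tagKeyPrefix + "IdleBehavior":   "run",
            tagKeyPrefix + "InstanceSecret": "0123456789abcdef",
        }
        for k, v := range tags {
            fmt.Printf("%s=%s\n", k, v)
        }
    }
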
@@ -313,39 +351,42 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
        if !ok {
                return errors.New("requested instance does not exist")
        }
-       wkr.idleBehavior = idleBehavior
-       wkr.saveTags()
-       wkr.shutdownIfIdle()
+       wkr.setIdleBehavior(idleBehavior)
        return nil
 }
 
-// Add or update worker attached to the given instance. Use
-// initialState if a new worker is created.
+// Add or update worker attached to the given instance.
 //
 // The second return value is true if a new worker is created.
 //
+// A newly added instance has state=StateBooting if its tags match an
+// entry in wp.creating, otherwise StateUnknown.
+//
 // Caller must have lock.
-func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
-       inst = tagVerifier{inst}
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
+       secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
+       inst = TagVerifier{inst, secret}
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
                wkr.instance = inst
                wkr.updated = time.Now()
-               if initialState == StateBooting && wkr.state == StateUnknown {
-                       wkr.state = StateBooting
-               }
                wkr.saveTags()
                return wkr, false
        }
 
+       state := StateUnknown
+       if _, ok := wp.creating[secret]; ok {
+               state = StateBooting
+       }
+
        // If an instance has a valid IdleBehavior tag when it first
        // appears, initialize the new worker accordingly (this is how
        // we restore IdleBehavior that was set by a prior dispatch
        // process); otherwise, default to "run". After this,
        // wkr.idleBehavior is the source of truth, and will only be
        // changed via SetIdleBehavior().
-       idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
+       idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
        if !validIdleBehavior[idleBehavior] {
                idleBehavior = IdleBehaviorRun
        }
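
Restoring IdleBehavior from the (now prefixed) instance tag is what lets a restarted dispatcher honor hold/drain decisions made by its predecessor. A small sketch of the validate-or-default step; IdleBehaviorHold and IdleBehaviorDrain are assumed to be the other values defined elsewhere in this package:

    package main

    import "fmt"

    type IdleBehavior string

    const (
        IdleBehaviorRun   IdleBehavior = "run"
        IdleBehaviorHold  IdleBehavior = "hold"
        IdleBehaviorDrain IdleBehavior = "drain"
    )

    var validIdleBehavior = map[IdleBehavior]bool{
        IdleBehaviorRun: true, IdleBehaviorHold: true, IdleBehaviorDrain: true,
    }

    func main() {
        // Anything unrecognized (including a missing tag) defaults to "run".
        for _, tag := range []string{"hold", "", "bogus"} {
            idleBehavior := IdleBehavior(tag)
            if !validIdleBehavior[idleBehavior] {
                idleBehavior = IdleBehaviorRun
            }
            fmt.Printf("tag %q -> %s\n", tag, idleBehavior)
        }
    }
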
@@ -356,7 +397,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
                "Address":      inst.Address(),
        })
        logger.WithFields(logrus.Fields{
-               "State":        initialState,
+               "State":        state,
                "IdleBehavior": idleBehavior,
        }).Infof("instance appeared in cloud")
        now := time.Now()
@@ -365,7 +406,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
                wp:           wp,
                logger:       logger,
                executor:     wp.newExecutor(inst),
-               state:        initialState,
+               state:        state,
                idleBehavior: idleBehavior,
                instance:     inst,
                instType:     it,
@@ -373,19 +414,14 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
                probed:       now,
                busy:         now,
                updated:      now,
-               running:      make(map[string]struct{}),
-               starting:     make(map[string]struct{}),
+               running:      make(map[string]*remoteRunner),
+               starting:     make(map[string]*remoteRunner),
                probing:      make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
 }
 
-// caller must have lock.
-func (wp *Pool) notifyExited(uuid string, t time.Time) {
-       wp.exited[uuid] = t
-}
-
 // Shutdown shuts down a worker with the given type, or returns false
 // if all workers with the given type are busy.
 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
@@ -409,8 +445,12 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
 }
 
 // CountWorkers returns the current number of workers in each state.
+//
+// CountWorkers blocks, if necessary, until the initial instance list
+// has been loaded from the cloud provider.
 func (wp *Pool) CountWorkers() map[State]int {
        wp.setupOnce.Do(wp.setup)
+       wp.waitUntilLoaded()
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[State]int{}
@@ -425,7 +465,7 @@ func (wp *Pool) CountWorkers() map[State]int {
 // In the returned map, the time value indicates when the Pool
 // observed that the container process had exited. A container that
 // has not yet exited has a zero time value. The caller should use
-// KillContainer() to garbage-collect the entries for exited
+// ForgetContainer() to garbage-collect the entries for exited
 // containers.
 func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
@@ -472,52 +512,45 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 //
 // KillContainer returns immediately; the act of killing the container
 // takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string) {
+//
+// KillContainer returns false if the container has already ended.
+func (wp *Pool) KillContainer(uuid string, reason string) bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       if _, ok := wp.exited[uuid]; ok {
-               wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
-               delete(wp.exited, uuid)
-               return
-       }
+       logger := wp.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "Reason":        reason,
+       })
        for _, wkr := range wp.workers {
-               if _, ok := wkr.running[uuid]; ok {
-                       go wp.kill(wkr, uuid)
-                       return
+               rr := wkr.running[uuid]
+               if rr == nil {
+                       rr = wkr.starting[uuid]
+               }
+               if rr != nil {
+                       rr.Kill(reason)
+                       return true
                }
        }
-       wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
+       logger.Debug("cannot kill: already disappeared")
+       return false
 }
 
-func (wp *Pool) kill(wkr *worker, uuid string) {
-       logger := wp.logger.WithFields(logrus.Fields{
-               "ContainerUUID": uuid,
-               "Instance":      wkr.instance.ID(),
-       })
-       logger.Debug("killing process")
-       cmd := "crunch-run --kill 15 " + uuid
-       if u := wkr.instance.RemoteUser(); u != "root" {
-               cmd = "sudo " + cmd
-       }
-       stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
-       if err != nil {
-               logger.WithFields(logrus.Fields{
-                       "stderr": string(stderr),
-                       "stdout": string(stdout),
-                       "error":  err,
-               }).Warn("kill failed")
-               return
-       }
-       logger.Debug("killing process succeeded")
+// ForgetContainer clears the placeholder for the given exited
+// container, so it isn't returned by subsequent calls to Running().
+//
+// ForgetContainer has no effect if the container has not yet exited.
+//
+// The "container exited at time T" placeholder (which necessitates
+// ForgetContainer) exists to make it easier for the caller
+// (scheduler) to distinguish a container that exited without
+// finalizing its state from a container that exited too recently for
+// its final state to have appeared in the scheduler's queue cache.
+func (wp *Pool) ForgetContainer(uuid string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       if _, ok := wkr.running[uuid]; ok {
-               delete(wkr.running, uuid)
-               if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
-                       wkr.state = StateIdle
-               }
-               wkr.updated = time.Now()
-               go wp.notify()
+       if _, ok := wp.exited[uuid]; ok {
+               wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+               delete(wp.exited, uuid)
        }
 }
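
Taken together, Running(), KillContainer() and ForgetContainer() give the scheduler a two-phase view: a zero time value means the container is still running, and a non-zero value is the exit placeholder that the scheduler clears once the final state has shown up in its queue. A caller-side sketch with a stand-in pool; the UUIDs are invented:

    package main

    import (
        "fmt"
        "time"
    )

    // runningLister is the slice of the pool API used in this sketch.
    type runningLister interface {
        Running() map[string]time.Time
        ForgetContainer(uuid string)
    }

    type fakePool struct{ running map[string]time.Time }

    func (p *fakePool) Running() map[string]time.Time { return p.running }
    func (p *fakePool) ForgetContainer(uuid string)   { delete(p.running, uuid) }

    func main() {
        var pool runningLister = &fakePool{running: map[string]time.Time{
            "zzzzz-dz642-live000000000000": {},         // still running: zero time
            "zzzzz-dz642-exited0000000000": time.Now(), // crunch-run exited
        }}
        for uuid, exited := range pool.Running() {
            if exited.IsZero() {
                continue // still running
            }
            // Once the scheduler has recorded the container's final state,
            // it drops the placeholder.
            fmt.Println("forgetting", uuid)
            pool.ForgetContainer(uuid)
        }
    }
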
 
@@ -537,7 +570,7 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
                Subsystem: "dispatchcloud",
                Name:      "instances_total",
                Help:      "Number of cloud VMs.",
-       }, []string{"category"})
+       }, []string{"category", "instance_type"})
        reg.MustRegister(wp.mInstances)
        wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
@@ -560,11 +593,22 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
                Help:      "Total memory on all cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mMemory)
+       wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "instances_disappeared",
+               Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
+       }, []string{"state"})
+       for _, v := range stateString {
+               wp.mDisappearances.WithLabelValues(v).Add(0)
+       }
+       reg.MustRegister(wp.mDisappearances)
 }
 
 func (wp *Pool) runMetrics() {
        ch := wp.Subscribe()
        defer wp.Unsubscribe(ch)
+       wp.updateMetrics()
        for range ch {
                wp.updateMetrics()
        }
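
Priming every state label with Add(0) is the usual Prometheus idiom for counters whose interesting value is "still zero": the series exist as soon as the dispatcher starts, so a missing series is distinguishable from zero disappearances. A standalone sketch of that pattern; the state names are assumed to match stateString:

    package main

    import (
        "fmt"

        "github.com/prometheus/client_golang/prometheus"
    )

    func main() {
        reg := prometheus.NewRegistry()
        disappearances := prometheus.NewCounterVec(prometheus.CounterOpts{
            Namespace: "arvados",
            Subsystem: "dispatchcloud",
            Name:      "instances_disappeared",
            Help:      "Number of occurrences of an instance disappearing.",
        }, []string{"state"})
        reg.MustRegister(disappearances)

        // Touch every label value so all per-state series appear immediately.
        for _, state := range []string{"unknown", "booting", "idle", "running", "shutdown"} {
            disappearances.WithLabelValues(state).Add(0)
        }

        mfs, _ := reg.Gather()
        for _, mf := range mfs {
            fmt.Println(mf.GetName(), "series:", len(mf.GetMetric()))
        }
    }
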
@@ -574,7 +618,11 @@ func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
 
-       instances := map[string]int64{}
+       type entKey struct {
+               cat      string
+               instType string
+       }
+       instances := map[entKey]int64{}
        price := map[string]float64{}
        cpu := map[string]int64{}
        mem := map[string]int64{}
@@ -593,17 +641,25 @@ func (wp *Pool) updateMetrics() {
                default:
                        cat = "idle"
                }
-               instances[cat]++
+               instances[entKey{cat, wkr.instType.Name}]++
                price[cat] += wkr.instType.Price
                cpu[cat] += int64(wkr.instType.VCPUs)
                mem[cat] += int64(wkr.instType.RAM)
                running += int64(len(wkr.running) + len(wkr.starting))
        }
        for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
-               wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
                wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
                wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
                wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
+               // make sure to reset gauges for non-existing category/nodetype combinations
+               for _, it := range wp.instanceTypes {
+                       if _, ok := instances[entKey{cat, it.Name}]; !ok {
+                               wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
+                       }
+               }
+       }
+       for k, v := range instances {
+               wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
        }
        wp.mContainersRunning.Set(float64(running))
 }
@@ -701,11 +757,53 @@ func (wp *Pool) Instances() []InstanceView {
        return r
 }
 
+// KillInstance destroys a cloud VM instance. It returns an error if
+// the given instance does not exist.
+func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+       wkr, ok := wp.workers[id]
+       if !ok {
+               return errors.New("instance not found")
+       }
+       wkr.logger.WithField("Reason", reason).Info("shutting down")
+       wkr.shutdown()
+       return nil
+}
+
 func (wp *Pool) setup() {
-       wp.creating = map[arvados.InstanceType][]time.Time{}
+       wp.creating = map[string]createCall{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
+       wp.loadRunnerData()
+}
+
+// Load the runner program to be deployed on worker nodes into
+// wp.runnerData, if necessary. Errors are logged.
+//
+// If auto-deploy is disabled, len(wp.runnerData) will be 0.
+//
+// Caller must not have lock.
+func (wp *Pool) loadRunnerData() error {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       if wp.runnerData != nil {
+               return nil
+       } else if wp.runnerSource == "" {
+               wp.runnerCmd = "crunch-run"
+               wp.runnerData = []byte{}
+               return nil
+       }
+       logger := wp.logger.WithField("source", wp.runnerSource)
+       logger.Debug("loading runner")
+       buf, err := ioutil.ReadFile(wp.runnerSource)
+       if err != nil {
+               logger.WithError(err).Error("failed to load runner program")
+               return err
+       }
+       wp.runnerData = buf
+       wp.runnerMD5 = md5.Sum(buf)
+       wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
+       return nil
 }
 
 func (wp *Pool) notify() {
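
Deriving runnerCmd from the MD5 of the binary makes the deployed path content-addressed: pointing DeployRunnerBinary at a new build yields a new path on each worker, so a stale copy left over from a previous deploy is never executed by mistake. A minimal sketch of that derivation, using fake file contents:

    package main

    import (
        "crypto/md5"
        "fmt"
    )

    func main() {
        // Stand-in for the bytes read from Containers.CloudVMs.DeployRunnerBinary.
        data := []byte("pretend this is the crunch-run binary")
        sum := md5.Sum(data)
        runnerCmd := fmt.Sprintf("/var/lib/arvados/crunch-run~%x", sum)
        fmt.Println(runnerCmd)
    }
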
@@ -726,7 +824,7 @@ func (wp *Pool) getInstancesAndSync() error {
        }
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
-       instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
+       instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
        if err != nil {
                wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
                return err
@@ -746,13 +844,13 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        notify := false
 
        for _, inst := range instances {
-               itTag := inst.Tags()[tagKeyInstanceType]
+               itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                        wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
-               if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
+               if wkr, isNew := wp.updateWorker(inst, it); isNew {
                        notify = true
                } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
                        wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
@@ -769,12 +867,16 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
+               if wp.mDisappearances != nil {
+                       wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
+               }
                delete(wp.workers, id)
-               go wkr.executor.Close()
+               go wkr.Close()
                notify = true
        }
 
        if !wp.loaded {
+               notify = true
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }
@@ -784,6 +886,17 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        }
 }
 
+func (wp *Pool) waitUntilLoaded() {
+       ch := wp.Subscribe()
+       wp.mtx.RLock()
+       defer wp.mtx.RUnlock()
+       for !wp.loaded {
+               wp.mtx.RUnlock()
+               <-ch
+               wp.mtx.RLock()
+       }
+}
+
 // Return a random string of n hexadecimal digits (n*4 random bits). n
 // must be even.
 func randomHex(n int) string {