14977: Don't try fixStaleLocks until worker pool state is loaded.
[arvados.git] / lib / dispatchcloud / worker / pool.go
index df37326a080de92e4d03eab3bd3dad17cf8bf200..e90935e2aa9e5747d08e136475cd186c0b4bc766 100644 (file)
@@ -5,7 +5,9 @@
 package worker
 
 import (
+       "crypto/rand"
        "errors"
+       "fmt"
        "io"
        "sort"
        "strings"
@@ -16,16 +18,19 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
+       "golang.org/x/crypto/ssh"
 )
 
 const (
-       tagKeyInstanceType = "InstanceType"
-       tagKeyIdleBehavior = "IdleBehavior"
+       // Keys of the cloud instance tags that Create() attaches to
+       // each new instance.
+       tagKeyInstanceType   = "InstanceType"
+       tagKeyIdleBehavior   = "IdleBehavior"
+       // Key of the tag holding the per-instance random secret.
+       // updateWorker() looks this tag's value up in wp.creating
+       // to detect instances whose Create() call is still
+       // unfinished.
+       tagKeyInstanceSecret = "InstanceSecret"
 )
 
 // An InstanceView shows a worker's current state and recent activity.
 type InstanceView struct {
        Instance             cloud.InstanceID `json:"instance"`
+       Address              string           `json:"address"`
        Price                float64          `json:"price"`
        ArvadosInstanceType  string           `json:"arvados_instance_type"`
        ProviderInstanceType string           `json:"provider_instance_type"`
@@ -84,7 +89,7 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration {
 //
 // New instances are configured and set up according to the given
 // cluster configuration.
-func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:             logger,
                arvClient:          arvClient,
@@ -100,6 +105,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
                timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+               installPublicKey:   installPublicKey,
                stop:               make(chan bool),
        }
        wp.registerMetrics(reg)
@@ -130,10 +136,11 @@ type Pool struct {
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
        timeoutShutdown    time.Duration
+       installPublicKey   ssh.PublicKey
 
        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
-       creating     map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
+       creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
        exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
@@ -146,13 +153,16 @@ type Pool struct {
        throttleCreate    throttle
        throttleInstances throttle
 
-       mInstances         prometheus.Gauge
-       mInstancesPrice    prometheus.Gauge
        mContainersRunning prometheus.Gauge
-       mVCPUs             prometheus.Gauge
-       mVCPUsInuse        prometheus.Gauge
-       mMemory            prometheus.Gauge
-       mMemoryInuse       prometheus.Gauge
+       mInstances         *prometheus.GaugeVec
+       mInstancesPrice    *prometheus.GaugeVec
+       mVCPUs             *prometheus.GaugeVec
+       mMemory            *prometheus.GaugeVec
+}
+
+// createCall describes one unfinished (cloud.InstanceSet)Create
+// call. Pool.creating stores these keyed by instance secret.
+type createCall struct {
+       // time is when the Create call was started (taken from
+       // time.Now() in Pool.Create).
+       time         time.Time
+       // instanceType is the instance type requested by the call.
+       instanceType arvados.InstanceType
 }
 
 // Subscribe returns a buffered channel that becomes ready after any
@@ -200,8 +210,13 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        defer wp.mtx.RUnlock()
        unalloc := map[arvados.InstanceType]int{}
        creating := map[arvados.InstanceType]int{}
-       for it, times := range wp.creating {
-               creating[it] = len(times)
+       oldestCreate := map[arvados.InstanceType]time.Time{}
+       for _, cc := range wp.creating {
+               it := cc.instanceType
+               creating[it]++
+               if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
+                       oldestCreate[it] = cc.time
+               }
        }
        for _, wkr := range wp.workers {
                // Skip workers that are not expected to become
@@ -216,7 +231,7 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
                }
                it := wkr.instType
                unalloc[it]++
-               if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
+               if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
                        // If up to N new workers appear in
                        // Instances() while we are waiting for N
                        // Create() calls to complete, we assume we're
@@ -254,25 +269,24 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
        if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
                return false
        }
-       tags := cloud.InstanceTags{
-               tagKeyInstanceType: it.Name,
-               tagKeyIdleBehavior: string(IdleBehaviorRun),
-       }
        now := time.Now()
-       wp.creating[it] = append(wp.creating[it], now)
+       secret := randomHex(instanceSecretLength)
+       wp.creating[secret] = createCall{time: now, instanceType: it}
        go func() {
                defer wp.notify()
-               inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
+               tags := cloud.InstanceTags{
+                       tagKeyInstanceType:   it.Name,
+                       tagKeyIdleBehavior:   string(IdleBehaviorRun),
+                       tagKeyInstanceSecret: secret,
+               }
+               initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
+               inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
-               // Remove our timestamp marker from wp.creating
-               for i, t := range wp.creating[it] {
-                       if t == now {
-                               copy(wp.creating[it][i:], wp.creating[it][i+1:])
-                               wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
-                               break
-                       }
-               }
+               // delete() is deferred so the updateWorker() call
+               // below knows to use StateBooting when adding a new
+               // worker.
+               defer delete(wp.creating, secret)
                if err != nil {
                        if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
                                wp.atQuotaErr = err
@@ -283,7 +297,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
                        wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
                        return
                }
-               wp.updateWorker(inst, it, StateBooting)
+               wp.updateWorker(inst, it)
        }()
        return true
 }
@@ -311,25 +325,30 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
        return nil
 }
 
-// Add or update worker attached to the given instance. Use
-// initialState if a new worker is created.
+// Add or update worker attached to the given instance.
 //
 // The second return value is true if a new worker is created.
 //
+// A newly added instance has state=StateBooting if its tags match an
+// entry in wp.creating, otherwise StateUnknown.
+//
 // Caller must have lock.
-func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
+       inst = tagVerifier{inst}
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
                wkr.instance = inst
                wkr.updated = time.Now()
-               if initialState == StateBooting && wkr.state == StateUnknown {
-                       wkr.state = StateBooting
-               }
                wkr.saveTags()
                return wkr, false
        }
 
+       state := StateUnknown
+       if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok {
+               state = StateBooting
+       }
+
        // If an instance has a valid IdleBehavior tag when it first
        // appears, initialize the new worker accordingly (this is how
        // we restore IdleBehavior that was set by a prior dispatch
@@ -343,10 +362,11 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
 
        logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
-               "Instance":     inst,
+               "Instance":     inst.ID(),
+               "Address":      inst.Address(),
        })
        logger.WithFields(logrus.Fields{
-               "State":        initialState,
+               "State":        state,
                "IdleBehavior": idleBehavior,
        }).Infof("instance appeared in cloud")
        now := time.Now()
@@ -355,7 +375,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
                wp:           wp,
                logger:       logger,
                executor:     wp.newExecutor(inst),
-               state:        initialState,
+               state:        state,
                idleBehavior: idleBehavior,
                instance:     inst,
                instType:     it,
@@ -399,8 +419,12 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
 }
 
 // CountWorkers returns the current number of workers in each state.
+//
+// CountWorkers blocks, if necessary, until the initial instance list
+// has been loaded from the cloud provider.
 func (wp *Pool) CountWorkers() map[State]int {
        wp.setupOnce.Do(wp.setup)
+       wp.waitUntilLoaded()
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[State]int{}
@@ -482,7 +506,7 @@ func (wp *Pool) KillContainer(uuid string) {
 func (wp *Pool) kill(wkr *worker, uuid string) {
        logger := wp.logger.WithFields(logrus.Fields{
                "ContainerUUID": uuid,
-               "Instance":      wkr.instance,
+               "Instance":      wkr.instance.ID(),
        })
        logger.Debug("killing process")
        cmd := "crunch-run --kill 15 " + uuid
@@ -515,20 +539,6 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
        if reg == nil {
                reg = prometheus.NewRegistry()
        }
-       wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "dispatchcloud",
-               Name:      "instances_total",
-               Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
-       })
-       reg.MustRegister(wp.mInstances)
-       wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "dispatchcloud",
-               Name:      "instances_price_total",
-               Help:      "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
-       })
-       reg.MustRegister(wp.mInstancesPrice)
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
@@ -536,40 +546,40 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)
-
-       wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
+       wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "instances_total",
+               Help:      "Number of cloud VMs.",
+       }, []string{"category"})
+       reg.MustRegister(wp.mInstances)
+       wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "instances_price",
+               Help:      "Price of cloud VMs.",
+       }, []string{"category"})
+       reg.MustRegister(wp.mInstancesPrice)
+       wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_total",
                Help:      "Total VCPUs on all cloud VMs.",
-       })
+       }, []string{"category"})
        reg.MustRegister(wp.mVCPUs)
-       wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "dispatchcloud",
-               Name:      "vcpus_inuse",
-               Help:      "VCPUs on cloud VMs that are running containers.",
-       })
-       reg.MustRegister(wp.mVCPUsInuse)
-       wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
+       wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_total",
                Help:      "Total memory on all cloud VMs.",
-       })
+       }, []string{"category"})
        reg.MustRegister(wp.mMemory)
-       wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "dispatchcloud",
-               Name:      "memory_bytes_inuse",
-               Help:      "Memory on cloud VMs that are running containers.",
-       })
-       reg.MustRegister(wp.mMemoryInuse)
 }
 
 func (wp *Pool) runMetrics() {
        ch := wp.Subscribe()
        defer wp.Unsubscribe(ch)
+       wp.updateMetrics()
        for range ch {
                wp.updateMetrics()
        }
@@ -579,26 +589,38 @@ func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
 
-       var price float64
-       var alloc, cpu, cpuInuse, mem, memInuse int64
+       instances := map[string]int64{}
+       price := map[string]float64{}
+       cpu := map[string]int64{}
+       mem := map[string]int64{}
+       var running int64
        for _, wkr := range wp.workers {
-               price += wkr.instType.Price
-               cpu += int64(wkr.instType.VCPUs)
-               mem += int64(wkr.instType.RAM)
-               if len(wkr.running)+len(wkr.starting) == 0 {
-                       continue
+               var cat string
+               switch {
+               case len(wkr.running)+len(wkr.starting) > 0:
+                       cat = "inuse"
+               case wkr.idleBehavior == IdleBehaviorHold:
+                       cat = "hold"
+               case wkr.state == StateBooting:
+                       cat = "booting"
+               case wkr.state == StateUnknown:
+                       cat = "unknown"
+               default:
+                       cat = "idle"
                }
-               alloc += int64(len(wkr.running) + len(wkr.starting))
-               cpuInuse += int64(wkr.instType.VCPUs)
-               memInuse += int64(wkr.instType.RAM)
-       }
-       wp.mInstances.Set(float64(len(wp.workers)))
-       wp.mInstancesPrice.Set(price)
-       wp.mContainersRunning.Set(float64(alloc))
-       wp.mVCPUs.Set(float64(cpu))
-       wp.mMemory.Set(float64(mem))
-       wp.mVCPUsInuse.Set(float64(cpuInuse))
-       wp.mMemoryInuse.Set(float64(memInuse))
+               instances[cat]++
+               price[cat] += wkr.instType.Price
+               cpu[cat] += int64(wkr.instType.VCPUs)
+               mem[cat] += int64(wkr.instType.RAM)
+               running += int64(len(wkr.running) + len(wkr.starting))
+       }
+       for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
+               wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
+               wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
+               wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
+               wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
+       }
+       wp.mContainersRunning.Set(float64(running))
 }
 
 func (wp *Pool) runProbes() {
@@ -677,6 +699,7 @@ func (wp *Pool) Instances() []InstanceView {
        for _, w := range wp.workers {
                r = append(r, InstanceView{
                        Instance:             w.instance.ID(),
+                       Address:              w.instance.Address(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
@@ -694,7 +717,7 @@ func (wp *Pool) Instances() []InstanceView {
 }
 
 func (wp *Pool) setup() {
-       wp.creating = map[arvados.InstanceType][]time.Time{}
+       wp.creating = map[string]createCall{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
@@ -744,7 +767,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                        wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
-               if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
+               if wkr, isNew := wp.updateWorker(inst, it); isNew {
                        notify = true
                } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
                        wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
@@ -757,7 +780,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                        continue
                }
                logger := wp.logger.WithFields(logrus.Fields{
-                       "Instance":    wkr.instance,
+                       "Instance":    wkr.instance.ID(),
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
@@ -767,6 +790,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        }
 
        if !wp.loaded {
+               notify = true
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }
@@ -775,3 +799,25 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                go wp.notify()
        }
 }
+
+// waitUntilLoaded blocks until wp.loaded is true, i.e., until sync()
+// has loaded the initial instance list. It returns immediately if
+// the list has already been loaded.
+//
+// Caller must not hold wp.mtx.
+func (wp *Pool) waitUntilLoaded() {
+       ch := wp.Subscribe()
+       // Drop the subscription on return; otherwise every call
+       // would leave ch registered in wp.subscribers. This defer is
+       // registered before the RUnlock defer below, so it runs
+       // after the read lock has been released (runMetrics likewise
+       // calls Unsubscribe without holding wp.mtx).
+       defer wp.Unsubscribe(ch)
+       wp.mtx.RLock()
+       defer wp.mtx.RUnlock()
+       for !wp.loaded {
+               // Release the lock while waiting so sync() can
+               // update wp.loaded, then re-check under the lock.
+               wp.mtx.RUnlock()
+               <-ch
+               wp.mtx.RLock()
+       }
+}
+
+// randomHex returns a random string of exactly n hexadecimal digits
+// (n*4 random bits), using crypto/rand.
+//
+// Odd n is supported: enough bytes are generated to cover n digits
+// and the surplus trailing digit is dropped. For n <= 0 the result
+// is the empty string. randomHex panics if the random source
+// returns an error.
+func randomHex(n int) string {
+       if n <= 0 {
+               return ""
+       }
+       // Each byte yields two hex digits; round up so an odd n
+       // still gets enough bytes.
+       buf := make([]byte, (n+1)/2)
+       if _, err := rand.Read(buf); err != nil {
+               panic(err)
+       }
+       return fmt.Sprintf("%x", buf)[:n]
+}