boot outcomes metric: fix labels and add some checks for the new metric
[arvados.git] / lib / dispatchcloud / worker / pool.go
index 201e8aad276eb5f8ca353e3ff8c41fc9dee00f2a..12bc1cdd71636263cebc0c8f21bd283d791aec04 100644 (file)
@@ -5,17 +5,19 @@
 package worker
 
 import (
 package worker
 
 import (
+       "crypto/md5"
        "crypto/rand"
        "errors"
        "fmt"
        "io"
        "crypto/rand"
        "errors"
        "fmt"
        "io"
+       "io/ioutil"
        "sort"
        "strings"
        "sync"
        "time"
 
        "sort"
        "strings"
        "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/lib/cloud"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
@@ -100,6 +102,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:        newExecutor,
                bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
                instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:        newExecutor,
                bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
+               runnerSource:       cluster.Containers.CloudVMs.DeployRunnerBinary,
                imageID:            cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
                instanceTypes:      cluster.InstanceTypes,
                maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
                imageID:            cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
                instanceTypes:      cluster.InstanceTypes,
                maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
@@ -135,6 +138,7 @@ type Pool struct {
        instanceSet        *throttledInstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
        instanceSet        *throttledInstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
+       runnerSource       string
        imageID            cloud.ImageID
        instanceTypes      map[string]arvados.InstanceType
        syncInterval       time.Duration
        imageID            cloud.ImageID
        instanceTypes      map[string]arvados.InstanceType
        syncInterval       time.Duration
@@ -154,12 +158,15 @@ type Pool struct {
        creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
        creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
-       exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
+       exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
        atQuotaUntil time.Time
        atQuotaErr   cloud.QuotaError
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once
        atQuotaUntil time.Time
        atQuotaErr   cloud.QuotaError
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once
+       runnerData   []byte
+       runnerMD5    [md5.Size]byte
+       runnerCmd    string
 
        throttleCreate    throttle
        throttleInstances throttle
 
        throttleCreate    throttle
        throttleInstances throttle
@@ -169,6 +176,7 @@ type Pool struct {
        mInstancesPrice    *prometheus.GaugeVec
        mVCPUs             *prometheus.GaugeVec
        mMemory            *prometheus.GaugeVec
        mInstancesPrice    *prometheus.GaugeVec
        mVCPUs             *prometheus.GaugeVec
        mMemory            *prometheus.GaugeVec
+       mBootOutcomes      *prometheus.CounterVec
        mDisappearances    *prometheus.CounterVec
 }
 
        mDisappearances    *prometheus.CounterVec
 }
 
@@ -177,6 +185,14 @@ type createCall struct {
        instanceType arvados.InstanceType
 }
 
        instanceType arvados.InstanceType
 }
 
+func (wp *Pool) CheckHealth() error {
+       wp.setupOnce.Do(wp.setup)
+       if err := wp.loadRunnerData(); err != nil {
+               return fmt.Errorf("error loading runner binary: %s", err)
+       }
+       return nil
+}
+
 // Subscribe returns a buffered channel that becomes ready after any
 // change to the pool's state that could have scheduling implications:
 // a worker's state changes, a new worker appears, the cloud
 // Subscribe returns a buffered channel that becomes ready after any
 // change to the pool's state that could have scheduling implications:
 // a worker's state changes, a new worker appears, the cloud
@@ -276,6 +292,10 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
 func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
+       if wp.loadRunnerData() != nil {
+               // Boot probe is certain to fail.
+               return false
+       }
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
@@ -292,7 +312,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
                        wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
                        wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
                }
                        wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
                        wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
                }
-               initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
+               initCmd := TagVerifier{nil, secret}.InitCommand()
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
@@ -346,7 +366,7 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
 // Caller must have lock.
 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
        secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
 // Caller must have lock.
 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
        secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
-       inst = tagVerifier{inst, secret}
+       inst = TagVerifier{inst, secret}
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
@@ -416,7 +436,8 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
                // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
                        if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
                // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
                        if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
-                               logger.WithField("Instance", wkr.instance).Info("shutting down")
+                               logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
+                               wkr.reportBootOutcome(BootOutcomeAborted)
                                wkr.shutdown()
                                return true
                        }
                                wkr.shutdown()
                                return true
                        }
@@ -446,7 +467,7 @@ func (wp *Pool) CountWorkers() map[State]int {
 // In the returned map, the time value indicates when the Pool
 // observed that the container process had exited. A container that
 // has not yet exited has a zero time value. The caller should use
 // In the returned map, the time value indicates when the Pool
 // observed that the container process had exited. A container that
 // has not yet exited has a zero time value. The caller should use
-// KillContainer() to garbage-collect the entries for exited
+// ForgetContainer() to garbage-collect the entries for exited
 // containers.
 func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
 // containers.
 func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
@@ -475,7 +496,7 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
        defer wp.mtx.Unlock()
        var wkr *worker
        for _, w := range wp.workers {
        defer wp.mtx.Unlock()
        var wkr *worker
        for _, w := range wp.workers {
-               if w.instType == it && w.state == StateIdle {
+               if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
                        if wkr == nil || w.busy.After(wkr.busy) {
                                wkr = w
                        }
                        if wkr == nil || w.busy.After(wkr.busy) {
                                wkr = w
                        }
@@ -493,18 +514,15 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 //
 // KillContainer returns immediately; the act of killing the container
 // takes some time, and runs in the background.
 //
 // KillContainer returns immediately; the act of killing the container
 // takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string, reason string) {
+//
+// KillContainer returns false if the container has already ended.
+func (wp *Pool) KillContainer(uuid string, reason string) bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithFields(logrus.Fields{
                "ContainerUUID": uuid,
                "Reason":        reason,
        })
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithFields(logrus.Fields{
                "ContainerUUID": uuid,
                "Reason":        reason,
        })
-       if _, ok := wp.exited[uuid]; ok {
-               logger.Debug("clearing placeholder for exited crunch-run process")
-               delete(wp.exited, uuid)
-               return
-       }
        for _, wkr := range wp.workers {
                rr := wkr.running[uuid]
                if rr == nil {
        for _, wkr := range wp.workers {
                rr := wkr.running[uuid]
                if rr == nil {
@@ -512,10 +530,30 @@ func (wp *Pool) KillContainer(uuid string, reason string) {
                }
                if rr != nil {
                        rr.Kill(reason)
                }
                if rr != nil {
                        rr.Kill(reason)
-                       return
+                       return true
                }
        }
        logger.Debug("cannot kill: already disappeared")
                }
        }
        logger.Debug("cannot kill: already disappeared")
+       return false
+}
+
+// ForgetContainer clears the placeholder for the given exited
+// container, so it isn't returned by subsequent calls to Running().
+//
+// ForgetContainer has no effect if the container has not yet exited.
+//
+// The "container exited at time T" placeholder (which necessitates
+// ForgetContainer) exists to make it easier for the caller
+// (scheduler) to distinguish a container that exited without
+// finalizing its state from a container that exited too recently for
+// its final state to have appeared in the scheduler's queue cache.
+func (wp *Pool) ForgetContainer(uuid string) {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       if _, ok := wp.exited[uuid]; ok {
+               wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+               delete(wp.exited, uuid)
+       }
 }
 
 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
 }
 
 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
@@ -534,7 +572,7 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
                Subsystem: "dispatchcloud",
                Name:      "instances_total",
                Help:      "Number of cloud VMs.",
                Subsystem: "dispatchcloud",
                Name:      "instances_total",
                Help:      "Number of cloud VMs.",
-       }, []string{"category"})
+       }, []string{"category", "instance_type"})
        reg.MustRegister(wp.mInstances)
        wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
        reg.MustRegister(wp.mInstances)
        wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
@@ -557,6 +595,16 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
                Help:      "Total memory on all cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mMemory)
                Help:      "Total memory on all cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mMemory)
+       wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "boot_outcomes",
+               Help:      "Boot outcomes by type.",
+       }, []string{"outcome"})
+       for k := range validBootOutcomes {
+               wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
+       }
+       reg.MustRegister(wp.mBootOutcomes)
        wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
        wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
@@ -582,7 +630,11 @@ func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
 
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
 
-       instances := map[string]int64{}
+       type entKey struct {
+               cat      string
+               instType string
+       }
+       instances := map[entKey]int64{}
        price := map[string]float64{}
        cpu := map[string]int64{}
        mem := map[string]int64{}
        price := map[string]float64{}
        cpu := map[string]int64{}
        mem := map[string]int64{}
@@ -601,17 +653,25 @@ func (wp *Pool) updateMetrics() {
                default:
                        cat = "idle"
                }
                default:
                        cat = "idle"
                }
-               instances[cat]++
+               instances[entKey{cat, wkr.instType.Name}]++
                price[cat] += wkr.instType.Price
                cpu[cat] += int64(wkr.instType.VCPUs)
                mem[cat] += int64(wkr.instType.RAM)
                running += int64(len(wkr.running) + len(wkr.starting))
        }
        for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
                price[cat] += wkr.instType.Price
                cpu[cat] += int64(wkr.instType.VCPUs)
                mem[cat] += int64(wkr.instType.RAM)
                running += int64(len(wkr.running) + len(wkr.starting))
        }
        for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
-               wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
                wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
                wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
                wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
                wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
                wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
                wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
+               // make sure to reset gauges for non-existing category/nodetype combinations
+               for _, it := range wp.instanceTypes {
+                       if _, ok := instances[entKey{cat, it.Name}]; !ok {
+                               wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
+                       }
+               }
+       }
+       for k, v := range instances {
+               wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
        }
        wp.mContainersRunning.Set(float64(running))
 }
        }
        wp.mContainersRunning.Set(float64(running))
 }
@@ -717,6 +777,7 @@ func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
                return errors.New("instance not found")
        }
        wkr.logger.WithField("Reason", reason).Info("shutting down")
                return errors.New("instance not found")
        }
        wkr.logger.WithField("Reason", reason).Info("shutting down")
+       wkr.reportBootOutcome(BootOutcomeAborted)
        wkr.shutdown()
        return nil
 }
        wkr.shutdown()
        return nil
 }
@@ -726,6 +787,36 @@ func (wp *Pool) setup() {
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
+       wp.loadRunnerData()
+}
+
+// Load the runner program to be deployed on worker nodes into
+// wp.runnerData, if necessary. Errors are logged.
+//
+// If auto-deploy is disabled, len(wp.runnerData) will be 0.
+//
+// Caller must not have lock.
+func (wp *Pool) loadRunnerData() error {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       if wp.runnerData != nil {
+               return nil
+       } else if wp.runnerSource == "" {
+               wp.runnerCmd = "crunch-run"
+               wp.runnerData = []byte{}
+               return nil
+       }
+       logger := wp.logger.WithField("source", wp.runnerSource)
+       logger.Debug("loading runner")
+       buf, err := ioutil.ReadFile(wp.runnerSource)
+       if err != nil {
+               logger.WithError(err).Error("failed to load runner program")
+               return err
+       }
+       wp.runnerData = buf
+       wp.runnerMD5 = md5.Sum(buf)
+       wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
+       return nil
 }
 
 func (wp *Pool) notify() {
 }
 
 func (wp *Pool) notify() {
@@ -769,13 +860,13 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
-                       wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
+                       wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
                if wkr, isNew := wp.updateWorker(inst, it); isNew {
                        notify = true
                } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
                        continue
                }
                if wkr, isNew := wp.updateWorker(inst, it); isNew {
                        notify = true
                } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
-                       wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
+                       wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
                        wkr.shutdown()
                }
        }
                        wkr.shutdown()
                }
        }
@@ -789,6 +880,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
+               wkr.reportBootOutcome(BootOutcomeDisappeared)
                if wp.mDisappearances != nil {
                        wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
                }
                if wp.mDisappearances != nil {
                        wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
                }