Merge branch '15781-multi-value-property-search'
[arvados.git] / lib / dispatchcloud / worker / pool.go
index 84b61fc006c239e3cc67e9a7828e6ebe29b6546e..0636fcee897fee906db1c03ae2c7b6a4e4e7c811 100644 (file)
@@ -5,17 +5,19 @@
 package worker
 
 import (
+       "crypto/md5"
        "crypto/rand"
        "errors"
        "fmt"
        "io"
+       "io/ioutil"
        "sort"
        "strings"
        "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/lib/cloud"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
@@ -25,6 +27,7 @@ const (
        tagKeyInstanceType   = "InstanceType"
        tagKeyIdleBehavior   = "IdleBehavior"
        tagKeyInstanceSecret = "InstanceSecret"
+       tagKeyInstanceSetID  = "InstanceSetID"
 )
 
 // An InstanceView shows a worker's current state and recent activity.
@@ -91,13 +94,15 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration {
 //
 // New instances are configured and set up according to the given
 // cluster configuration.
-func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:             logger,
                arvClient:          arvClient,
+               instanceSetID:      instanceSetID,
                instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:        newExecutor,
                bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
+               runnerSource:       cluster.Containers.CloudVMs.DeployRunnerBinary,
                imageID:            cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
                instanceTypes:      cluster.InstanceTypes,
                maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
@@ -110,6 +115,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                timeoutTERM:        duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
                timeoutSignal:      duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
                installPublicKey:   installPublicKey,
+               tagKeyPrefix:       cluster.Containers.CloudVMs.TagKeyPrefix,
                stop:               make(chan bool),
        }
        wp.registerMetrics(reg)
@@ -128,9 +134,11 @@ type Pool struct {
        // configuration
        logger             logrus.FieldLogger
        arvClient          *arvados.Client
+       instanceSetID      cloud.InstanceSetID
        instanceSet        *throttledInstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
+       runnerSource       string
        imageID            cloud.ImageID
        instanceTypes      map[string]arvados.InstanceType
        syncInterval       time.Duration
@@ -143,18 +151,22 @@ type Pool struct {
        timeoutTERM        time.Duration
        timeoutSignal      time.Duration
        installPublicKey   ssh.PublicKey
+       tagKeyPrefix       string
 
        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
        creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
-       exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
+       exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
        atQuotaUntil time.Time
        atQuotaErr   cloud.QuotaError
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once
+       runnerData   []byte
+       runnerMD5    [md5.Size]byte
+       runnerCmd    string
 
        throttleCreate    throttle
        throttleInstances throttle
@@ -164,6 +176,7 @@ type Pool struct {
        mInstancesPrice    *prometheus.GaugeVec
        mVCPUs             *prometheus.GaugeVec
        mMemory            *prometheus.GaugeVec
+       mDisappearances    *prometheus.CounterVec
 }
 
 type createCall struct {
@@ -171,6 +184,14 @@ type createCall struct {
        instanceType arvados.InstanceType
 }
 
+func (wp *Pool) CheckHealth() error {
+       wp.setupOnce.Do(wp.setup)
+       if err := wp.loadRunnerData(); err != nil {
+               return fmt.Errorf("error loading runner binary: %s", err)
+       }
+       return nil
+}
+
 // Subscribe returns a buffered channel that becomes ready after any
 // change to the pool's state that could have scheduling implications:
 // a worker's state changes, a new worker appears, the cloud
@@ -270,6 +291,10 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
+       if wp.loadRunnerData() != nil {
+               // Boot probe is certain to fail.
+               return false
+       }
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
@@ -281,11 +306,12 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
        go func() {
                defer wp.notify()
                tags := cloud.InstanceTags{
-                       tagKeyInstanceType:   it.Name,
-                       tagKeyIdleBehavior:   string(IdleBehaviorRun),
-                       tagKeyInstanceSecret: secret,
+                       wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
+                       wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
+                       wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
+                       wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
                }
-               initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
+               initCmd := TagVerifier{nil, secret}.InitCommand()
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
@@ -338,7 +364,8 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
 //
 // Caller must have lock.
 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
-       inst = tagVerifier{inst}
+       secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
+       inst = TagVerifier{inst, secret}
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
@@ -349,7 +376,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*wor
        }
 
        state := StateUnknown
-       if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok {
+       if _, ok := wp.creating[secret]; ok {
                state = StateBooting
        }
 
@@ -359,7 +386,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*wor
        // process); otherwise, default to "run". After this,
        // wkr.idleBehavior is the source of truth, and will only be
        // changed via SetIdleBehavior().
-       idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
+       idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
        if !validIdleBehavior[idleBehavior] {
                idleBehavior = IdleBehaviorRun
        }
@@ -438,7 +465,7 @@ func (wp *Pool) CountWorkers() map[State]int {
 // In the returned map, the time value indicates when the Pool
 // observed that the container process had exited. A container that
 // has not yet exited has a zero time value. The caller should use
-// KillContainer() to garbage-collect the entries for exited
+// ForgetContainer() to garbage-collect the entries for exited
 // containers.
 func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
@@ -485,18 +512,15 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 //
 // KillContainer returns immediately; the act of killing the container
 // takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string, reason string) {
+//
+// KillContainer returns false if the container has already ended.
+func (wp *Pool) KillContainer(uuid string, reason string) bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithFields(logrus.Fields{
                "ContainerUUID": uuid,
                "Reason":        reason,
        })
-       if _, ok := wp.exited[uuid]; ok {
-               logger.Debug("clearing placeholder for exited crunch-run process")
-               delete(wp.exited, uuid)
-               return
-       }
        for _, wkr := range wp.workers {
                rr := wkr.running[uuid]
                if rr == nil {
@@ -504,10 +528,30 @@ func (wp *Pool) KillContainer(uuid string, reason string) {
                }
                if rr != nil {
                        rr.Kill(reason)
-                       return
+                       return true
                }
        }
        logger.Debug("cannot kill: already disappeared")
+       return false
+}
+
+// ForgetContainer clears the placeholder for the given exited
+// container, so it isn't returned by subsequent calls to Running().
+//
+// ForgetContainer has no effect if the container has not yet exited.
+//
+// The "container exited at time T" placeholder (which necessitates
+// ForgetContainer) exists to make it easier for the caller
+// (scheduler) to distinguish a container that exited without
+// finalizing its state from a container that exited too recently for
+// its final state to have appeared in the scheduler's queue cache.
+func (wp *Pool) ForgetContainer(uuid string) {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       if _, ok := wp.exited[uuid]; ok {
+               wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+               delete(wp.exited, uuid)
+       }
 }
 
 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
@@ -526,7 +570,7 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
                Subsystem: "dispatchcloud",
                Name:      "instances_total",
                Help:      "Number of cloud VMs.",
-       }, []string{"category"})
+       }, []string{"category", "instance_type"})
        reg.MustRegister(wp.mInstances)
        wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
@@ -549,6 +593,16 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
                Help:      "Total memory on all cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mMemory)
+       wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "instances_disappeared",
+               Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
+       }, []string{"state"})
+       for _, v := range stateString {
+               wp.mDisappearances.WithLabelValues(v).Add(0)
+       }
+       reg.MustRegister(wp.mDisappearances)
 }
 
 func (wp *Pool) runMetrics() {
@@ -564,7 +618,11 @@ func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
 
-       instances := map[string]int64{}
+       type entKey struct {
+               cat      string
+               instType string
+       }
+       instances := map[entKey]int64{}
        price := map[string]float64{}
        cpu := map[string]int64{}
        mem := map[string]int64{}
@@ -583,17 +641,25 @@ func (wp *Pool) updateMetrics() {
                default:
                        cat = "idle"
                }
-               instances[cat]++
+               instances[entKey{cat, wkr.instType.Name}]++
                price[cat] += wkr.instType.Price
                cpu[cat] += int64(wkr.instType.VCPUs)
                mem[cat] += int64(wkr.instType.RAM)
                running += int64(len(wkr.running) + len(wkr.starting))
        }
        for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
-               wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
                wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
                wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
                wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
+               // make sure to reset gauges for non-existing category/nodetype combinations
+               for _, it := range wp.instanceTypes {
+                       if _, ok := instances[entKey{cat, it.Name}]; !ok {
+                               wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
+                       }
+               }
+       }
+       for k, v := range instances {
+               wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
        }
        wp.mContainersRunning.Set(float64(running))
 }
@@ -708,6 +774,36 @@ func (wp *Pool) setup() {
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
+       wp.loadRunnerData()
+}
+
+// Load the runner program to be deployed on worker nodes into
+// wp.runnerData, if necessary. Errors are logged.
+//
+// If auto-deploy is disabled, len(wp.runnerData) will be 0.
+//
+// Caller must not have lock.
+func (wp *Pool) loadRunnerData() error {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       if wp.runnerData != nil {
+               return nil
+       } else if wp.runnerSource == "" {
+               wp.runnerCmd = "crunch-run"
+               wp.runnerData = []byte{}
+               return nil
+       }
+       logger := wp.logger.WithField("source", wp.runnerSource)
+       logger.Debug("loading runner")
+       buf, err := ioutil.ReadFile(wp.runnerSource)
+       if err != nil {
+               logger.WithError(err).Error("failed to load runner program")
+               return err
+       }
+       wp.runnerData = buf
+       wp.runnerMD5 = md5.Sum(buf)
+       wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
+       return nil
 }
 
 func (wp *Pool) notify() {
@@ -728,7 +824,7 @@ func (wp *Pool) getInstancesAndSync() error {
        }
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
-       instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
+       instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
        if err != nil {
                wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
                return err
@@ -748,7 +844,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        notify := false
 
        for _, inst := range instances {
-               itTag := inst.Tags()[tagKeyInstanceType]
+               itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                        wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
@@ -771,6 +867,9 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
+               if wp.mDisappearances != nil {
+                       wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
+               }
                delete(wp.workers, id)
                go wkr.Close()
                notify = true