package worker
import (
+ "crypto/md5"
"crypto/rand"
"errors"
"fmt"
"io"
+ "io/ioutil"
"sort"
"strings"
"sync"
"time"
- "git.curoverse.com/arvados.git/lib/cloud"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
tagKeyInstanceType = "InstanceType"
tagKeyIdleBehavior = "IdleBehavior"
tagKeyInstanceSecret = "InstanceSecret"
+ tagKeyInstanceSetID = "InstanceSetID"
)
// An InstanceView shows a worker's current state and recent activity.
//
// New instances are configured and set up according to the given
// cluster configuration.
-func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
wp := &Pool{
logger: logger,
arvClient: arvClient,
+ instanceSetID: instanceSetID,
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
newExecutor: newExecutor,
- bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
- imageID: cloud.ImageID(cluster.CloudVMs.ImageID),
+ bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
+ runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
+ imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
instanceTypes: cluster.InstanceTypes,
- maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
- probeInterval: duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
- syncInterval: duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
- timeoutIdle: duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
- timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
- timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
- timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
- timeoutTERM: duration(cluster.Dispatch.TimeoutTERM, defaultTimeoutTERM),
- timeoutSignal: duration(cluster.Dispatch.TimeoutSignal, defaultTimeoutSignal),
+ maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
+ probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
+ syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
+ timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
+ timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
+ timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+ timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+ timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
+ timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
installPublicKey: installPublicKey,
+ tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
stop: make(chan bool),
}
wp.registerMetrics(reg)
// configuration
logger logrus.FieldLogger
arvClient *arvados.Client
+ instanceSetID cloud.InstanceSetID
instanceSet *throttledInstanceSet
newExecutor func(cloud.Instance) Executor
bootProbeCommand string
+ runnerSource string
imageID cloud.ImageID
instanceTypes map[string]arvados.InstanceType
syncInterval time.Duration
timeoutTERM time.Duration
timeoutSignal time.Duration
installPublicKey ssh.PublicKey
+ tagKeyPrefix string
// private state
subscribers map[<-chan struct{}]chan<- struct{}
creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
workers map[cloud.InstanceID]*worker
loaded bool // loaded list of instances from InstanceSet at least once
- exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
+ exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
atQuotaUntil time.Time
atQuotaErr cloud.QuotaError
stop chan bool
mtx sync.RWMutex
setupOnce sync.Once
+ runnerData []byte
+ runnerMD5 [md5.Size]byte
+ runnerCmd string
throttleCreate throttle
throttleInstances throttle
mInstancesPrice *prometheus.GaugeVec
mVCPUs *prometheus.GaugeVec
mMemory *prometheus.GaugeVec
+ mBootOutcomes *prometheus.CounterVec
+ mDisappearances *prometheus.CounterVec
}
type createCall struct {
instanceType arvados.InstanceType
}
+func (wp *Pool) CheckHealth() error {
+ wp.setupOnce.Do(wp.setup)
+ if err := wp.loadRunnerData(); err != nil {
+ return fmt.Errorf("error loading runner binary: %s", err)
+ }
+ return nil
+}
+
// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
func (wp *Pool) Create(it arvados.InstanceType) bool {
logger := wp.logger.WithField("InstanceType", it.Name)
wp.setupOnce.Do(wp.setup)
+ if wp.loadRunnerData() != nil {
+ // Boot probe is certain to fail.
+ return false
+ }
wp.mtx.Lock()
defer wp.mtx.Unlock()
if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
go func() {
defer wp.notify()
tags := cloud.InstanceTags{
- tagKeyInstanceType: it.Name,
- tagKeyIdleBehavior: string(IdleBehaviorRun),
- tagKeyInstanceSecret: secret,
+ wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID),
+ wp.tagKeyPrefix + tagKeyInstanceType: it.Name,
+ wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
+ wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
}
- initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
+ initCmd := TagVerifier{nil, secret}.InitCommand()
inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
wp.mtx.Lock()
defer wp.mtx.Unlock()
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
- inst = tagVerifier{inst}
+ secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
+ inst = TagVerifier{inst, secret}
id := inst.ID()
if wkr := wp.workers[id]; wkr != nil {
wkr.executor.SetTarget(inst)
}
state := StateUnknown
- if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok {
+ if _, ok := wp.creating[secret]; ok {
state = StateBooting
}
// process); otherwise, default to "run". After this,
// wkr.idleBehavior is the source of truth, and will only be
// changed via SetIdleBehavior().
- idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
+ idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
if !validIdleBehavior[idleBehavior] {
idleBehavior = IdleBehaviorRun
}
// time (Idle) or the earliest create time (Booting)
for _, wkr := range wp.workers {
if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
- logger.WithField("Instance", wkr.instance).Info("shutting down")
+ logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
+ wkr.reportBootOutcome(BootOutcomeAborted)
wkr.shutdown()
return true
}
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
-// KillContainer() to garbage-collect the entries for exited
+// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
wp.setupOnce.Do(wp.setup)
defer wp.mtx.Unlock()
var wkr *worker
for _, w := range wp.workers {
- if w.instType == it && w.state == StateIdle {
+ if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
if wkr == nil || w.busy.After(wkr.busy) {
wkr = w
}
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
-func (wp *Pool) KillContainer(uuid string, reason string) {
+//
+// KillContainer returns false if the container has already ended.
+func (wp *Pool) KillContainer(uuid string, reason string) bool {
wp.mtx.Lock()
defer wp.mtx.Unlock()
logger := wp.logger.WithFields(logrus.Fields{
"ContainerUUID": uuid,
"Reason": reason,
})
- if _, ok := wp.exited[uuid]; ok {
- logger.Debug("clearing placeholder for exited crunch-run process")
- delete(wp.exited, uuid)
- return
- }
for _, wkr := range wp.workers {
rr := wkr.running[uuid]
if rr == nil {
}
if rr != nil {
rr.Kill(reason)
- return
+ return true
}
}
logger.Debug("cannot kill: already disappeared")
+ return false
+}
+
+// ForgetContainer clears the placeholder for the given exited
+// container, so it isn't returned by subsequent calls to Running().
+//
+// ForgetContainer has no effect if the container has not yet exited.
+//
+// The "container exited at time T" placeholder (which necessitates
+// ForgetContainer) exists to make it easier for the caller
+// (scheduler) to distinguish a container that exited without
+// finalizing its state from a container that exited too recently for
+// its final state to have appeared in the scheduler's queue cache.
+func (wp *Pool) ForgetContainer(uuid string) {
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ if _, ok := wp.exited[uuid]; ok {
+ wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+ delete(wp.exited, uuid)
+ }
}
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
Subsystem: "dispatchcloud",
Name: "instances_total",
Help: "Number of cloud VMs.",
- }, []string{"category"})
+ }, []string{"category", "instance_type"})
reg.MustRegister(wp.mInstances)
wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Help: "Total memory on all cloud VMs.",
}, []string{"category"})
reg.MustRegister(wp.mMemory)
+ wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "boot_outcomes",
+ Help: "Boot outcomes by type.",
+ }, []string{"state"})
+ for k := range validBootOutcomes {
+ wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
+ }
+ reg.MustRegister(wp.mBootOutcomes)
+ wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_disappeared",
+ Help: "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
+ }, []string{"outcome"})
+ for _, v := range stateString {
+ wp.mDisappearances.WithLabelValues(v).Add(0)
+ }
+ reg.MustRegister(wp.mDisappearances)
}
func (wp *Pool) runMetrics() {
wp.mtx.RLock()
defer wp.mtx.RUnlock()
- instances := map[string]int64{}
+ type entKey struct {
+ cat string
+ instType string
+ }
+ instances := map[entKey]int64{}
price := map[string]float64{}
cpu := map[string]int64{}
mem := map[string]int64{}
default:
cat = "idle"
}
- instances[cat]++
+ instances[entKey{cat, wkr.instType.Name}]++
price[cat] += wkr.instType.Price
cpu[cat] += int64(wkr.instType.VCPUs)
mem[cat] += int64(wkr.instType.RAM)
running += int64(len(wkr.running) + len(wkr.starting))
}
for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
- wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
+ // make sure to reset gauges for non-existing category/nodetype combinations
+ for _, it := range wp.instanceTypes {
+ if _, ok := instances[entKey{cat, it.Name}]; !ok {
+ wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
+ }
+ }
+ }
+ for k, v := range instances {
+ wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
}
wp.mContainersRunning.Set(float64(running))
}
return r
}
+// KillInstance destroys a cloud VM instance. It returns an error if
+// the given instance does not exist.
+func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+ wkr, ok := wp.workers[id]
+ if !ok {
+ return errors.New("instance not found")
+ }
+ wkr.logger.WithField("Reason", reason).Info("shutting down")
+ wkr.reportBootOutcome(BootOutcomeAborted)
+ wkr.shutdown()
+ return nil
+}
+
func (wp *Pool) setup() {
wp.creating = map[string]createCall{}
wp.exited = map[string]time.Time{}
wp.workers = map[cloud.InstanceID]*worker{}
wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
+ wp.loadRunnerData()
+}
+
+// Load the runner program to be deployed on worker nodes into
+// wp.runnerData, if necessary. Errors are logged.
+//
+// If auto-deploy is disabled, len(wp.runnerData) will be 0.
+//
+// Caller must not have lock.
+func (wp *Pool) loadRunnerData() error {
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ if wp.runnerData != nil {
+ return nil
+ } else if wp.runnerSource == "" {
+ wp.runnerCmd = "crunch-run"
+ wp.runnerData = []byte{}
+ return nil
+ }
+ logger := wp.logger.WithField("source", wp.runnerSource)
+ logger.Debug("loading runner")
+ buf, err := ioutil.ReadFile(wp.runnerSource)
+ if err != nil {
+ logger.WithError(err).Error("failed to load runner program")
+ return err
+ }
+ wp.runnerData = buf
+ wp.runnerMD5 = md5.Sum(buf)
+ wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
+ return nil
}
func (wp *Pool) notify() {
}
wp.logger.Debug("getting instance list")
threshold := time.Now()
- instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
+ instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
if err != nil {
wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
return err
notify := false
for _, inst := range instances {
- itTag := inst.Tags()[tagKeyInstanceType]
+ itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
it, ok := wp.instanceTypes[itTag]
if !ok {
- wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
+ wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
continue
}
if wkr, isNew := wp.updateWorker(inst, it); isNew {
notify = true
} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
- wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
+ wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
wkr.shutdown()
}
}
"WorkerState": wkr.state,
})
logger.Info("instance disappeared in cloud")
+ wkr.reportBootOutcome(BootOutcomeDisappeared)
+ if wp.mDisappearances != nil {
+ wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
+ }
delete(wp.workers, id)
go wkr.Close()
notify = true