package worker
import (
+ "crypto/rand"
"errors"
+ "fmt"
"io"
"sort"
"strings"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
+ "golang.org/x/crypto/ssh"
)
const (
- tagKeyInstanceType = "InstanceType"
- tagKeyIdleBehavior = "IdleBehavior"
+ tagKeyInstanceType = "InstanceType"
+ tagKeyIdleBehavior = "IdleBehavior"
+ tagKeyInstanceSecret = "InstanceSecret"
)
// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
- Instance cloud.InstanceID
- Price float64
- ArvadosInstanceType string
- ProviderInstanceType string
- LastContainerUUID string
- LastBusy time.Time
- WorkerState string
- IdleBehavior IdleBehavior
+ Instance cloud.InstanceID `json:"instance"`
+ Address string `json:"address"`
+ Price float64 `json:"price"`
+ ArvadosInstanceType string `json:"arvados_instance_type"`
+ ProviderInstanceType string `json:"provider_instance_type"`
+ LastContainerUUID string `json:"last_container_uuid"`
+ LastBusy time.Time `json:"last_busy"`
+ WorkerState string `json:"worker_state"`
+ IdleBehavior IdleBehavior `json:"idle_behavior"`
}
// An Executor executes shell commands on a remote host.
defaultTimeoutBooting = time.Minute * 10
defaultTimeoutProbe = time.Minute * 10
defaultTimeoutShutdown = time.Second * 10
+
+ // Time after a quota error to try again anyway, even if no
+ // instances have been shutdown.
+ quotaErrorTTL = time.Minute
+
+ // Time between "X failed because rate limiting" messages
+ logRateLimitErrorInterval = time.Second * 10
)
func duration(conf arvados.Duration, def time.Duration) time.Duration {
//
// New instances are configured and set up according to the given
// cluster configuration.
-func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
wp := &Pool{
logger: logger,
arvClient: arvClient,
- instanceSet: instanceSet,
+ instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
newExecutor: newExecutor,
bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
imageID: cloud.ImageID(cluster.CloudVMs.ImageID),
timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+ installPublicKey: installPublicKey,
stop: make(chan bool),
}
wp.registerMetrics(reg)
// configuration
logger logrus.FieldLogger
arvClient *arvados.Client
- instanceSet cloud.InstanceSet
+ instanceSet *throttledInstanceSet
newExecutor func(cloud.Instance) Executor
bootProbeCommand string
imageID cloud.ImageID
timeoutBooting time.Duration
timeoutProbe time.Duration
timeoutShutdown time.Duration
+ installPublicKey ssh.PublicKey
// private state
subscribers map[<-chan struct{}]chan<- struct{}
mtx sync.RWMutex
setupOnce sync.Once
- mInstances prometheus.Gauge
+ throttleCreate throttle
+ throttleInstances throttle
+
mContainersRunning prometheus.Gauge
- mVCPUs prometheus.Gauge
- mVCPUsInuse prometheus.Gauge
- mMemory prometheus.Gauge
- mMemoryInuse prometheus.Gauge
+ mInstances *prometheus.GaugeVec
+ mInstancesPrice *prometheus.GaugeVec
+ mVCPUs *prometheus.GaugeVec
+ mMemory *prometheus.GaugeVec
}
-// Subscribe returns a channel that becomes ready whenever a worker's
-// state changes.
+// Subscribe returns a buffered channel that becomes ready after any
+// change to the pool's state that could have scheduling implications:
+// a worker's state changes, a new worker appears, the cloud
+// provider's API rate limiting period ends, etc.
+//
+// Additional events that occur while the channel is already ready
+// will be dropped, so it is OK if the caller services the channel
+// slowly.
//
// Example:
//
// ch := wp.Subscribe()
// defer wp.Unsubscribe(ch)
// for range ch {
-// // ...try scheduling some work...
+// tryScheduling(wp)
// if done {
// break
// }
creating[it] = len(times)
}
for _, wkr := range wp.workers {
- if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) || wkr.idleBehavior != IdleBehaviorRun {
+ // Skip workers that are not expected to become
+ // available soon. Note len(wkr.running)>0 is not
+ // redundant here: it can be true even in
+ // StateUnknown.
+ if wkr.state == StateShutdown ||
+ wkr.state == StateRunning ||
+ wkr.idleBehavior != IdleBehaviorRun ||
+ len(wkr.running) > 0 {
continue
}
it := wkr.instType
// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
-func (wp *Pool) Create(it arvados.InstanceType) error {
+//
+// Create returns false if a pre-existing error state prevents it from
+// even attempting to create a new instance. Those errors are logged
+// by the Pool, so the caller does not need to log anything in such
+// cases.
+func (wp *Pool) Create(it arvados.InstanceType) bool {
logger := wp.logger.WithField("InstanceType", it.Name)
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
defer wp.mtx.Unlock()
- if time.Now().Before(wp.atQuotaUntil) {
- return wp.atQuotaErr
- }
- tags := cloud.InstanceTags{
- tagKeyInstanceType: it.Name,
- tagKeyIdleBehavior: string(IdleBehaviorRun),
+ if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
+ return false
}
now := time.Now()
wp.creating[it] = append(wp.creating[it], now)
go func() {
defer wp.notify()
- inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
+ secret := randomHex(instanceSecretLength)
+ tags := cloud.InstanceTags{
+ tagKeyInstanceType: it.Name,
+ tagKeyIdleBehavior: string(IdleBehaviorRun),
+ tagKeyInstanceSecret: secret,
+ }
+ initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
+ inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
wp.mtx.Lock()
defer wp.mtx.Unlock()
// Remove our timestamp marker from wp.creating
break
}
}
- if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
- wp.atQuotaErr = err
- wp.atQuotaUntil = time.Now().Add(time.Minute)
- }
if err != nil {
+ if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+ wp.atQuotaErr = err
+ wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+ time.AfterFunc(quotaErrorTTL, wp.notify)
+ }
logger.WithError(err).Error("create failed")
+ wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
return
}
wp.updateWorker(inst, it, StateBooting)
}()
- return nil
+ return true
}
// AtQuota returns true if Create is not expected to work at the
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
+ inst = tagVerifier{inst}
id := inst.ID()
if wkr := wp.workers[id]; wkr != nil {
wkr.executor.SetTarget(inst)
logger := wp.logger.WithFields(logrus.Fields{
"InstanceType": it.Name,
- "Instance": inst,
+ "Instance": inst.ID(),
+ "Address": inst.Address(),
})
logger.WithFields(logrus.Fields{
"State": initialState,
}
// Running returns the container UUIDs being prepared/run on workers.
+//
+// In the returned map, the time value indicates when the Pool
+// observed that the container process had exited. A container that
+// has not yet exited has a zero time value. The caller should use
+// KillContainer() to garbage-collect the entries for exited
+// containers.
func (wp *Pool) Running() map[string]time.Time {
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
func (wp *Pool) kill(wkr *worker, uuid string) {
logger := wp.logger.WithFields(logrus.Fields{
"ContainerUUID": uuid,
- "Instance": wkr.instance,
+ "Instance": wkr.instance.ID(),
})
logger.Debug("killing process")
- stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
+ cmd := "crunch-run --kill 15 " + uuid
+ if u := wkr.instance.RemoteUser(); u != "root" {
+ cmd = "sudo " + cmd
+ }
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
if err != nil {
logger.WithFields(logrus.Fields{
"stderr": string(stderr),
if reg == nil {
reg = prometheus.NewRegistry()
}
- wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "instances_total",
- Help: "Number of cloud VMs including pending, booting, running, held, and shutting down.",
- })
- reg.MustRegister(wp.mInstances)
wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Help: "Number of containers reported running by cloud VMs.",
})
reg.MustRegister(wp.mContainersRunning)
-
- wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
+ wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_total",
+ Help: "Number of cloud VMs.",
+ }, []string{"category"})
+ reg.MustRegister(wp.mInstances)
+ wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_price",
+ Help: "Price of cloud VMs.",
+ }, []string{"category"})
+ reg.MustRegister(wp.mInstancesPrice)
+ wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "vcpus_total",
Help: "Total VCPUs on all cloud VMs.",
- })
+ }, []string{"category"})
reg.MustRegister(wp.mVCPUs)
- wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "vcpus_inuse",
- Help: "VCPUs on cloud VMs that are running containers.",
- })
- reg.MustRegister(wp.mVCPUsInuse)
- wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
+ wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "memory_bytes_total",
Help: "Total memory on all cloud VMs.",
- })
+ }, []string{"category"})
reg.MustRegister(wp.mMemory)
- wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "memory_bytes_inuse",
- Help: "Memory on cloud VMs that are running containers.",
- })
- reg.MustRegister(wp.mMemoryInuse)
}
func (wp *Pool) runMetrics() {
wp.mtx.RLock()
defer wp.mtx.RUnlock()
- var alloc, cpu, cpuInuse, mem, memInuse int64
+ instances := map[string]int64{}
+ price := map[string]float64{}
+ cpu := map[string]int64{}
+ mem := map[string]int64{}
+ var running int64
for _, wkr := range wp.workers {
- cpu += int64(wkr.instType.VCPUs)
- mem += int64(wkr.instType.RAM)
- if len(wkr.running)+len(wkr.starting) == 0 {
- continue
+ var cat string
+ switch {
+ case len(wkr.running)+len(wkr.starting) > 0:
+ cat = "inuse"
+ case wkr.idleBehavior == IdleBehaviorHold:
+ cat = "hold"
+ case wkr.state == StateBooting:
+ cat = "booting"
+ case wkr.state == StateUnknown:
+ cat = "unknown"
+ default:
+ cat = "idle"
}
- alloc += int64(len(wkr.running) + len(wkr.starting))
- cpuInuse += int64(wkr.instType.VCPUs)
- memInuse += int64(wkr.instType.RAM)
- }
- wp.mInstances.Set(float64(len(wp.workers)))
- wp.mContainersRunning.Set(float64(alloc))
- wp.mVCPUs.Set(float64(cpu))
- wp.mMemory.Set(float64(mem))
- wp.mVCPUsInuse.Set(float64(cpuInuse))
- wp.mMemoryInuse.Set(float64(memInuse))
+ instances[cat]++
+ price[cat] += wkr.instType.Price
+ cpu[cat] += int64(wkr.instType.VCPUs)
+ mem[cat] += int64(wkr.instType.RAM)
+ running += int64(len(wkr.running) + len(wkr.starting))
+ }
+ for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
+ wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
+ wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
+ wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
+ wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
+ }
+ wp.mContainersRunning.Set(float64(running))
}
func (wp *Pool) runProbes() {
for _, w := range wp.workers {
r = append(r, InstanceView{
Instance: w.instance.ID(),
+ Address: w.instance.Address(),
Price: w.instType.Price,
ArvadosInstanceType: w.instType.Name,
ProviderInstanceType: w.instType.ProviderType,
func (wp *Pool) getInstancesAndSync() error {
wp.setupOnce.Do(wp.setup)
+ if err := wp.instanceSet.throttleInstances.Error(); err != nil {
+ return err
+ }
wp.logger.Debug("getting instance list")
threshold := time.Now()
instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
if err != nil {
+ wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
return err
}
wp.sync(threshold, instances)
continue
}
logger := wp.logger.WithFields(logrus.Fields{
- "Instance": wkr.instance,
+ "Instance": wkr.instance.ID(),
"WorkerState": wkr.state,
})
logger.Info("instance disappeared in cloud")
go wp.notify()
}
}
+
+// Return a random string of n hexadecimal digits (n*4 random bits). n
+// must be even.
+func randomHex(n int) string {
+ buf := make([]byte, n/2)
+ _, err := rand.Read(buf)
+ if err != nil {
+ panic(err)
+ }
+ return fmt.Sprintf("%x", buf)
+}