X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/02f779feef0138420f8d7dc908eba040bc2dd904..df1ebc0e3184afd3fb66414651fc1aec713928bf:/lib/dispatchcloud/worker/pool.go?ds=sidebyside diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index a25ed60150..15b0dbcde5 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -5,12 +5,15 @@ package worker import ( + "crypto/hmac" "crypto/md5" "crypto/rand" + "crypto/sha256" "errors" "fmt" "io" "io/ioutil" + mathrand "math/rand" "sort" "strings" "sync" @@ -101,12 +104,15 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe instanceSetID: instanceSetID, instanceSet: &throttledInstanceSet{InstanceSet: instanceSet}, newExecutor: newExecutor, + cluster: cluster, bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand, + instanceInitCommand: cloud.InitCommand(cluster.Containers.CloudVMs.InstanceInitCommand), runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary, imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID), instanceTypes: cluster.InstanceTypes, maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond, maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps, + maxInstances: cluster.Containers.CloudVMs.MaxInstances, probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval), syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval), timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle), @@ -116,8 +122,11 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM), timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal), timeoutStaleRunLock: duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock), + systemRootToken: cluster.SystemRootToken, installPublicKey: installPublicKey, tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix, + runnerCmdDefault: cluster.Containers.CrunchRunCommand, + runnerArgs: append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...), stop: make(chan bool), } wp.registerMetrics(reg) @@ -139,7 +148,9 @@ type Pool struct { instanceSetID cloud.InstanceSetID instanceSet *throttledInstanceSet newExecutor func(cloud.Instance) Executor + cluster *arvados.Cluster bootProbeCommand string + instanceInitCommand cloud.InitCommand runnerSource string imageID cloud.ImageID instanceTypes map[string]arvados.InstanceType @@ -147,6 +158,7 @@ type Pool struct { probeInterval time.Duration maxProbesPerSecond int maxConcurrentInstanceCreateOps int + maxInstances int timeoutIdle time.Duration timeoutBooting time.Duration timeoutProbe time.Duration @@ -154,23 +166,27 @@ type Pool struct { timeoutTERM time.Duration timeoutSignal time.Duration timeoutStaleRunLock time.Duration + systemRootToken string installPublicKey ssh.PublicKey tagKeyPrefix string + runnerCmdDefault string // crunch-run command to use if not deploying a binary + runnerArgs []string // extra args passed to crunch-run // private state - subscribers map[<-chan struct{}]chan<- struct{} - creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret) - workers map[cloud.InstanceID]*worker - loaded bool // loaded list of instances from InstanceSet at least once - exited 
map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called - atQuotaUntil time.Time - atQuotaErr cloud.QuotaError - stop chan bool - mtx sync.RWMutex - setupOnce sync.Once - runnerData []byte - runnerMD5 [md5.Size]byte - runnerCmd string + subscribers map[<-chan struct{}]chan<- struct{} + creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret) + workers map[cloud.InstanceID]*worker + loaded bool // loaded list of instances from InstanceSet at least once + exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called + atQuotaUntilFewerInstances int + atQuotaUntil time.Time + atQuotaErr cloud.QuotaError + stop chan bool + mtx sync.RWMutex + setupOnce sync.Once + runnerData []byte + runnerMD5 [md5.Size]byte + runnerCmd string mContainersRunning prometheus.Gauge mInstances *prometheus.GaugeVec @@ -184,6 +200,8 @@ type Pool struct { mTimeFromShutdownToGone prometheus.Summary mTimeFromQueueToCrunchRun prometheus.Summary mRunProbeDuration *prometheus.SummaryVec + mProbeAgeMax prometheus.Gauge + mProbeAgeMedian prometheus.Gauge } type createCall struct { @@ -291,10 +309,10 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int { // pool. The worker is added immediately; instance creation runs in // the background. // -// Create returns false if a pre-existing error state prevents it from -// even attempting to create a new instance. Those errors are logged -// by the Pool, so the caller does not need to log anything in such -// cases. +// Create returns false if a pre-existing error or a configuration +// setting prevents it from even attempting to create a new +// instance. Those errors are logged by the Pool, so the caller does +// not need to log anything in such cases. 
func (wp *Pool) Create(it arvados.InstanceType) bool { logger := wp.logger.WithField("InstanceType", it.Name) wp.setupOnce.Do(wp.setup) @@ -304,7 +322,10 @@ func (wp *Pool) Create(it arvados.InstanceType) bool { } wp.mtx.Lock() defer wp.mtx.Unlock() - if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil { + if time.Now().Before(wp.atQuotaUntil) || + wp.atQuotaUntilFewerInstances > 0 || + wp.instanceSet.throttleCreate.Error() != nil || + (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) { return false } // The maxConcurrentInstanceCreateOps knob throttles the number of node create @@ -330,7 +351,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool { wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun), wp.tagKeyPrefix + tagKeyInstanceSecret: secret, } - initCmd := TagVerifier{nil, secret, nil}.InitCommand() + initCmd := TagVerifier{nil, secret, nil}.InitCommand() + "\n" + wp.instanceInitCommand inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey) wp.mtx.Lock() defer wp.mtx.Unlock() @@ -341,8 +362,24 @@ func (wp *Pool) Create(it arvados.InstanceType) bool { if err != nil { if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() { wp.atQuotaErr = err - wp.atQuotaUntil = time.Now().Add(quotaErrorTTL) - time.AfterFunc(quotaErrorTTL, wp.notify) + n := len(wp.workers) + len(wp.creating) - 1 + if n < 1 { + // Quota error with no + // instances running -- + // nothing to do but wait + wp.atQuotaUntilFewerInstances = 0 + wp.atQuotaUntil = time.Now().Add(quotaErrorTTL) + time.AfterFunc(quotaErrorTTL, wp.notify) + logger.WithField("atQuotaUntil", wp.atQuotaUntil).Info("quota error with 0 running -- waiting for quotaErrorTTL") + } else if n < wp.atQuotaUntilFewerInstances || wp.atQuotaUntilFewerInstances == 0 { + // Quota error with N + // instances running -- report + // AtQuota until some + // instances shut down + wp.atQuotaUntilFewerInstances = n + wp.atQuotaUntil = time.Time{} + logger.WithField("atQuotaUntilFewerInstances", n).Info("quota error -- waiting for next instance shutdown") + } } logger.WithError(err).Error("create failed") wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify) @@ -350,15 +387,21 @@ func (wp *Pool) Create(it arvados.InstanceType) bool { } wp.updateWorker(inst, it) }() + if len(wp.creating)+len(wp.workers) == wp.maxInstances { + logger.Infof("now at MaxInstances limit of %d instances", wp.maxInstances) + } return true } // AtQuota returns true if Create is not expected to work at the -// moment. +// moment (e.g., cloud provider has reported quota errors, or we are +// already at our own configured quota). 
func (wp *Pool) AtQuota() bool { wp.mtx.Lock() defer wp.mtx.Unlock() - return time.Now().Before(wp.atQuotaUntil) + return wp.atQuotaUntilFewerInstances > 0 || + time.Now().Before(wp.atQuotaUntil) || + (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) } // SetIdleBehavior determines how the indicated instance will behave @@ -378,10 +421,15 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) func (wp *Pool) reportSSHConnected(inst cloud.Instance) { wp.mtx.Lock() defer wp.mtx.Unlock() - wkr := wp.workers[inst.ID()] + wkr, ok := wp.workers[inst.ID()] + if !ok { + // race: inst was removed from the pool + return + } if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() { - // the node is not in booting state (can happen if a-d-c is restarted) OR - // this is not the first SSH connection + // the node is not in booting state (can happen if + // a-d-c is restarted) OR this is not the first SSH + // connection return } @@ -602,6 +650,20 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) { Help: "Number of containers reported running by cloud VMs.", }) reg.MustRegister(wp.mContainersRunning) + wp.mProbeAgeMax = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "dispatchcloud", + Name: "probe_age_seconds_max", + Help: "Maximum number of seconds since an instance's most recent successful probe.", + }) + reg.MustRegister(wp.mProbeAgeMax) + wp.mProbeAgeMedian = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "dispatchcloud", + Name: "probe_age_seconds_median", + Help: "Median number of seconds since an instance's most recent successful probe.", + }) + reg.MustRegister(wp.mProbeAgeMedian) wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "arvados", Subsystem: "dispatchcloud", @@ -714,6 +776,8 @@ func (wp *Pool) updateMetrics() { cpu := map[string]int64{} mem := map[string]int64{} var running int64 + now := time.Now() + var probed []time.Time for _, wkr := range wp.workers { var cat string switch { @@ -733,6 +797,7 @@ func (wp *Pool) updateMetrics() { cpu[cat] += int64(wkr.instType.VCPUs) mem[cat] += int64(wkr.instType.RAM) running += int64(len(wkr.running) + len(wkr.starting)) + probed = append(probed, wkr.probed) } for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} { wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat]) @@ -749,6 +814,15 @@ func (wp *Pool) updateMetrics() { wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v)) } wp.mContainersRunning.Set(float64(running)) + + if len(probed) == 0 { + wp.mProbeAgeMax.Set(0) + wp.mProbeAgeMedian.Set(0) + } else { + sort.Slice(probed, func(i, j int) bool { return probed[i].Before(probed[j]) }) + wp.mProbeAgeMax.Set(now.Sub(probed[0]).Seconds()) + wp.mProbeAgeMedian.Set(now.Sub(probed[len(probed)/2]).Seconds()) + } } func (wp *Pool) runProbes() { @@ -764,6 +838,13 @@ func (wp *Pool) runProbes() { workers := []cloud.InstanceID{} for range probeticker.C { + // Add some jitter. Without this, if probeInterval is + // a multiple of syncInterval and sync is + // instantaneous (as with the loopback driver), the + // first few probes race with sync operations and + // don't update the workers. + time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23))) + workers = workers[:0] wp.mtx.Lock() for id, wkr := range wp.workers { @@ -847,6 +928,9 @@ func (wp *Pool) Instances() []InstanceView { // KillInstance destroys a cloud VM instance. 
It returns an error if // the given instance does not exist. func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error { + wp.setupOnce.Do(wp.setup) + wp.mtx.Lock() + defer wp.mtx.Unlock() wkr, ok := wp.workers[id] if !ok { return errors.New("instance not found") @@ -877,7 +961,7 @@ func (wp *Pool) loadRunnerData() error { if wp.runnerData != nil { return nil } else if wp.runnerSource == "" { - wp.runnerCmd = "crunch-run" + wp.runnerCmd = wp.runnerCmdDefault wp.runnerData = []byte{} return nil } @@ -890,7 +974,7 @@ func (wp *Pool) loadRunnerData() error { } wp.runnerData = buf wp.runnerMD5 = md5.Sum(buf) - wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5) + wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5) return nil } @@ -968,6 +1052,14 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { notify = true } + if wp.atQuotaUntilFewerInstances > len(wp.workers)+len(wp.creating) { + // After syncing, there are fewer instances (including + // pending creates) than there were last time we saw a + // quota error. This might mean it's now possible to + // create new instances. Reset our "at quota" state. + wp.atQuotaUntilFewerInstances = 0 + } + if !wp.loaded { notify = true wp.loaded = true @@ -990,6 +1082,12 @@ func (wp *Pool) waitUntilLoaded() { } } +func (wp *Pool) gatewayAuthSecret(uuid string) string { + h := hmac.New(sha256.New, []byte(wp.systemRootToken)) + fmt.Fprint(h, uuid) + return fmt.Sprintf("%x", h.Sum(nil)) +} + // Return a random string of n hexadecimal digits (n*4 random bits). n // must be even. func randomHex(n int) string {
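
The new Pool.gatewayAuthSecret helper introduced in this diff derives a per-container gateway auth secret as the hex-encoded HMAC-SHA256 of the container UUID, keyed by the cluster's SystemRootToken. Below is a minimal standalone sketch of the same derivation for reference; the token and UUID values are hypothetical and used only for illustration, not taken from any real cluster.

package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"fmt"
)

// gatewayAuthSecret mirrors the derivation added to Pool in this diff:
// hex(HMAC-SHA256(uuid)) keyed by the cluster's SystemRootToken.
func gatewayAuthSecret(systemRootToken, uuid string) string {
	h := hmac.New(sha256.New, []byte(systemRootToken))
	fmt.Fprint(h, uuid)
	return fmt.Sprintf("%x", h.Sum(nil))
}

func main() {
	// Hypothetical values, for illustration only.
	fmt.Println(gatewayAuthSecret("example-system-root-token", "zzzzz-dz642-000000000000000"))
}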