X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/10d70a1c08984a699ac3f6b893fe6d2141c5ad9e..8685bdc41012f1623cc02b573e27439fdf314799:/lib/dispatchcloud/worker/pool.go

diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 13b7fd2934..7289179fd6 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -5,17 +5,21 @@
 package worker
 
 import (
+	"crypto/hmac"
+	"crypto/md5"
 	"crypto/rand"
+	"crypto/sha256"
 	"errors"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"sort"
 	"strings"
 	"sync"
 	"time"
 
-	"git.curoverse.com/arvados.git/lib/cloud"
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.arvados.org/arvados.git/lib/cloud"
+	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
 	"golang.org/x/crypto/ssh"
@@ -62,15 +66,16 @@ type Executor interface {
 }
 
 const (
-	defaultSyncInterval       = time.Minute
-	defaultProbeInterval      = time.Second * 10
-	defaultMaxProbesPerSecond = 10
-	defaultTimeoutIdle        = time.Minute
-	defaultTimeoutBooting     = time.Minute * 10
-	defaultTimeoutProbe       = time.Minute * 10
-	defaultTimeoutShutdown    = time.Second * 10
-	defaultTimeoutTERM        = time.Minute * 2
-	defaultTimeoutSignal      = time.Second * 5
+	defaultSyncInterval        = time.Minute
+	defaultProbeInterval       = time.Second * 10
+	defaultMaxProbesPerSecond  = 10
+	defaultTimeoutIdle         = time.Minute
+	defaultTimeoutBooting      = time.Minute * 10
+	defaultTimeoutProbe        = time.Minute * 10
+	defaultTimeoutShutdown     = time.Second * 10
+	defaultTimeoutTERM         = time.Minute * 2
+	defaultTimeoutSignal       = time.Second * 5
+	defaultTimeoutStaleRunLock = time.Second * 5
 
 	// Time after a quota error to try again anyway, even if no
 	// instances have been shutdown.
@@ -83,9 +88,8 @@ const (
 func duration(conf arvados.Duration, def time.Duration) time.Duration {
 	if conf > 0 {
 		return time.Duration(conf)
-	} else {
-		return def
 	}
+	return def
 }
 
 // NewPool creates a Pool of workers backed by instanceSet.
@@ -94,26 +98,32 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration {
 // cluster configuration.
 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
 	wp := &Pool{
-		logger:             logger,
-		arvClient:          arvClient,
-		instanceSetID:      instanceSetID,
-		instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
-		newExecutor:        newExecutor,
-		bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
-		imageID:            cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
-		instanceTypes:      cluster.InstanceTypes,
-		maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
-		probeInterval:      duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
-		syncInterval:       duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
-		timeoutIdle:        duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
-		timeoutBooting:     duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
-		timeoutProbe:       duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
-		timeoutShutdown:    duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
-		timeoutTERM:        duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
-		timeoutSignal:      duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
-		installPublicKey:   installPublicKey,
-		tagKeyPrefix:       cluster.Containers.CloudVMs.TagKeyPrefix,
-		stop:               make(chan bool),
+		logger:                         logger,
+		arvClient:                      arvClient,
+		instanceSetID:                  instanceSetID,
+		instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
+		newExecutor:                    newExecutor,
+		bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
+		runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
+		imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
+		instanceTypes:                  cluster.InstanceTypes,
+		maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
+		maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
+		probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
+		syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
+		timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
+		timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
+		timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+		timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+		timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
+		timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
+		timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
+		systemRootToken:                cluster.SystemRootToken,
+		installPublicKey:               installPublicKey,
+		tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
+		runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
+		runnerArgs:                     cluster.Containers.CrunchRunArgumentsList,
+		stop:                           make(chan bool),
 	}
 	wp.registerMetrics(reg)
 	go func() {
@@ -129,25 +139,31 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
 // zero Pool should not be used. Call NewPool to create a new Pool.
 type Pool struct {
 	// configuration
-	logger             logrus.FieldLogger
-	arvClient          *arvados.Client
-	instanceSetID      cloud.InstanceSetID
-	instanceSet        *throttledInstanceSet
-	newExecutor        func(cloud.Instance) Executor
-	bootProbeCommand   string
-	imageID            cloud.ImageID
-	instanceTypes      map[string]arvados.InstanceType
-	syncInterval       time.Duration
-	probeInterval      time.Duration
-	maxProbesPerSecond int
-	timeoutIdle        time.Duration
-	timeoutBooting     time.Duration
-	timeoutProbe       time.Duration
-	timeoutShutdown    time.Duration
-	timeoutTERM        time.Duration
-	timeoutSignal      time.Duration
-	installPublicKey   ssh.PublicKey
-	tagKeyPrefix       string
+	logger                         logrus.FieldLogger
+	arvClient                      *arvados.Client
+	instanceSetID                  cloud.InstanceSetID
+	instanceSet                    *throttledInstanceSet
+	newExecutor                    func(cloud.Instance) Executor
+	bootProbeCommand               string
+	runnerSource                   string
+	imageID                        cloud.ImageID
+	instanceTypes                  map[string]arvados.InstanceType
+	syncInterval                   time.Duration
+	probeInterval                  time.Duration
+	maxProbesPerSecond             int
+	maxConcurrentInstanceCreateOps int
+	timeoutIdle                    time.Duration
+	timeoutBooting                 time.Duration
+	timeoutProbe                   time.Duration
+	timeoutShutdown                time.Duration
+	timeoutTERM                    time.Duration
+	timeoutSignal                  time.Duration
+	timeoutStaleRunLock            time.Duration
+	systemRootToken                string
+	installPublicKey               ssh.PublicKey
+	tagKeyPrefix                   string
+	runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
+	runnerArgs                     []string // extra args passed to crunch-run
 
 	// private state
 	subscribers  map[<-chan struct{}]chan<- struct{}
@@ -160,16 +176,22 @@ type Pool struct {
 	stop         chan bool
 	mtx          sync.RWMutex
 	setupOnce    sync.Once
-
-	throttleCreate    throttle
-	throttleInstances throttle
-
-	mContainersRunning prometheus.Gauge
-	mInstances         *prometheus.GaugeVec
-	mInstancesPrice    *prometheus.GaugeVec
-	mVCPUs             *prometheus.GaugeVec
-	mMemory            *prometheus.GaugeVec
-	mDisappearances    *prometheus.CounterVec
+	runnerData   []byte
+	runnerMD5    [md5.Size]byte
+	runnerCmd    string
+
+	mContainersRunning        prometheus.Gauge
+	mInstances                *prometheus.GaugeVec
+	mInstancesPrice           *prometheus.GaugeVec
+	mVCPUs                    *prometheus.GaugeVec
+	mMemory                   *prometheus.GaugeVec
+	mBootOutcomes             *prometheus.CounterVec
+	mDisappearances           *prometheus.CounterVec
+	mTimeToSSH                prometheus.Summary
+	mTimeToReadyForContainer  prometheus.Summary
+	mTimeFromShutdownToGone   prometheus.Summary
+	mTimeFromQueueToCrunchRun prometheus.Summary
+	mRunProbeDuration         *prometheus.SummaryVec
 }
 
 type createCall struct {
@@ -177,6 +199,14 @@ type createCall struct {
 	instanceType arvados.InstanceType
 }
 
+func (wp *Pool) CheckHealth() error {
+	wp.setupOnce.Do(wp.setup)
+	if err := wp.loadRunnerData(); err != nil {
+		return fmt.Errorf("error loading runner binary: %s", err)
+	}
+	return nil
+}
+
 // Subscribe returns a buffered channel that becomes ready after any
 // change to the pool's state that could have scheduling implications:
 // a worker's state changes, a new worker appears, the cloud
@@ -276,9 +306,25 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 func (wp *Pool) Create(it arvados.InstanceType) bool {
 	logger := wp.logger.WithField("InstanceType", it.Name)
 	wp.setupOnce.Do(wp.setup)
+	if wp.loadRunnerData() != nil {
+		// Boot probe is certain to fail.
+		return false
+	}
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
-	if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
+	if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
+		return false
+	}
+	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
+	// requests in flight. It was added to work around a limitation in Azure's
+	// managed disks, which support no more than 20 concurrent node creation
+	// requests from a single disk image (cf.
+	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
+	// The code assumes that node creation, from Azure's perspective, means the
+	// period until the instance appears in the "get all instances" list.
+	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
+		logger.Info("reached MaxConcurrentInstanceCreateOps")
+		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
+		return false
 	}
 	now := time.Now()
@@ -292,7 +338,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
 		wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
 		wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
 	}
-	initCmd := TagVerifier{nil, secret}.InitCommand()
+	initCmd := TagVerifier{nil, secret, nil}.InitCommand()
 	inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
@@ -336,6 +382,23 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
 	return nil
 }
 
+// Successful connection to the SSH daemon, update the mTimeToSSH metric
+func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	wkr := wp.workers[inst.ID()]
+	if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
+		// the node is not in booting state (can happen if a-d-c is restarted) OR
+		// this is not the first SSH connection
+		return
+	}
+
+	wkr.firstSSHConnection = time.Now()
+	if wp.mTimeToSSH != nil {
+		wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
+	}
+}
+
 // Add or update worker attached to the given instance.
 //
 // The second return value is true if a new worker is created.
@@ -346,7 +409,7 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
 // Caller must have lock.
 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
 	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
-	inst = TagVerifier{inst, secret}
+	inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
 	id := inst.ID()
 	if wkr := wp.workers[id]; wkr != nil {
 		wkr.executor.SetTarget(inst)
@@ -416,7 +479,8 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
 		// time (Idle) or the earliest create time (Booting)
 		for _, wkr := range wp.workers {
 			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
-				logger.WithField("Instance", wkr.instance).Info("shutting down")
+				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
+				wkr.reportBootOutcome(BootOutcomeAborted)
 				wkr.shutdown()
 				return true
 			}
@@ -475,7 +539,7 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 	defer wp.mtx.Unlock()
 	var wkr *worker
 	for _, w := range wp.workers {
-		if w.instType == it && w.state == StateIdle {
+		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
 			if wkr == nil || w.busy.After(wkr.busy) {
 				wkr = w
 			}
@@ -551,7 +615,7 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
 		Subsystem: "dispatchcloud",
 		Name:      "instances_total",
 		Help:      "Number of cloud VMs.",
-	}, []string{"category"})
+	}, []string{"category", "instance_type"})
 	reg.MustRegister(wp.mInstances)
 	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: "arvados",
@@ -574,6 +638,16 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
 		Help:      "Total memory on all cloud VMs.",
 	}, []string{"category"})
 	reg.MustRegister(wp.mMemory)
+	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "arvados",
+		Subsystem: "dispatchcloud",
+		Name:      "boot_outcomes",
+		Help:      "Boot outcomes by type.",
+	}, []string{"outcome"})
+	for k := range validBootOutcomes {
+		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
+	}
+	reg.MustRegister(wp.mBootOutcomes)
 	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Namespace: "arvados",
 		Subsystem: "dispatchcloud",
@@ -584,6 +658,46 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
 		wp.mDisappearances.WithLabelValues(v).Add(0)
 	}
 	reg.MustRegister(wp.mDisappearances)
+	wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
+		Namespace:  "arvados",
+		Subsystem:  "dispatchcloud",
+		Name:       "instances_time_to_ssh_seconds",
+		Help:       "Number of seconds between instance creation and the first successful SSH connection.",
+		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+	})
+	reg.MustRegister(wp.mTimeToSSH)
+	wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
+		Namespace:  "arvados",
+		Subsystem:  "dispatchcloud",
+		Name:       "instances_time_to_ready_for_container_seconds",
+		Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
+		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+	})
+	reg.MustRegister(wp.mTimeToReadyForContainer)
+	wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
+		Namespace:  "arvados",
+		Subsystem:  "dispatchcloud",
+		Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
+		Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
+		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+	})
+	reg.MustRegister(wp.mTimeFromShutdownToGone)
+	wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
+		Namespace:  "arvados",
+		Subsystem:  "dispatchcloud",
+		Name:       "containers_time_from_queue_to_crunch_run_seconds",
+		Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
+		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+	})
+	reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
+	wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
+		Namespace:  "arvados",
+		Subsystem:  "dispatchcloud",
+		Name:       "instances_run_probe_duration_seconds",
+		Help:       "Number of seconds per runProbe call.",
+		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+	}, []string{"outcome"})
+	reg.MustRegister(wp.mRunProbeDuration)
 }
 
 func (wp *Pool) runMetrics() {
@@ -599,7 +713,11 @@ func (wp *Pool) updateMetrics() {
 	wp.mtx.RLock()
 	defer wp.mtx.RUnlock()
 
-	instances := map[string]int64{}
+	type entKey struct {
+		cat      string
+		instType string
+	}
+	instances := map[entKey]int64{}
 	price := map[string]float64{}
 	cpu := map[string]int64{}
 	mem := map[string]int64{}
@@ -618,17 +736,25 @@ func (wp *Pool) updateMetrics() {
 		default:
 			cat = "idle"
 		}
-		instances[cat]++
+		instances[entKey{cat, wkr.instType.Name}]++
 		price[cat] += wkr.instType.Price
 		cpu[cat] += int64(wkr.instType.VCPUs)
 		mem[cat] += int64(wkr.instType.RAM)
 		running += int64(len(wkr.running) + len(wkr.starting))
 	}
 	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
-		wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
 		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
 		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
 		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
+		// make sure to reset gauges for non-existing category/nodetype combinations
+		for _, it := range wp.instanceTypes {
+			if _, ok := instances[entKey{cat, it.Name}]; !ok {
+				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
+			}
+		}
+	}
+	for k, v := range instances {
+		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
 	}
 	wp.mContainersRunning.Set(float64(running))
 }
@@ -734,6 +860,7 @@ func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
 		return errors.New("instance not found")
 	}
 	wkr.logger.WithField("Reason", reason).Info("shutting down")
+	wkr.reportBootOutcome(BootOutcomeAborted)
 	wkr.shutdown()
 	return nil
 }
@@ -743,6 +870,36 @@ func (wp *Pool) setup() {
 	wp.exited = map[string]time.Time{}
 	wp.workers = map[cloud.InstanceID]*worker{}
 	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
+	wp.loadRunnerData()
+}
+
+// Load the runner program to be deployed on worker nodes into
+// wp.runnerData, if necessary. Errors are logged.
+//
+// If auto-deploy is disabled, len(wp.runnerData) will be 0.
+//
+// Caller must not have lock.
+func (wp *Pool) loadRunnerData() error {
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	if wp.runnerData != nil {
+		return nil
+	} else if wp.runnerSource == "" {
+		wp.runnerCmd = wp.runnerCmdDefault
+		wp.runnerData = []byte{}
+		return nil
+	}
+	logger := wp.logger.WithField("source", wp.runnerSource)
+	logger.Debug("loading runner")
+	buf, err := ioutil.ReadFile(wp.runnerSource)
+	if err != nil {
+		logger.WithError(err).Error("failed to load runner program")
+		return err
+	}
+	wp.runnerData = buf
+	wp.runnerMD5 = md5.Sum(buf)
+	wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
+	return nil
 }
 
 func (wp *Pool) notify() {
@@ -786,13 +943,13 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
 		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
 		it, ok := wp.instanceTypes[itTag]
 		if !ok {
-			wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
+			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
 			continue
 		}
 		if wkr, isNew := wp.updateWorker(inst, it); isNew {
 			notify = true
 		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
-			wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
+			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
 			wkr.shutdown()
 		}
 	}
@@ -806,9 +963,14 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
 			"WorkerState": wkr.state,
 		})
 		logger.Info("instance disappeared in cloud")
+		wkr.reportBootOutcome(BootOutcomeDisappeared)
 		if wp.mDisappearances != nil {
 			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
 		}
+		// wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
+		if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
+			wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
+		}
 		delete(wp.workers, id)
 		go wkr.Close()
 		notify = true
@@ -836,6 +998,12 @@ func (wp *Pool) waitUntilLoaded() {
 	}
 }
 
+func (wp *Pool) gatewayAuthSecret(uuid string) string {
+	h := hmac.New(sha256.New, []byte(wp.systemRootToken))
+	fmt.Fprint(h, uuid)
+	return fmt.Sprintf("%x", h.Sum(nil))
+}
+
 // Return a random string of n hexadecimal digits (n*4 random bits). n
 // must be even.
 func randomHex(n int) string {
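
The gatewayAuthSecret helper added near the end of the diff derives a per-container gateway auth secret as an HMAC-SHA256 of the container UUID, keyed with the cluster's SystemRootToken (which NewPool now copies into wp.systemRootToken). A minimal standalone sketch of the same derivation; the token and UUID values below are hypothetical placeholders, not real cluster data:

package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"fmt"
)

// deriveGatewaySecret mirrors Pool.gatewayAuthSecret: HMAC-SHA256 over the
// container UUID, keyed with the cluster's SystemRootToken, hex-encoded.
func deriveGatewaySecret(systemRootToken, containerUUID string) string {
	h := hmac.New(sha256.New, []byte(systemRootToken))
	fmt.Fprint(h, containerUUID)
	return fmt.Sprintf("%x", h.Sum(nil))
}

func main() {
	// Hypothetical inputs, for illustration only.
	fmt.Println(deriveGatewaySecret("example-system-root-token", "zzzzz-dz642-examplecontain1"))
}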
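
loadRunnerData reads the binary named by Containers.CloudVMs.DeployRunnerBinary and names the deployed copy after its MD5 digest, so the boot probe can tell whether the copy already present on a worker matches the one the dispatcher intends to run. A small sketch of that naming scheme, using an arbitrary stand-in byte slice rather than a real crunch-run binary:

package main

import (
	"crypto/md5"
	"fmt"
)

func main() {
	// Stand-in for the bytes loadRunnerData would read from DeployRunnerBinary.
	runnerData := []byte("stand-in for the crunch-run binary")
	runnerMD5 := md5.Sum(runnerData)
	// Same naming scheme as wp.runnerCmd: the deployed path embeds the MD5 digest.
	runnerCmd := fmt.Sprintf("/var/lib/arvados/crunch-run~%x", runnerMD5)
	fmt.Println(runnerCmd)
}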
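
The MaxConcurrentInstanceCreateOps guard added to Create refuses to start another cloud create call once the number of unfinished calls (len(wp.creating)) reaches the configured cap, and parks the create throttle for five seconds so callers back off. A stripped-down sketch of that guard outside the real Pool type; the type and method names here are illustrative only:

package main

import (
	"errors"
	"fmt"
)

// createGuard is an illustrative stand-in for the Pool fields the new check reads.
type createGuard struct {
	maxConcurrentInstanceCreateOps int                 // 0 means no cap
	creating                       map[string]struct{} // unfinished create calls, keyed by instance secret
}

// okToCreate mirrors the shape of the guard: refuse another create call once
// the number of unfinished ones reaches the configured cap.
func (g *createGuard) okToCreate() error {
	if g.maxConcurrentInstanceCreateOps > 0 && len(g.creating) >= g.maxConcurrentInstanceCreateOps {
		return errors.New("reached MaxConcurrentInstanceCreateOps")
	}
	return nil
}

func main() {
	g := &createGuard{
		maxConcurrentInstanceCreateOps: 2,
		creating:                       map[string]struct{}{"secret1": {}, "secret2": {}},
	}
	fmt.Println(g.okToCreate())
}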