X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/be8ed479042df4fdefe1fd18c1e2e984e1c99bc0..2a748e79c3a72454d70e40f39fcad9dabf4943cc:/lib/dispatchcloud/worker/pool.go diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index 1665a1e43d..e90935e2aa 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -5,7 +5,9 @@ package worker import ( + "crypto/rand" "errors" + "fmt" "io" "sort" "strings" @@ -16,16 +18,19 @@ import ( "git.curoverse.com/arvados.git/sdk/go/arvados" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" + "golang.org/x/crypto/ssh" ) const ( - tagKeyInstanceType = "InstanceType" - tagKeyIdleBehavior = "IdleBehavior" + tagKeyInstanceType = "InstanceType" + tagKeyIdleBehavior = "IdleBehavior" + tagKeyInstanceSecret = "InstanceSecret" ) // An InstanceView shows a worker's current state and recent activity. type InstanceView struct { Instance cloud.InstanceID `json:"instance"` + Address string `json:"address"` Price float64 `json:"price"` ArvadosInstanceType string `json:"arvados_instance_type"` ProviderInstanceType string `json:"provider_instance_type"` @@ -84,7 +89,7 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration { // // New instances are configured and set up according to the given // cluster configuration. -func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool { +func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool { wp := &Pool{ logger: logger, arvClient: arvClient, @@ -100,6 +105,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting), timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe), timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown), + installPublicKey: installPublicKey, stop: make(chan bool), } wp.registerMetrics(reg) @@ -130,10 +136,11 @@ type Pool struct { timeoutBooting time.Duration timeoutProbe time.Duration timeoutShutdown time.Duration + installPublicKey ssh.PublicKey // private state subscribers map[<-chan struct{}]chan<- struct{} - creating map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls + creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret) workers map[cloud.InstanceID]*worker loaded bool // loaded list of instances from InstanceSet at least once exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called @@ -146,13 +153,16 @@ type Pool struct { throttleCreate throttle throttleInstances throttle - mInstances prometheus.Gauge - mInstancesPrice prometheus.Gauge mContainersRunning prometheus.Gauge - mVCPUs prometheus.Gauge - mVCPUsInuse prometheus.Gauge - mMemory prometheus.Gauge - mMemoryInuse prometheus.Gauge + mInstances *prometheus.GaugeVec + mInstancesPrice *prometheus.GaugeVec + mVCPUs *prometheus.GaugeVec + mMemory *prometheus.GaugeVec +} + +type createCall struct { + time time.Time + instanceType arvados.InstanceType } // Subscribe returns a buffered channel that becomes ready after any @@ -200,16 +210,28 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int { defer wp.mtx.RUnlock() unalloc := map[arvados.InstanceType]int{} creating := map[arvados.InstanceType]int{} - for it, times := range wp.creating { - creating[it] = len(times) + oldestCreate := map[arvados.InstanceType]time.Time{} + for _, cc := range wp.creating { + it := cc.instanceType + creating[it]++ + if t, ok := oldestCreate[it]; !ok || t.After(cc.time) { + oldestCreate[it] = cc.time + } } for _, wkr := range wp.workers { - if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) || wkr.idleBehavior != IdleBehaviorRun { + // Skip workers that are not expected to become + // available soon. Note len(wkr.running)>0 is not + // redundant here: it can be true even in + // StateUnknown. + if wkr.state == StateShutdown || + wkr.state == StateRunning || + wkr.idleBehavior != IdleBehaviorRun || + len(wkr.running) > 0 { continue } it := wkr.instType unalloc[it]++ - if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) { + if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) { // If up to N new workers appear in // Instances() while we are waiting for N // Create() calls to complete, we assume we're @@ -247,25 +269,24 @@ func (wp *Pool) Create(it arvados.InstanceType) bool { if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil { return false } - tags := cloud.InstanceTags{ - tagKeyInstanceType: it.Name, - tagKeyIdleBehavior: string(IdleBehaviorRun), - } now := time.Now() - wp.creating[it] = append(wp.creating[it], now) + secret := randomHex(instanceSecretLength) + wp.creating[secret] = createCall{time: now, instanceType: it} go func() { defer wp.notify() - inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil) + tags := cloud.InstanceTags{ + tagKeyInstanceType: it.Name, + tagKeyIdleBehavior: string(IdleBehaviorRun), + tagKeyInstanceSecret: secret, + } + initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename)) + inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey) wp.mtx.Lock() defer wp.mtx.Unlock() - // Remove our timestamp marker from wp.creating - for i, t := range wp.creating[it] { - if t == now { - copy(wp.creating[it][i:], wp.creating[it][i+1:]) - wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1] - break - } - } + // delete() is deferred so the updateWorker() call + // below knows to use StateBooting when adding a new + // worker. + defer delete(wp.creating, secret) if err != nil { if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() { wp.atQuotaErr = err @@ -276,7 +297,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool { wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify) return } - wp.updateWorker(inst, it, StateBooting) + wp.updateWorker(inst, it) }() return true } @@ -304,25 +325,30 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) return nil } -// Add or update worker attached to the given instance. Use -// initialState if a new worker is created. +// Add or update worker attached to the given instance. // // The second return value is true if a new worker is created. // +// A newly added instance has state=StateBooting if its tags match an +// entry in wp.creating, otherwise StateUnknown. +// // Caller must have lock. -func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) { +func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) { + inst = tagVerifier{inst} id := inst.ID() if wkr := wp.workers[id]; wkr != nil { wkr.executor.SetTarget(inst) wkr.instance = inst wkr.updated = time.Now() - if initialState == StateBooting && wkr.state == StateUnknown { - wkr.state = StateBooting - } wkr.saveTags() return wkr, false } + state := StateUnknown + if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok { + state = StateBooting + } + // If an instance has a valid IdleBehavior tag when it first // appears, initialize the new worker accordingly (this is how // we restore IdleBehavior that was set by a prior dispatch @@ -336,10 +362,11 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi logger := wp.logger.WithFields(logrus.Fields{ "InstanceType": it.Name, - "Instance": inst, + "Instance": inst.ID(), + "Address": inst.Address(), }) logger.WithFields(logrus.Fields{ - "State": initialState, + "State": state, "IdleBehavior": idleBehavior, }).Infof("instance appeared in cloud") now := time.Now() @@ -348,7 +375,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi wp: wp, logger: logger, executor: wp.newExecutor(inst), - state: initialState, + state: state, idleBehavior: idleBehavior, instance: inst, instType: it, @@ -392,8 +419,12 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool { } // CountWorkers returns the current number of workers in each state. +// +// CountWorkers blocks, if necessary, until the initial instance list +// has been loaded from the cloud provider. func (wp *Pool) CountWorkers() map[State]int { wp.setupOnce.Do(wp.setup) + wp.waitUntilLoaded() wp.mtx.Lock() defer wp.mtx.Unlock() r := map[State]int{} @@ -475,10 +506,14 @@ func (wp *Pool) KillContainer(uuid string) { func (wp *Pool) kill(wkr *worker, uuid string) { logger := wp.logger.WithFields(logrus.Fields{ "ContainerUUID": uuid, - "Instance": wkr.instance, + "Instance": wkr.instance.ID(), }) logger.Debug("killing process") - stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil) + cmd := "crunch-run --kill 15 " + uuid + if u := wkr.instance.RemoteUser(); u != "root" { + cmd = "sudo " + cmd + } + stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil) if err != nil { logger.WithFields(logrus.Fields{ "stderr": string(stderr), @@ -504,20 +539,6 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) { if reg == nil { reg = prometheus.NewRegistry() } - wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "arvados", - Subsystem: "dispatchcloud", - Name: "instances_total", - Help: "Number of cloud VMs including pending, booting, running, held, and shutting down.", - }) - reg.MustRegister(wp.mInstances) - wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "arvados", - Subsystem: "dispatchcloud", - Name: "instances_price_total", - Help: "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.", - }) - reg.MustRegister(wp.mInstancesPrice) wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "arvados", Subsystem: "dispatchcloud", @@ -525,40 +546,40 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) { Help: "Number of containers reported running by cloud VMs.", }) reg.MustRegister(wp.mContainersRunning) - - wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{ + wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "dispatchcloud", + Name: "instances_total", + Help: "Number of cloud VMs.", + }, []string{"category"}) + reg.MustRegister(wp.mInstances) + wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "dispatchcloud", + Name: "instances_price", + Help: "Price of cloud VMs.", + }, []string{"category"}) + reg.MustRegister(wp.mInstancesPrice) + wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "arvados", Subsystem: "dispatchcloud", Name: "vcpus_total", Help: "Total VCPUs on all cloud VMs.", - }) + }, []string{"category"}) reg.MustRegister(wp.mVCPUs) - wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "arvados", - Subsystem: "dispatchcloud", - Name: "vcpus_inuse", - Help: "VCPUs on cloud VMs that are running containers.", - }) - reg.MustRegister(wp.mVCPUsInuse) - wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{ + wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "arvados", Subsystem: "dispatchcloud", Name: "memory_bytes_total", Help: "Total memory on all cloud VMs.", - }) + }, []string{"category"}) reg.MustRegister(wp.mMemory) - wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "arvados", - Subsystem: "dispatchcloud", - Name: "memory_bytes_inuse", - Help: "Memory on cloud VMs that are running containers.", - }) - reg.MustRegister(wp.mMemoryInuse) } func (wp *Pool) runMetrics() { ch := wp.Subscribe() defer wp.Unsubscribe(ch) + wp.updateMetrics() for range ch { wp.updateMetrics() } @@ -568,26 +589,38 @@ func (wp *Pool) updateMetrics() { wp.mtx.RLock() defer wp.mtx.RUnlock() - var price float64 - var alloc, cpu, cpuInuse, mem, memInuse int64 + instances := map[string]int64{} + price := map[string]float64{} + cpu := map[string]int64{} + mem := map[string]int64{} + var running int64 for _, wkr := range wp.workers { - price += wkr.instType.Price - cpu += int64(wkr.instType.VCPUs) - mem += int64(wkr.instType.RAM) - if len(wkr.running)+len(wkr.starting) == 0 { - continue + var cat string + switch { + case len(wkr.running)+len(wkr.starting) > 0: + cat = "inuse" + case wkr.idleBehavior == IdleBehaviorHold: + cat = "hold" + case wkr.state == StateBooting: + cat = "booting" + case wkr.state == StateUnknown: + cat = "unknown" + default: + cat = "idle" } - alloc += int64(len(wkr.running) + len(wkr.starting)) - cpuInuse += int64(wkr.instType.VCPUs) - memInuse += int64(wkr.instType.RAM) - } - wp.mInstances.Set(float64(len(wp.workers))) - wp.mInstancesPrice.Set(price) - wp.mContainersRunning.Set(float64(alloc)) - wp.mVCPUs.Set(float64(cpu)) - wp.mMemory.Set(float64(mem)) - wp.mVCPUsInuse.Set(float64(cpuInuse)) - wp.mMemoryInuse.Set(float64(memInuse)) + instances[cat]++ + price[cat] += wkr.instType.Price + cpu[cat] += int64(wkr.instType.VCPUs) + mem[cat] += int64(wkr.instType.RAM) + running += int64(len(wkr.running) + len(wkr.starting)) + } + for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} { + wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat])) + wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat]) + wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat])) + wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat])) + } + wp.mContainersRunning.Set(float64(running)) } func (wp *Pool) runProbes() { @@ -666,6 +699,7 @@ func (wp *Pool) Instances() []InstanceView { for _, w := range wp.workers { r = append(r, InstanceView{ Instance: w.instance.ID(), + Address: w.instance.Address(), Price: w.instType.Price, ArvadosInstanceType: w.instType.Name, ProviderInstanceType: w.instType.ProviderType, @@ -683,7 +717,7 @@ func (wp *Pool) Instances() []InstanceView { } func (wp *Pool) setup() { - wp.creating = map[arvados.InstanceType][]time.Time{} + wp.creating = map[string]createCall{} wp.exited = map[string]time.Time{} wp.workers = map[cloud.InstanceID]*worker{} wp.subscribers = map[<-chan struct{}]chan<- struct{}{} @@ -733,7 +767,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag) continue } - if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew { + if wkr, isNew := wp.updateWorker(inst, it); isNew { notify = true } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown { wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying") @@ -746,7 +780,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { continue } logger := wp.logger.WithFields(logrus.Fields{ - "Instance": wkr.instance, + "Instance": wkr.instance.ID(), "WorkerState": wkr.state, }) logger.Info("instance disappeared in cloud") @@ -756,6 +790,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { } if !wp.loaded { + notify = true wp.loaded = true wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list") } @@ -764,3 +799,25 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { go wp.notify() } } + +func (wp *Pool) waitUntilLoaded() { + ch := wp.Subscribe() + wp.mtx.RLock() + defer wp.mtx.RUnlock() + for !wp.loaded { + wp.mtx.RUnlock() + <-ch + wp.mtx.RLock() + } +} + +// Return a random string of n hexadecimal digits (n*4 random bits). n +// must be even. +func randomHex(n int) string { + buf := make([]byte, n/2) + _, err := rand.Read(buf) + if err != nil { + panic(err) + } + return fmt.Sprintf("%x", buf) +}