X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/dc021c3b57dcdebe464c148d55f9990a74e8246b..414025952297852fff735664047ac14f6590522d:/lib/dispatchcloud/worker/pool.go diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index 649910f63c..79af5a0cb3 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -5,10 +5,12 @@ package worker import ( + "crypto/md5" "crypto/rand" "errors" "fmt" "io" + "io/ioutil" "sort" "strings" "sync" @@ -100,6 +102,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe instanceSet: &throttledInstanceSet{InstanceSet: instanceSet}, newExecutor: newExecutor, bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand, + runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary, imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID), instanceTypes: cluster.InstanceTypes, maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond, @@ -135,6 +138,7 @@ type Pool struct { instanceSet *throttledInstanceSet newExecutor func(cloud.Instance) Executor bootProbeCommand string + runnerSource string imageID cloud.ImageID instanceTypes map[string]arvados.InstanceType syncInterval time.Duration @@ -160,6 +164,9 @@ type Pool struct { stop chan bool mtx sync.RWMutex setupOnce sync.Once + runnerData []byte + runnerMD5 [md5.Size]byte + runnerCmd string throttleCreate throttle throttleInstances throttle @@ -177,6 +184,14 @@ type createCall struct { instanceType arvados.InstanceType } +func (wp *Pool) CheckHealth() error { + wp.setupOnce.Do(wp.setup) + if err := wp.loadRunnerData(); err != nil { + return fmt.Errorf("error loading runner binary: %s", err) + } + return nil +} + // Subscribe returns a buffered channel that becomes ready after any // change to the pool's state that could have scheduling implications: // a worker's state changes, a new worker appears, the cloud @@ -276,6 +291,10 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int { func (wp *Pool) Create(it arvados.InstanceType) bool { logger := wp.logger.WithField("InstanceType", it.Name) wp.setupOnce.Do(wp.setup) + if wp.loadRunnerData() != nil { + // Boot probe is certain to fail. + return false + } wp.mtx.Lock() defer wp.mtx.Unlock() if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil { @@ -416,7 +435,7 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool { // time (Idle) or the earliest create time (Booting) for _, wkr := range wp.workers { if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it { - logger.WithField("Instance", wkr.instance).Info("shutting down") + logger.WithField("Instance", wkr.instance.ID()).Info("shutting down") wkr.shutdown() return true } @@ -475,7 +494,7 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b defer wp.mtx.Unlock() var wkr *worker for _, w := range wp.workers { - if w.instType == it && w.state == StateIdle { + if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun { if wkr == nil || w.busy.After(wkr.busy) { wkr = w } @@ -551,7 +570,7 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) { Subsystem: "dispatchcloud", Name: "instances_total", Help: "Number of cloud VMs.", - }, []string{"category"}) + }, []string{"category", "instance_type"}) reg.MustRegister(wp.mInstances) wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "arvados", @@ -599,7 +618,11 @@ func (wp *Pool) updateMetrics() { wp.mtx.RLock() defer wp.mtx.RUnlock() - instances := map[string]int64{} + type entKey struct { + cat string + instType string + } + instances := map[entKey]int64{} price := map[string]float64{} cpu := map[string]int64{} mem := map[string]int64{} @@ -618,17 +641,25 @@ func (wp *Pool) updateMetrics() { default: cat = "idle" } - instances[cat]++ + instances[entKey{cat, wkr.instType.Name}]++ price[cat] += wkr.instType.Price cpu[cat] += int64(wkr.instType.VCPUs) mem[cat] += int64(wkr.instType.RAM) running += int64(len(wkr.running) + len(wkr.starting)) } for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} { - wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat])) wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat]) wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat])) wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat])) + // make sure to reset gauges for non-existing category/nodetype combinations + for _, it := range wp.instanceTypes { + if _, ok := instances[entKey{cat, it.Name}]; !ok { + wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0)) + } + } + } + for k, v := range instances { + wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v)) } wp.mContainersRunning.Set(float64(running)) } @@ -743,6 +774,36 @@ func (wp *Pool) setup() { wp.exited = map[string]time.Time{} wp.workers = map[cloud.InstanceID]*worker{} wp.subscribers = map[<-chan struct{}]chan<- struct{}{} + wp.loadRunnerData() +} + +// Load the runner program to be deployed on worker nodes into +// wp.runnerData, if necessary. Errors are logged. +// +// If auto-deploy is disabled, len(wp.runnerData) will be 0. +// +// Caller must not have lock. +func (wp *Pool) loadRunnerData() error { + wp.mtx.Lock() + defer wp.mtx.Unlock() + if wp.runnerData != nil { + return nil + } else if wp.runnerSource == "" { + wp.runnerCmd = "crunch-run" + wp.runnerData = []byte{} + return nil + } + logger := wp.logger.WithField("source", wp.runnerSource) + logger.Debug("loading runner") + buf, err := ioutil.ReadFile(wp.runnerSource) + if err != nil { + logger.WithError(err).Error("failed to load runner program") + return err + } + wp.runnerData = buf + wp.runnerMD5 = md5.Sum(buf) + wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5) + return nil } func (wp *Pool) notify() { @@ -786,13 +847,13 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType] it, ok := wp.instanceTypes[itTag] if !ok { - wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag) + wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag) continue } if wkr, isNew := wp.updateWorker(inst, it); isNew { notify = true } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown { - wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying") + wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying") wkr.shutdown() } }