X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8fc29fafb91cf64ce4ededbdd85ef9507c51f216..64e387b2f4f0fe6c4c7bf16232706c7cf194caf0:/lib/dispatchcloud/worker/pool.go?ds=sidebyside diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index 84b61fc006..0636fcee89 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -5,17 +5,19 @@ package worker import ( + "crypto/md5" "crypto/rand" "errors" "fmt" "io" + "io/ioutil" "sort" "strings" "sync" "time" - "git.curoverse.com/arvados.git/lib/cloud" - "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/lib/cloud" + "git.arvados.org/arvados.git/sdk/go/arvados" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "golang.org/x/crypto/ssh" @@ -25,6 +27,7 @@ const ( tagKeyInstanceType = "InstanceType" tagKeyIdleBehavior = "IdleBehavior" tagKeyInstanceSecret = "InstanceSecret" + tagKeyInstanceSetID = "InstanceSetID" ) // An InstanceView shows a worker's current state and recent activity. @@ -91,13 +94,15 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration { // // New instances are configured and set up according to the given // cluster configuration. -func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool { +func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool { wp := &Pool{ logger: logger, arvClient: arvClient, + instanceSetID: instanceSetID, instanceSet: &throttledInstanceSet{InstanceSet: instanceSet}, newExecutor: newExecutor, bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand, + runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary, imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID), instanceTypes: cluster.InstanceTypes, maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond, @@ -110,6 +115,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM), timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal), installPublicKey: installPublicKey, + tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix, stop: make(chan bool), } wp.registerMetrics(reg) @@ -128,9 +134,11 @@ type Pool struct { // configuration logger logrus.FieldLogger arvClient *arvados.Client + instanceSetID cloud.InstanceSetID instanceSet *throttledInstanceSet newExecutor func(cloud.Instance) Executor bootProbeCommand string + runnerSource string imageID cloud.ImageID instanceTypes map[string]arvados.InstanceType syncInterval time.Duration @@ -143,18 +151,22 @@ type Pool struct { timeoutTERM time.Duration timeoutSignal time.Duration installPublicKey ssh.PublicKey + tagKeyPrefix string // private state subscribers map[<-chan struct{}]chan<- struct{} creating map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret) workers map[cloud.InstanceID]*worker loaded bool // loaded list of instances from InstanceSet at least once - exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called + exited map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called atQuotaUntil time.Time atQuotaErr cloud.QuotaError stop chan bool mtx sync.RWMutex setupOnce sync.Once + runnerData []byte + runnerMD5 [md5.Size]byte + runnerCmd string throttleCreate throttle throttleInstances throttle @@ -164,6 +176,7 @@ type Pool struct { mInstancesPrice *prometheus.GaugeVec mVCPUs *prometheus.GaugeVec mMemory *prometheus.GaugeVec + mDisappearances *prometheus.CounterVec } type createCall struct { @@ -171,6 +184,14 @@ type createCall struct { instanceType arvados.InstanceType } +func (wp *Pool) CheckHealth() error { + wp.setupOnce.Do(wp.setup) + if err := wp.loadRunnerData(); err != nil { + return fmt.Errorf("error loading runner binary: %s", err) + } + return nil +} + // Subscribe returns a buffered channel that becomes ready after any // change to the pool's state that could have scheduling implications: // a worker's state changes, a new worker appears, the cloud @@ -270,6 +291,10 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int { func (wp *Pool) Create(it arvados.InstanceType) bool { logger := wp.logger.WithField("InstanceType", it.Name) wp.setupOnce.Do(wp.setup) + if wp.loadRunnerData() != nil { + // Boot probe is certain to fail. + return false + } wp.mtx.Lock() defer wp.mtx.Unlock() if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil { @@ -281,11 +306,12 @@ func (wp *Pool) Create(it arvados.InstanceType) bool { go func() { defer wp.notify() tags := cloud.InstanceTags{ - tagKeyInstanceType: it.Name, - tagKeyIdleBehavior: string(IdleBehaviorRun), - tagKeyInstanceSecret: secret, + wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID), + wp.tagKeyPrefix + tagKeyInstanceType: it.Name, + wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun), + wp.tagKeyPrefix + tagKeyInstanceSecret: secret, } - initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename)) + initCmd := TagVerifier{nil, secret}.InitCommand() inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey) wp.mtx.Lock() defer wp.mtx.Unlock() @@ -338,7 +364,8 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) // // Caller must have lock. func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) { - inst = tagVerifier{inst} + secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret] + inst = TagVerifier{inst, secret} id := inst.ID() if wkr := wp.workers[id]; wkr != nil { wkr.executor.SetTarget(inst) @@ -349,7 +376,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*wor } state := StateUnknown - if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok { + if _, ok := wp.creating[secret]; ok { state = StateBooting } @@ -359,7 +386,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*wor // process); otherwise, default to "run". After this, // wkr.idleBehavior is the source of truth, and will only be // changed via SetIdleBehavior(). - idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior]) + idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior]) if !validIdleBehavior[idleBehavior] { idleBehavior = IdleBehaviorRun } @@ -438,7 +465,7 @@ func (wp *Pool) CountWorkers() map[State]int { // In the returned map, the time value indicates when the Pool // observed that the container process had exited. A container that // has not yet exited has a zero time value. The caller should use -// KillContainer() to garbage-collect the entries for exited +// ForgetContainer() to garbage-collect the entries for exited // containers. func (wp *Pool) Running() map[string]time.Time { wp.setupOnce.Do(wp.setup) @@ -485,18 +512,15 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b // // KillContainer returns immediately; the act of killing the container // takes some time, and runs in the background. -func (wp *Pool) KillContainer(uuid string, reason string) { +// +// KillContainer returns false if the container has already ended. +func (wp *Pool) KillContainer(uuid string, reason string) bool { wp.mtx.Lock() defer wp.mtx.Unlock() logger := wp.logger.WithFields(logrus.Fields{ "ContainerUUID": uuid, "Reason": reason, }) - if _, ok := wp.exited[uuid]; ok { - logger.Debug("clearing placeholder for exited crunch-run process") - delete(wp.exited, uuid) - return - } for _, wkr := range wp.workers { rr := wkr.running[uuid] if rr == nil { @@ -504,10 +528,30 @@ func (wp *Pool) KillContainer(uuid string, reason string) { } if rr != nil { rr.Kill(reason) - return + return true } } logger.Debug("cannot kill: already disappeared") + return false +} + +// ForgetContainer clears the placeholder for the given exited +// container, so it isn't returned by subsequent calls to Running(). +// +// ForgetContainer has no effect if the container has not yet exited. +// +// The "container exited at time T" placeholder (which necessitates +// ForgetContainer) exists to make it easier for the caller +// (scheduler) to distinguish a container that exited without +// finalizing its state from a container that exited too recently for +// its final state to have appeared in the scheduler's queue cache. +func (wp *Pool) ForgetContainer(uuid string) { + wp.mtx.Lock() + defer wp.mtx.Unlock() + if _, ok := wp.exited[uuid]; ok { + wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process") + delete(wp.exited, uuid) + } } func (wp *Pool) registerMetrics(reg *prometheus.Registry) { @@ -526,7 +570,7 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) { Subsystem: "dispatchcloud", Name: "instances_total", Help: "Number of cloud VMs.", - }, []string{"category"}) + }, []string{"category", "instance_type"}) reg.MustRegister(wp.mInstances) wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "arvados", @@ -549,6 +593,16 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) { Help: "Total memory on all cloud VMs.", }, []string{"category"}) reg.MustRegister(wp.mMemory) + wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "arvados", + Subsystem: "dispatchcloud", + Name: "instances_disappeared", + Help: "Number of occurrences of an instance disappearing from the cloud provider's list of instances.", + }, []string{"state"}) + for _, v := range stateString { + wp.mDisappearances.WithLabelValues(v).Add(0) + } + reg.MustRegister(wp.mDisappearances) } func (wp *Pool) runMetrics() { @@ -564,7 +618,11 @@ func (wp *Pool) updateMetrics() { wp.mtx.RLock() defer wp.mtx.RUnlock() - instances := map[string]int64{} + type entKey struct { + cat string + instType string + } + instances := map[entKey]int64{} price := map[string]float64{} cpu := map[string]int64{} mem := map[string]int64{} @@ -583,17 +641,25 @@ func (wp *Pool) updateMetrics() { default: cat = "idle" } - instances[cat]++ + instances[entKey{cat, wkr.instType.Name}]++ price[cat] += wkr.instType.Price cpu[cat] += int64(wkr.instType.VCPUs) mem[cat] += int64(wkr.instType.RAM) running += int64(len(wkr.running) + len(wkr.starting)) } for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} { - wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat])) wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat]) wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat])) wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat])) + // make sure to reset gauges for non-existing category/nodetype combinations + for _, it := range wp.instanceTypes { + if _, ok := instances[entKey{cat, it.Name}]; !ok { + wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0)) + } + } + } + for k, v := range instances { + wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v)) } wp.mContainersRunning.Set(float64(running)) } @@ -708,6 +774,36 @@ func (wp *Pool) setup() { wp.exited = map[string]time.Time{} wp.workers = map[cloud.InstanceID]*worker{} wp.subscribers = map[<-chan struct{}]chan<- struct{}{} + wp.loadRunnerData() +} + +// Load the runner program to be deployed on worker nodes into +// wp.runnerData, if necessary. Errors are logged. +// +// If auto-deploy is disabled, len(wp.runnerData) will be 0. +// +// Caller must not have lock. +func (wp *Pool) loadRunnerData() error { + wp.mtx.Lock() + defer wp.mtx.Unlock() + if wp.runnerData != nil { + return nil + } else if wp.runnerSource == "" { + wp.runnerCmd = "crunch-run" + wp.runnerData = []byte{} + return nil + } + logger := wp.logger.WithField("source", wp.runnerSource) + logger.Debug("loading runner") + buf, err := ioutil.ReadFile(wp.runnerSource) + if err != nil { + logger.WithError(err).Error("failed to load runner program") + return err + } + wp.runnerData = buf + wp.runnerMD5 = md5.Sum(buf) + wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5) + return nil } func (wp *Pool) notify() { @@ -728,7 +824,7 @@ func (wp *Pool) getInstancesAndSync() error { } wp.logger.Debug("getting instance list") threshold := time.Now() - instances, err := wp.instanceSet.Instances(cloud.InstanceTags{}) + instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)}) if err != nil { wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify) return err @@ -748,7 +844,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { notify := false for _, inst := range instances { - itTag := inst.Tags()[tagKeyInstanceType] + itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType] it, ok := wp.instanceTypes[itTag] if !ok { wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag) @@ -771,6 +867,9 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { "WorkerState": wkr.state, }) logger.Info("instance disappeared in cloud") + if wp.mDisappearances != nil { + wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc() + } delete(wp.workers, id) go wkr.Close() notify = true