X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/71460cf96c9b43c8ab0a38118c3745a4c0e6d7e9..442a871e7f3476938d0ebb3adbe3b9a7742f03ad:/lib/dispatchcloud/worker/pool.go

diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 4ddd3745ef..0ee36a96ff 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -5,7 +5,9 @@ package worker
 
 import (
-	"bytes"
+	"crypto/rand"
+	"errors"
+	"fmt"
 	"io"
 	"sort"
 	"strings"
@@ -14,30 +16,35 @@ import (
 
 	"git.curoverse.com/arvados.git/lib/cloud"
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
-	"github.com/Sirupsen/logrus"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/sirupsen/logrus"
+	"golang.org/x/crypto/ssh"
 )
 
 const (
-	tagKeyInstanceType = "InstanceType"
-	tagKeyHold         = "Hold"
+	tagKeyInstanceType   = "InstanceType"
+	tagKeyIdleBehavior   = "IdleBehavior"
+	tagKeyInstanceSecret = "InstanceSecret"
+	tagKeyInstanceSetID  = "InstanceSetID"
 )
 
-// A View shows a worker's current state and recent activity.
-type View struct {
-	Instance             string
-	Price                float64
-	ArvadosInstanceType  string
-	ProviderInstanceType string
-	LastContainerUUID    string
-	Unallocated          time.Time
-	WorkerState          string
+// An InstanceView shows a worker's current state and recent activity.
+type InstanceView struct {
+	Instance             cloud.InstanceID `json:"instance"`
+	Address              string           `json:"address"`
+	Price                float64          `json:"price"`
+	ArvadosInstanceType  string           `json:"arvados_instance_type"`
+	ProviderInstanceType string           `json:"provider_instance_type"`
+	LastContainerUUID    string           `json:"last_container_uuid"`
+	LastBusy             time.Time        `json:"last_busy"`
+	WorkerState          string           `json:"worker_state"`
+	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
 }
 
 // An Executor executes shell commands on a remote host.
 type Executor interface {
 	// Run cmd on the current target.
-	Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
+	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
 
 	// Use the given target for subsequent operations. The new
 	// target is the same host as the previous target, but it
@@ -61,6 +68,16 @@ const (
 	defaultTimeoutIdle        = time.Minute
 	defaultTimeoutBooting     = time.Minute * 10
 	defaultTimeoutProbe       = time.Minute * 10
+	defaultTimeoutShutdown    = time.Second * 10
+	defaultTimeoutTERM        = time.Minute * 2
+	defaultTimeoutSignal      = time.Second * 5
+
+	// Time after a quota error to try again anyway, even if no
+	// instances have been shutdown.
+	quotaErrorTTL = time.Minute
+
+	// Time between "X failed because rate limiting" messages
+	logRateLimitErrorInterval = time.Second * 10
 )
 
 func duration(conf arvados.Duration, def time.Duration) time.Duration {
@@ -75,20 +92,28 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration {
 //
 // New instances are configured and set up according to the given
 // cluster configuration.
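Editorial note on the Executor change above: Execute now takes an explicit environment map in addition to the command and stdin. A minimal sketch of a caller is shown below; runBootProbe and the environment values are hypothetical and not part of this diff.

    // Sketch only: runs a command through any implementation of the
    // Executor interface defined above and wraps a failure with context.
    func runBootProbe(exr Executor, cmd string) error {
    	env := map[string]string{"ARVADOS_DEBUG": "1"} // example value, not from this diff
    	stdout, stderr, err := exr.Execute(env, cmd, nil)
    	if err != nil {
    		return fmt.Errorf("probe %q failed: %v (stdout %q, stderr %q)", cmd, err, stdout, stderr)
    	}
    	return nil
    }
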
-func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
 	wp := &Pool{
 		logger:             logger,
-		instanceSet:        instanceSet,
+		arvClient:          arvClient,
+		instanceSetID:      instanceSetID,
+		instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
 		newExecutor:        newExecutor,
-		bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
-		imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
+		bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
+		imageID:            cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
 		instanceTypes:      cluster.InstanceTypes,
-		maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
-		probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
-		syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
-		timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
-		timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
-		timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+		maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
+		probeInterval:      duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
+		syncInterval:       duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
+		timeoutIdle:        duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
+		timeoutBooting:     duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
+		timeoutProbe:       duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+		timeoutShutdown:    duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+		timeoutTERM:        duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
+		timeoutSignal:      duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
+		installPublicKey:   installPublicKey,
+		tagKeyPrefix:       cluster.Containers.CloudVMs.TagKeyPrefix,
+		stop:               make(chan bool),
 	}
 	wp.registerMetrics(reg)
 	go func() {
@@ -105,7 +130,9 @@ func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cl
 type Pool struct {
 	// configuration
 	logger             logrus.FieldLogger
-	instanceSet        cloud.InstanceSet
+	arvClient          *arvados.Client
+	instanceSetID      cloud.InstanceSetID
+	instanceSet        *throttledInstanceSet
 	newExecutor        func(cloud.Instance) Executor
 	bootProbeCommand   string
 	imageID            cloud.ImageID
@@ -116,53 +143,54 @@ type Pool struct {
 	timeoutIdle        time.Duration
 	timeoutBooting     time.Duration
 	timeoutProbe       time.Duration
+	timeoutShutdown    time.Duration
+	timeoutTERM        time.Duration
+	timeoutSignal      time.Duration
+	installPublicKey   ssh.PublicKey
+	tagKeyPrefix       string
 
 	// private state
 	subscribers  map[<-chan struct{}]chan<- struct{}
-	creating     map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return
+	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
 	workers      map[cloud.InstanceID]*worker
 	loaded       bool                 // loaded list of instances from InstanceSet at least once
 	exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
 	atQuotaUntil time.Time
+	atQuotaErr   cloud.QuotaError
 	stop         chan bool
 	mtx          sync.RWMutex
 	setupOnce    sync.Once
 
-	mInstances         prometheus.Gauge
+	throttleCreate    throttle
+	throttleInstances throttle
+
 	mContainersRunning prometheus.Gauge
-	mVCPUs             prometheus.Gauge
-	mVCPUsInuse        prometheus.Gauge
-	mMemory            prometheus.Gauge
-	mMemoryInuse       prometheus.Gauge
-}
-
-type worker struct {
-	state       State
-	instance    cloud.Instance
-	executor    Executor
-	instType    arvados.InstanceType
-	vcpus       int64
-	memory      int64
-	booted      bool
-	probed      time.Time
-	updated     time.Time
-	busy        time.Time
-	unallocated time.Time
-	lastUUID    string
-	running     map[string]struct{}
-	starting    map[string]struct{}
-	probing     chan struct{}
-}
-
-// Subscribe returns a channel that becomes ready whenever a worker's
-// state changes.
+	mInstances         *prometheus.GaugeVec
+	mInstancesPrice    *prometheus.GaugeVec
+	mVCPUs             *prometheus.GaugeVec
+	mMemory            *prometheus.GaugeVec
+}
+
+type createCall struct {
+	time         time.Time
+	instanceType arvados.InstanceType
+}
+
+// Subscribe returns a buffered channel that becomes ready after any
+// change to the pool's state that could have scheduling implications:
+// a worker's state changes, a new worker appears, the cloud
+// provider's API rate limiting period ends, etc.
+//
+// Additional events that occur while the channel is already ready
+// will be dropped, so it is OK if the caller services the channel
+// slowly.
 //
 // Example:
 //
 //	ch := wp.Subscribe()
 //	defer wp.Unsubscribe(ch)
 //	for range ch {
-//		// ...try scheduling some work...
+//		tryScheduling(wp)
 //		if done {
 //			break
 //		}
@@ -185,95 +213,193 @@ func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
 }
 
 // Unallocated returns the number of unallocated (creating + booting +
-// idle + unknown) workers for each instance type.
+// idle + unknown) workers for each instance type. Workers in
+// hold/drain mode are not included.
 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 	wp.setupOnce.Do(wp.setup)
 	wp.mtx.RLock()
 	defer wp.mtx.RUnlock()
-	u := map[arvados.InstanceType]int{}
-	for it, c := range wp.creating {
-		u[it] = c
+	unalloc := map[arvados.InstanceType]int{}
+	creating := map[arvados.InstanceType]int{}
+	oldestCreate := map[arvados.InstanceType]time.Time{}
+	for _, cc := range wp.creating {
+		it := cc.instanceType
+		creating[it]++
+		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
+			oldestCreate[it] = cc.time
+		}
 	}
 	for _, wkr := range wp.workers {
-		if len(wkr.running)+len(wkr.starting) == 0 && (wkr.state == StateRunning || wkr.state == StateBooting || wkr.state == StateUnknown) {
-			u[wkr.instType]++
+		// Skip workers that are not expected to become
+		// available soon. Note len(wkr.running)>0 is not
+		// redundant here: it can be true even in
+		// StateUnknown.
+		if wkr.state == StateShutdown ||
+			wkr.state == StateRunning ||
+			wkr.idleBehavior != IdleBehaviorRun ||
+			len(wkr.running) > 0 {
+			continue
+		}
+		it := wkr.instType
+		unalloc[it]++
+		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
+			// If up to N new workers appear in
+			// Instances() while we are waiting for N
+			// Create() calls to complete, we assume we're
+			// just seeing a race between Instances() and
+			// Create() responses.
+			//
+			// The other common reason why nodes have
+			// state==Unknown is that they appeared at
+			// startup, before any Create calls. They
+			// don't match the above timing condition, so
+			// we never mistakenly attribute them to
+			// pending Create calls.
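Editorial note on the Subscribe/Unsubscribe contract documented above: because the channel is buffered and extra events are dropped, a slow consumer only ever misses redundant wake-ups, never the most recent one. A hedged sketch of a consumer loop follows (runScheduler and done are hypothetical names, not part of this diff):

    // Sketch only: wake up on pool changes and re-run scheduling logic.
    func runScheduler(wp *Pool, done <-chan struct{}) {
    	ch := wp.Subscribe()
    	defer wp.Unsubscribe(ch)
    	for {
    		// ...examine wp.Unallocated(), call wp.Create() / wp.StartContainer()...
    		select {
    		case <-ch: // pool state changed; try again
    		case <-done:
    			return
    		}
    	}
    }
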
+			creating[it]--
 		}
 	}
-	return u
+	for it, c := range creating {
+		unalloc[it] += c
+	}
+	return unalloc
 }
 
 // Create a new instance with the given type, and add it to the worker
 // pool. The worker is added immediately; instance creation runs in
 // the background.
-func (wp *Pool) Create(it arvados.InstanceType) error {
+//
+// Create returns false if a pre-existing error state prevents it from
+// even attempting to create a new instance. Those errors are logged
+// by the Pool, so the caller does not need to log anything in such
+// cases.
+func (wp *Pool) Create(it arvados.InstanceType) bool {
 	logger := wp.logger.WithField("InstanceType", it.Name)
 	wp.setupOnce.Do(wp.setup)
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
-	tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
-	wp.creating[it]++
+	if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
+		return false
+	}
+	now := time.Now()
+	secret := randomHex(instanceSecretLength)
+	wp.creating[secret] = createCall{time: now, instanceType: it}
 	go func() {
 		defer wp.notify()
-		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
+		tags := cloud.InstanceTags{
+			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
+			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
+			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
+			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
+		}
+		initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
+		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
 		wp.mtx.Lock()
 		defer wp.mtx.Unlock()
-		wp.creating[it]--
-		if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
-			wp.atQuotaUntil = time.Now().Add(time.Minute)
-		}
+		// delete() is deferred so the updateWorker() call
+		// below knows to use StateBooting when adding a new
+		// worker.
+		defer delete(wp.creating, secret)
 		if err != nil {
+			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+				wp.atQuotaErr = err
+				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+				time.AfterFunc(quotaErrorTTL, wp.notify)
+			}
 			logger.WithError(err).Error("create failed")
+			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
 			return
 		}
-		wp.updateWorker(inst, it, StateBooting)
+		wp.updateWorker(inst, it)
 	}()
-	return nil
+	return true
 }
 
 // AtQuota returns true if Create is not expected to work at the
 // moment.
 func (wp *Pool) AtQuota() bool {
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
 	return time.Now().Before(wp.atQuotaUntil)
 }
 
-// Add or update worker attached to the given instance. Use
-// initialState if a new worker is created. Caller must have lock.
+// SetIdleBehavior determines how the indicated instance will behave
+// when it has no containers running.
+func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	wkr, ok := wp.workers[id]
+	if !ok {
+		return errors.New("requested instance does not exist")
+	}
+	wkr.setIdleBehavior(idleBehavior)
+	return nil
+}
+
+// Add or update worker attached to the given instance.
+//
+// The second return value is true if a new worker is created.
+//
+// A newly added instance has state=StateBooting if its tags match an
+// entry in wp.creating, otherwise StateUnknown.
 //
-// Returns true when a new worker is created.
-func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) bool {
+// Caller must have lock.
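Editorial note on the new Create signature above: Create now returns a bool ("attempt started") rather than an error, since quota and rate-limit errors are remembered and logged by the Pool itself. A caller might combine it with AtQuota roughly as in this sketch (createUpTo is a hypothetical helper, not part of the diff; the scheduler in lib/dispatchcloud is the real caller):

    // Sketch only: request up to n new instances of one type, stopping
    // early when the pool reports quota/throttle trouble.
    func createUpTo(wp *Pool, it arvados.InstanceType, n int) int {
    	started := 0
    	for i := 0; i < n; i++ {
    		if wp.AtQuota() || !wp.Create(it) {
    			break // error state is already logged by the Pool
    		}
    		started++
    	}
    	return started
    }
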
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
+	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
+	inst = tagVerifier{inst, secret}
 	id := inst.ID()
-	if wp.workers[id] != nil {
-		wp.workers[id].executor.SetTarget(inst)
-		wp.workers[id].instance = inst
-		wp.workers[id].updated = time.Now()
-		if initialState == StateBooting && wp.workers[id].state == StateUnknown {
-			wp.workers[id].state = StateBooting
-		}
-		return false
+	if wkr := wp.workers[id]; wkr != nil {
+		wkr.executor.SetTarget(inst)
+		wkr.instance = inst
+		wkr.updated = time.Now()
+		wkr.saveTags()
+		return wkr, false
+	}
+
+	state := StateUnknown
+	if _, ok := wp.creating[secret]; ok {
+		state = StateBooting
 	}
-	if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
-		initialState = StateHold
+
+	// If an instance has a valid IdleBehavior tag when it first
+	// appears, initialize the new worker accordingly (this is how
+	// we restore IdleBehavior that was set by a prior dispatch
+	// process); otherwise, default to "run". After this,
+	// wkr.idleBehavior is the source of truth, and will only be
+	// changed via SetIdleBehavior().
+	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
+	if !validIdleBehavior[idleBehavior] {
+		idleBehavior = IdleBehaviorRun
 	}
-	wp.logger.WithFields(logrus.Fields{
+
+	logger := wp.logger.WithFields(logrus.Fields{
 		"InstanceType": it.Name,
-		"Instance":     inst,
-		"State":        initialState,
+		"Instance":     inst.ID(),
+		"Address":      inst.Address(),
+	})
+	logger.WithFields(logrus.Fields{
+		"State":        state,
+		"IdleBehavior": idleBehavior,
 	}).Infof("instance appeared in cloud")
 	now := time.Now()
-	wp.workers[id] = &worker{
-		executor:    wp.newExecutor(inst),
-		state:       initialState,
-		instance:    inst,
-		instType:    it,
-		probed:      now,
-		busy:        now,
-		updated:     now,
-		unallocated: now,
-		running:     make(map[string]struct{}),
-		starting:    make(map[string]struct{}),
-		probing:     make(chan struct{}, 1),
-	}
-	return true
+	wkr := &worker{
+		mtx:          &wp.mtx,
+		wp:           wp,
+		logger:       logger,
+		executor:     wp.newExecutor(inst),
+		state:        state,
+		idleBehavior: idleBehavior,
+		instance:     inst,
+		instType:     it,
+		appeared:     now,
+		probed:       now,
+		busy:         now,
+		updated:      now,
+		running:      make(map[string]*remoteRunner),
+		starting:     make(map[string]*remoteRunner),
+		probing:      make(chan struct{}, 1),
+	}
+	wp.workers[id] = wkr
+	return wkr, true
 }
 
 // Shutdown shuts down a worker with the given type, or returns false
@@ -284,46 +410,27 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
 	defer wp.mtx.Unlock()
 	logger := wp.logger.WithField("InstanceType", it.Name)
 	logger.Info("shutdown requested")
-	for _, tryState := range []State{StateBooting, StateRunning} {
+	for _, tryState := range []State{StateBooting, StateIdle} {
 		// TODO: shutdown the worker with the longest idle
-		// time (Running) or the earliest create time
-		// (Booting)
+		// time (Idle) or the earliest create time (Booting)
 		for _, wkr := range wp.workers {
-			if wkr.state != tryState || len(wkr.running)+len(wkr.starting) > 0 {
-				continue
+			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
+				logger.WithField("Instance", wkr.instance).Info("shutting down")
+				wkr.shutdown()
+				return true
 			}
-			if wkr.instType != it {
-				continue
-			}
-			logger = logger.WithField("Instance", wkr.instance)
-			logger.Info("shutting down")
-			wp.shutdown(wkr, logger)
-			return true
 		}
 	}
 	return false
 }
 
-// caller must have lock
-func (wp *Pool) shutdown(wkr *worker, logger logrus.FieldLogger) {
-	wkr.updated = time.Now()
-	wkr.state = StateShutdown
-	go func() {
-		err := wkr.instance.Destroy()
-		if err != nil {
-			logger.WithError(err).WithField("Instance", wkr.instance).Warn("shutdown failed")
-			return
-		}
-		wp.mtx.Lock()
-		wp.atQuotaUntil = time.Now()
-		wp.mtx.Unlock()
-		wp.notify()
-	}()
-}
-
-// Workers returns the current number of workers in each state.
-func (wp *Pool) Workers() map[State]int {
+// CountWorkers returns the current number of workers in each state.
+//
+// CountWorkers blocks, if necessary, until the initial instance list
+// has been loaded from the cloud provider.
+func (wp *Pool) CountWorkers() map[State]int {
 	wp.setupOnce.Do(wp.setup)
+	wp.waitUntilLoaded()
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
 	r := map[State]int{}
@@ -334,6 +441,12 @@ func (wp *Pool) Workers() map[State]int {
 }
 
 // Running returns the container UUIDs being prepared/run on workers.
+//
+// In the returned map, the time value indicates when the Pool
+// observed that the container process had exited. A container that
+// has not yet exited has a zero time value. The caller should use
+// KillContainer() to garbage-collect the entries for exited
+// containers.
 func (wp *Pool) Running() map[string]time.Time {
 	wp.setupOnce.Do(wp.setup)
 	wp.mtx.Lock()
@@ -356,17 +469,12 @@ func (wp *Pool) Running() map[string]time.Time {
 // StartContainer starts a container on an idle worker immediately if
 // possible, otherwise returns false.
 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
-	logger := wp.logger.WithFields(logrus.Fields{
-		"InstanceType":  it.Name,
-		"ContainerUUID": ctr.UUID,
-		"Priority":      ctr.Priority,
-	})
 	wp.setupOnce.Do(wp.setup)
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
 	var wkr *worker
 	for _, w := range wp.workers {
-		if w.instType == it && w.state == StateRunning && len(w.running)+len(w.starting) == 0 {
+		if w.instType == it && w.state == StateIdle {
 			if wkr == nil || w.busy.After(wkr.busy) {
 				wkr = w
 			}
@@ -375,88 +483,44 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 	if wkr == nil {
 		return false
 	}
-	logger = logger.WithField("Instance", wkr.instance)
-	logger.Debug("starting container")
-	wkr.starting[ctr.UUID] = struct{}{}
-	go func() {
-		stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
-		wp.mtx.Lock()
-		defer wp.mtx.Unlock()
-		wkr.updated = time.Now()
-		delete(wkr.starting, ctr.UUID)
-		wkr.running[ctr.UUID] = struct{}{}
-		if err != nil {
-			logger.WithField("stdout", string(stdout)).
-				WithField("stderr", string(stderr)).
-				WithError(err).
-				Error("error starting crunch-run process")
-			// Leave uuid in wkr.running, though: it's
-			// possible the error was just a communication
-			// failure and the process was in fact
-			// started. Wait for next probe to find out.
-			return
-		}
-		logger.Info("crunch-run process started")
-		wkr.lastUUID = ctr.UUID
-	}()
+	wkr.startContainer(ctr)
 	return true
 }
 
 // KillContainer kills the crunch-run process for the given container
 // UUID, if it's running on any worker.
-func (wp *Pool) KillContainer(uuid string) {
+//
+// KillContainer returns immediately; the act of killing the container
+// takes some time, and runs in the background.
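Editorial note on the Running()/KillContainer() contract documented above: entries for containers whose crunch-run process has exited carry a non-zero timestamp and stay in the map until the caller removes them. A hedged sketch of that garbage-collection step follows (reapExited is a hypothetical helper, not part of the diff):

    // Sketch only: clear placeholders for containers whose crunch-run
    // process has already exited, per the Running() doc comment.
    func reapExited(wp *Pool) {
    	for uuid, exited := range wp.Running() {
    		if !exited.IsZero() {
    			wp.KillContainer(uuid, "crunch-run process exited")
    		}
    	}
    }
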
+func (wp *Pool) KillContainer(uuid string, reason string) {
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
+	logger := wp.logger.WithFields(logrus.Fields{
+		"ContainerUUID": uuid,
+		"Reason":        reason,
+	})
 	if _, ok := wp.exited[uuid]; ok {
-		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+		logger.Debug("clearing placeholder for exited crunch-run process")
 		delete(wp.exited, uuid)
 		return
 	}
 	for _, wkr := range wp.workers {
-		if _, ok := wkr.running[uuid]; ok {
-			go wp.kill(wkr, uuid)
+		rr := wkr.running[uuid]
+		if rr == nil {
+			rr = wkr.starting[uuid]
+		}
+		if rr != nil {
+			rr.Kill(reason)
 			return
 		}
 	}
-	wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
-}
-
-func (wp *Pool) kill(wkr *worker, uuid string) {
-	logger := wp.logger.WithFields(logrus.Fields{
-		"ContainerUUID": uuid,
-		"Instance":      wkr.instance,
-	})
-	logger.Debug("killing process")
-	stdout, stderr, err := wkr.executor.Execute("crunch-run --kill "+uuid, nil)
-	if err != nil {
-		logger.WithFields(logrus.Fields{
-			"stderr": string(stderr),
-			"stdout": string(stdout),
-			"error":  err,
-		}).Warn("kill failed")
-		return
-	}
-	logger.Debug("killing process succeeded")
-	wp.mtx.Lock()
-	defer wp.mtx.Unlock()
-	if _, ok := wkr.running[uuid]; ok {
-		delete(wkr.running, uuid)
-		wkr.updated = time.Now()
-		go wp.notify()
-	}
+	logger.Debug("cannot kill: already disappeared")
 }
 
 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
 	if reg == nil {
 		reg = prometheus.NewRegistry()
 	}
-	wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
-		Namespace: "arvados",
-		Subsystem: "dispatchcloud",
-		Name:      "instances_total",
-		Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
-	})
-	reg.MustRegister(wp.mInstances)
 	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
 		Namespace: "arvados",
 		Subsystem: "dispatchcloud",
@@ -464,40 +528,40 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
 		Help:      "Number of containers reported running by cloud VMs.",
 	})
 	reg.MustRegister(wp.mContainersRunning)
-
-	wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
+	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: "arvados",
+		Subsystem: "dispatchcloud",
+		Name:      "instances_total",
+		Help:      "Number of cloud VMs.",
+	}, []string{"category"})
+	reg.MustRegister(wp.mInstances)
+	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: "arvados",
+		Subsystem: "dispatchcloud",
+		Name:      "instances_price",
+		Help:      "Price of cloud VMs.",
+	}, []string{"category"})
+	reg.MustRegister(wp.mInstancesPrice)
+	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: "arvados",
 		Subsystem: "dispatchcloud",
 		Name:      "vcpus_total",
 		Help:      "Total VCPUs on all cloud VMs.",
-	})
+	}, []string{"category"})
 	reg.MustRegister(wp.mVCPUs)
-	wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
-		Namespace: "arvados",
-		Subsystem: "dispatchcloud",
-		Name:      "vcpus_inuse",
-		Help:      "VCPUs on cloud VMs that are running containers.",
-	})
-	reg.MustRegister(wp.mVCPUsInuse)
-	wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
+	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: "arvados",
 		Subsystem: "dispatchcloud",
 		Name:      "memory_bytes_total",
 		Help:      "Total memory on all cloud VMs.",
-	})
+	}, []string{"category"})
 	reg.MustRegister(wp.mMemory)
-	wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
-		Namespace: "arvados",
-		Subsystem: "dispatchcloud",
-		Name:      "memory_bytes_inuse",
"memory_bytes_inuse", - Help: "Memory on cloud VMs that are running containers.", - }) - reg.MustRegister(wp.mMemoryInuse) } func (wp *Pool) runMetrics() { ch := wp.Subscribe() defer wp.Unsubscribe(ch) + wp.updateMetrics() for range ch { wp.updateMetrics() } @@ -507,23 +571,38 @@ func (wp *Pool) updateMetrics() { wp.mtx.RLock() defer wp.mtx.RUnlock() - var alloc, cpu, cpuInuse, mem, memInuse int64 + instances := map[string]int64{} + price := map[string]float64{} + cpu := map[string]int64{} + mem := map[string]int64{} + var running int64 for _, wkr := range wp.workers { - cpu += int64(wkr.instType.VCPUs) - mem += int64(wkr.instType.RAM) - if len(wkr.running)+len(wkr.starting) == 0 { - continue + var cat string + switch { + case len(wkr.running)+len(wkr.starting) > 0: + cat = "inuse" + case wkr.idleBehavior == IdleBehaviorHold: + cat = "hold" + case wkr.state == StateBooting: + cat = "booting" + case wkr.state == StateUnknown: + cat = "unknown" + default: + cat = "idle" } - alloc += int64(len(wkr.running) + len(wkr.starting)) - cpuInuse += int64(wkr.instType.VCPUs) - memInuse += int64(wkr.instType.RAM) + instances[cat]++ + price[cat] += wkr.instType.Price + cpu[cat] += int64(wkr.instType.VCPUs) + mem[cat] += int64(wkr.instType.RAM) + running += int64(len(wkr.running) + len(wkr.starting)) + } + for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} { + wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat])) + wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat]) + wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat])) + wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat])) } - wp.mInstances.Set(float64(len(wp.workers))) - wp.mContainersRunning.Set(float64(alloc)) - wp.mVCPUs.Set(float64(cpu)) - wp.mMemory.Set(float64(mem)) - wp.mVCPUsInuse.Set(float64(cpuInuse)) - wp.mMemoryInuse.Set(float64(memInuse)) + wp.mContainersRunning.Set(float64(running)) } func (wp *Pool) runProbes() { @@ -542,7 +621,7 @@ func (wp *Pool) runProbes() { workers = workers[:0] wp.mtx.Lock() for id, wkr := range wp.workers { - if wkr.state == StateShutdown || wp.shutdownIfIdle(wkr) { + if wkr.state == StateShutdown || wkr.shutdownIfIdle() { continue } workers = append(workers, id) @@ -553,20 +632,12 @@ func (wp *Pool) runProbes() { wp.mtx.Lock() wkr, ok := wp.workers[id] wp.mtx.Unlock() - if !ok || wkr.state == StateShutdown { - // Deleted/shutdown while we - // were probing others + if !ok { + // Deleted while we were probing + // others continue } - select { - case wkr.probing <- struct{}{}: - go func() { - wp.probeAndUpdate(wkr) - <-wkr.probing - }() - default: - wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish") - } + go wkr.ProbeAndUpdate() select { case <-wp.stop: return @@ -595,76 +666,52 @@ func (wp *Pool) runSync() { } } -// caller must have lock. -func (wp *Pool) shutdownIfBroken(wkr *worker, dur time.Duration) { - if wkr.state == StateHold { - return - } - label, threshold := "", wp.timeoutProbe - if wkr.state == StateBooting { - label, threshold = "new ", wp.timeoutBooting - } - if dur < threshold { - return - } - wp.logger.WithFields(logrus.Fields{ - "Instance": wkr.instance, - "Duration": dur, - "Since": wkr.probed, - "State": wkr.state, - }).Warnf("%sinstance unresponsive, shutting down", label) - wp.shutdown(wkr, wp.logger) -} - -// caller must have lock. 
-func (wp *Pool) shutdownIfIdle(wkr *worker) bool {
-	if len(wkr.running)+len(wkr.starting) > 0 || wkr.state != StateRunning {
-		return false
-	}
-	age := time.Since(wkr.unallocated)
-	if age < wp.timeoutIdle {
-		return false
-	}
-	logger := wp.logger.WithFields(logrus.Fields{
-		"Age":      age,
-		"Instance": wkr.instance,
-	})
-	logger.Info("shutdown idle worker")
-	wp.shutdown(wkr, logger)
-	return true
-}
-
 // Stop synchronizing with the InstanceSet.
 func (wp *Pool) Stop() {
 	wp.setupOnce.Do(wp.setup)
 	close(wp.stop)
 }
 
-// View reports status information for every worker in the pool.
-func (wp *Pool) View() []View {
-	var r []View
+// Instances returns an InstanceView for each worker in the pool,
+// summarizing its current state and recent activity.
+func (wp *Pool) Instances() []InstanceView {
+	var r []InstanceView
 	wp.setupOnce.Do(wp.setup)
 	wp.mtx.Lock()
 	for _, w := range wp.workers {
-		r = append(r, View{
-			Instance:             w.instance.String(),
+		r = append(r, InstanceView{
+			Instance:             w.instance.ID(),
+			Address:              w.instance.Address(),
 			Price:                w.instType.Price,
 			ArvadosInstanceType:  w.instType.Name,
 			ProviderInstanceType: w.instType.ProviderType,
 			LastContainerUUID:    w.lastUUID,
-			Unallocated:          w.unallocated,
+			LastBusy:             w.busy,
 			WorkerState:          w.state.String(),
+			IdleBehavior:         w.idleBehavior,
 		})
 	}
 	wp.mtx.Unlock()
 	sort.Slice(r, func(i, j int) bool {
-		return strings.Compare(r[i].Instance, r[j].Instance) < 0
+		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
 	})
 	return r
 }
 
+// KillInstance destroys a cloud VM instance. It returns an error if
+// the given instance does not exist.
+func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+	wkr, ok := wp.workers[id]
+	if !ok {
+		return errors.New("instance not found")
+	}
+	wkr.logger.WithField("Reason", reason).Info("shutting down")
+	wkr.shutdown()
+	return nil
+}
+
 func (wp *Pool) setup() {
-	wp.creating = map[arvados.InstanceType]int{}
+	wp.creating = map[string]createCall{}
 	wp.exited = map[string]time.Time{}
 	wp.workers = map[cloud.InstanceID]*worker{}
 	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
@@ -683,10 +730,14 @@ func (wp *Pool) notify() {
 
 func (wp *Pool) getInstancesAndSync() error {
 	wp.setupOnce.Do(wp.setup)
+	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
+		return err
+	}
 	wp.logger.Debug("getting instance list")
 	threshold := time.Now()
-	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
+	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
 	if err != nil {
+		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
 		return err
 	}
 	wp.sync(threshold, instances)
@@ -704,14 +755,17 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
 	notify := false
 
 	for _, inst := range instances {
-		itTag := inst.Tags()[tagKeyInstanceType]
+		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
 		it, ok := wp.instanceTypes[itTag]
 		if !ok {
 			wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
 			continue
 		}
-		if wp.updateWorker(inst, it, StateUnknown) {
+		if wkr, isNew := wp.updateWorker(inst, it); isNew {
 			notify = true
+		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
+			wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
+			wkr.shutdown()
 		}
 	}
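Editorial note on the management operations added above (SetIdleBehavior, KillInstance): together they allow an operator or management API to stop scheduling work on a node and then destroy it. A hedged sketch, assuming an IdleBehaviorDrain value exists alongside IdleBehaviorRun/IdleBehaviorHold (the "hold/drain mode" mentioned in the Unallocated comment); drainAndKill is a hypothetical helper, not part of the diff:

    // Sketch only: stop scheduling new work on an instance, then destroy it.
    func drainAndKill(wp *Pool, id cloud.InstanceID) error {
    	if err := wp.SetIdleBehavior(id, IdleBehaviorDrain); err != nil {
    		return err
    	}
    	// ...wait until wp.Running() shows nothing left on this instance...
    	return wp.KillInstance(id, "drained by operator")
    }
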
@@ -720,16 +774,17 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
 			continue
 		}
 		logger := wp.logger.WithFields(logrus.Fields{
-			"Instance":    wkr.instance,
+			"Instance":    wkr.instance.ID(),
 			"WorkerState": wkr.state,
 		})
 		logger.Info("instance disappeared in cloud")
 		delete(wp.workers, id)
-		go wkr.executor.Close()
+		go wkr.Close()
 		notify = true
 	}
 
 	if !wp.loaded {
+		notify = true
 		wp.loaded = true
 		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
 	}
@@ -739,137 +794,24 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
 	}
 }
 
-// should be called in a new goroutine
-func (wp *Pool) probeAndUpdate(wkr *worker) {
-	logger := wp.logger.WithField("Instance", wkr.instance)
-	wp.mtx.Lock()
-	updated := wkr.updated
-	booted := wkr.booted
-	wp.mtx.Unlock()
-
-	var (
-		ctrUUIDs []string
-		ok       bool
-		stderr   []byte
-	)
-	if !booted {
-		booted, stderr = wp.probeBooted(wkr)
-		wp.mtx.Lock()
-		if booted && !wkr.booted {
-			wkr.booted = booted
-			logger.Info("instance booted")
-		} else {
-			booted = wkr.booted
-		}
-		wp.mtx.Unlock()
-	}
-	if booted {
-		ctrUUIDs, ok, stderr = wp.probeRunning(wkr)
-	}
-	logger = logger.WithField("stderr", string(stderr))
-	wp.mtx.Lock()
-	defer wp.mtx.Unlock()
-	if !ok {
-		if wkr.state == StateShutdown {
-			return
-		}
-		dur := time.Since(wkr.probed)
-		logger := logger.WithFields(logrus.Fields{
-			"Duration": dur,
-			"State":    wkr.state,
-		})
-		if wkr.state == StateBooting {
-			logger.Debug("new instance not responding")
-		} else {
-			logger.Info("instance not responding")
-		}
-		wp.shutdownIfBroken(wkr, dur)
-		return
-	}
-
-	updateTime := time.Now()
-	wkr.probed = updateTime
-	if len(ctrUUIDs) > 0 {
-		wkr.busy = updateTime
-		wkr.lastUUID = ctrUUIDs[0]
-	}
-	if wkr.state == StateShutdown || wkr.state == StateHold {
-	} else if booted {
-		if wkr.state != StateRunning {
-			wkr.state = StateRunning
-			go wp.notify()
-		}
-	} else {
-		wkr.state = StateBooting
-	}
-
-	if updated != wkr.updated {
-		// Worker was updated (e.g., by starting a new
-		// container) after the probe began. Avoid clobbering
-		// those changes with the probe results.
-		return
-	}
-
-	if len(ctrUUIDs) == 0 && len(wkr.running) > 0 {
-		wkr.unallocated = updateTime
-	}
-	running := map[string]struct{}{}
-	changed := false
-	for _, uuid := range ctrUUIDs {
-		running[uuid] = struct{}{}
-		if _, ok := wkr.running[uuid]; !ok {
-			changed = true
-		}
-	}
-	for uuid := range wkr.running {
-		if _, ok := running[uuid]; !ok {
-			logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
-			wp.exited[uuid] = updateTime
-			changed = true
-		}
-	}
-	if changed {
-		wkr.running = running
-		wkr.updated = updateTime
-		go wp.notify()
-	}
-}
-
-func (wp *Pool) probeRunning(wkr *worker) (running []string, ok bool, stderr []byte) {
-	cmd := "crunch-run --list"
-	stdout, stderr, err := wkr.executor.Execute(cmd, nil)
-	if err != nil {
-		wp.logger.WithFields(logrus.Fields{
-			"Instance": wkr.instance,
-			"Command":  cmd,
-			"stdout":   string(stdout),
-			"stderr":   string(stderr),
-		}).WithError(err).Warn("probe failed")
-		return nil, false, stderr
-	}
-	stdout = bytes.TrimRight(stdout, "\n")
-	if len(stdout) == 0 {
-		return nil, true, stderr
+func (wp *Pool) waitUntilLoaded() {
+	ch := wp.Subscribe()
+	wp.mtx.RLock()
+	defer wp.mtx.RUnlock()
+	for !wp.loaded {
+		wp.mtx.RUnlock()
+		<-ch
+		wp.mtx.RLock()
 	}
-	return strings.Split(string(stdout), "\n"), true, stderr
 }
 
-func (wp *Pool) probeBooted(wkr *worker) (ok bool, stderr []byte) {
-	cmd := wp.bootProbeCommand
-	if cmd == "" {
-		cmd = "true"
-	}
-	stdout, stderr, err := wkr.executor.Execute(cmd, nil)
-	logger := wp.logger.WithFields(logrus.Fields{
-		"Instance": wkr.instance,
-		"Command":  cmd,
-		"stdout":   string(stdout),
-		"stderr":   string(stderr),
-	})
+// Return a random string of n hexadecimal digits (n*4 random bits). n
+// must be even.
+func randomHex(n int) string {
+	buf := make([]byte, n/2)
+	_, err := rand.Read(buf)
 	if err != nil {
-		logger.WithError(err).Debug("boot probe failed")
-		return false, stderr
+		panic(err)
 	}
-	logger.Info("boot probe succeeded")
-	return true, stderr
+	return fmt.Sprintf("%x", buf)
 }
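
Editorial note on randomHex above: it mints the per-instance boot secret that Create writes onto the VM via the init command and also records as the InstanceSecret tag. A hedged sketch of that pairing in isolation follows; exampleBootSecret is hypothetical, and the literal path below stands in for the instanceSecretFilename constant, whose value is not shown in this diff.

    // Sketch only: generate a secret and the boot command that records it,
    // mirroring the tag/init-command pairing used in Create above.
    func exampleBootSecret() (secret, initCmd string) {
    	secret = randomHex(32) // 32 hex digits = 128 random bits; n must be even
    	// hypothetical path; the real code uses the instanceSecretFilename constant
    	initCmd = fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, "/var/run/arvados-instance-secret")
    	return
    }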