From 2a018b15662ae5f0b30d1d11eb2d0ffa685964e0 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Wed, 16 Jan 2019 17:09:34 -0500 Subject: [PATCH] 14325: Add IdleBehavior to worker pool. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/dispatchcloud/worker/pool.go | 86 ++++++++++++++++------- lib/dispatchcloud/worker/pool_test.go | 45 ++++++++++-- lib/dispatchcloud/worker/worker.go | 98 +++++++++++++++++++-------- 3 files changed, 173 insertions(+), 56 deletions(-) diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index 25b57c6e5b..e69351317f 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -5,6 +5,7 @@ package worker import ( + "errors" "io" "sort" "strings" @@ -19,18 +20,19 @@ import ( const ( tagKeyInstanceType = "InstanceType" - tagKeyHold = "Hold" + tagKeyIdleBehavior = "IdleBehavior" ) // An InstanceView shows a worker's current state and recent activity. type InstanceView struct { - Instance string + Instance cloud.InstanceID Price float64 ArvadosInstanceType string ProviderInstanceType string LastContainerUUID string LastBusy time.Time WorkerState string + IdleBehavior IdleBehavior } // An Executor executes shell commands on a remote host. @@ -173,7 +175,8 @@ func (wp *Pool) Unsubscribe(ch <-chan struct{}) { } // Unallocated returns the number of unallocated (creating + booting + -// idle + unknown) workers for each instance type. +// idle + unknown) workers for each instance type. Workers in +// hold/drain mode are not included. func (wp *Pool) Unallocated() map[arvados.InstanceType]int { wp.setupOnce.Do(wp.setup) wp.mtx.RLock() @@ -184,7 +187,7 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int { creating[it] = len(times) } for _, wkr := range wp.workers { - if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) { + if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) || wkr.idleBehavior != IdleBehaviorRun { continue } it := wkr.instType @@ -222,7 +225,10 @@ func (wp *Pool) Create(it arvados.InstanceType) error { if time.Now().Before(wp.atQuotaUntil) { return wp.atQuotaErr } - tags := cloud.InstanceTags{tagKeyInstanceType: it.Name} + tags := cloud.InstanceTags{ + tagKeyInstanceType: it.Name, + tagKeyIdleBehavior: string(IdleBehaviorRun), + } now := time.Now() wp.creating[it] = append(wp.creating[it], now) go func() { @@ -259,6 +265,21 @@ func (wp *Pool) AtQuota() bool { return time.Now().Before(wp.atQuotaUntil) } +// SetIdleBehavior determines how the indicated instance will behave +// when it has no containers running. +func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error { + wp.mtx.Lock() + defer wp.mtx.Unlock() + wkr, ok := wp.workers[id] + if !ok { + return errors.New("requested instance does not exist") + } + wkr.idleBehavior = idleBehavior + wkr.saveTags() + wkr.shutdownIfIdle() + return nil +} + // Add or update worker attached to the given instance. Use // initialState if a new worker is created. // @@ -274,32 +295,46 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi if initialState == StateBooting && wkr.state == StateUnknown { wkr.state = StateBooting } + wkr.saveTags() return wkr, false } - if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" { - initialState = StateHold + + // If an instance has a valid IdleBehavior tag when it first + // appears, initialize the new worker accordingly (this is how + // we restore IdleBehavior that was set by a prior dispatch + // process); otherwise, default to "run". After this, + // wkr.idleBehavior is the source of truth, and will only be + // changed via SetIdleBehavior(). + idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior]) + if !validIdleBehavior[idleBehavior] { + idleBehavior = IdleBehaviorRun } + logger := wp.logger.WithFields(logrus.Fields{ "InstanceType": it.Name, "Instance": inst, }) - logger.WithField("State", initialState).Infof("instance appeared in cloud") + logger.WithFields(logrus.Fields{ + "State": initialState, + "IdleBehavior": idleBehavior, + }).Infof("instance appeared in cloud") now := time.Now() wkr := &worker{ - mtx: &wp.mtx, - wp: wp, - logger: logger, - executor: wp.newExecutor(inst), - state: initialState, - instance: inst, - instType: it, - appeared: now, - probed: now, - busy: now, - updated: now, - running: make(map[string]struct{}), - starting: make(map[string]struct{}), - probing: make(chan struct{}, 1), + mtx: &wp.mtx, + wp: wp, + logger: logger, + executor: wp.newExecutor(inst), + state: initialState, + idleBehavior: idleBehavior, + instance: inst, + instType: it, + appeared: now, + probed: now, + busy: now, + updated: now, + running: make(map[string]struct{}), + starting: make(map[string]struct{}), + probing: make(chan struct{}, 1), } wp.workers[id] = wkr return wkr, true @@ -322,7 +357,7 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool { // TODO: shutdown the worker with the longest idle // time (Idle) or the earliest create time (Booting) for _, wkr := range wp.workers { - if wkr.state == tryState && wkr.instType == it { + if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it { logger.WithField("Instance", wkr.instance).Info("shutting down") wkr.shutdown() return true @@ -590,18 +625,19 @@ func (wp *Pool) Instances() []InstanceView { wp.mtx.Lock() for _, w := range wp.workers { r = append(r, InstanceView{ - Instance: w.instance.String(), + Instance: w.instance.ID(), Price: w.instType.Price, ArvadosInstanceType: w.instType.Name, ProviderInstanceType: w.instType.ProviderType, LastContainerUUID: w.lastUUID, LastBusy: w.busy, WorkerState: w.state.String(), + IdleBehavior: w.idleBehavior, }) } wp.mtx.Unlock() sort.Slice(r, func(i, j int) bool { - return strings.Compare(r[i].Instance, r[j].Instance) < 0 + return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0 }) return r } diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go index 5c9fbb38c2..7e84613eb1 100644 --- a/lib/dispatchcloud/worker/pool_test.go +++ b/lib/dispatchcloud/worker/pool_test.go @@ -47,6 +47,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) { lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)} type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01} type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02} + type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04} pool := &Pool{ logger: logrus.StandardLogger(), newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} }, @@ -54,6 +55,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) { instanceTypes: arvados.InstanceTypeMap{ type1.Name: type1, type2.Name: type2, + type3.Name: type3, }, } notify := pool.Subscribe() @@ -63,23 +65,39 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) { c.Check(pool.Unallocated()[type1], check.Equals, 0) c.Check(pool.Unallocated()[type2], check.Equals, 0) + c.Check(pool.Unallocated()[type3], check.Equals, 0) pool.Create(type2) pool.Create(type1) pool.Create(type2) + pool.Create(type3) c.Check(pool.Unallocated()[type1], check.Equals, 1) c.Check(pool.Unallocated()[type2], check.Equals, 2) + c.Check(pool.Unallocated()[type3], check.Equals, 1) // Unblock the pending Create calls. - go lameInstanceSet.Release(3) + go lameInstanceSet.Release(4) // Wait for each instance to either return from its Create // call, or show up in a poll. suite.wait(c, pool, notify, func() bool { pool.mtx.RLock() defer pool.mtx.RUnlock() - return len(pool.workers) == 3 + return len(pool.workers) == 4 }) + // Place type3 node on admin-hold + for _, instv := range pool.Instances() { + if instv.ArvadosInstanceType == type3.Name { + pool.SetIdleBehavior(instv.Instance, IdleBehaviorHold) + break + } + } + c.Check(pool.Shutdown(type3), check.Equals, false) + suite.wait(c, pool, notify, func() bool { + return pool.Unallocated()[type3] == 0 + }) + + // Shutdown both type2 nodes c.Check(pool.Shutdown(type2), check.Equals, true) suite.wait(c, pool, notify, func() bool { return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1 @@ -99,16 +117,35 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) { } break } + + // Shutdown type1 node c.Check(pool.Shutdown(type1), check.Equals, true) suite.wait(c, pool, notify, func() bool { - return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 + return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0 }) select { case <-notify2: case <-time.After(time.Second): c.Error("notify did not receive") } - go lameInstanceSet.Release(3) // unblock Destroy calls + + // Place type3 node on admin-drain so it shuts down right away + for _, instv := range pool.Instances() { + if instv.ArvadosInstanceType == type3.Name { + pool.SetIdleBehavior(instv.Instance, IdleBehaviorDrain) + break + } + } + suite.wait(c, pool, notify, func() bool { + return pool.Unallocated()[type3] == 0 + }) + + go lameInstanceSet.Release(4) // unblock Destroy calls + + suite.wait(c, pool, notify, func() bool { + pool.getInstancesAndSync() + return len(pool.Instances()) == 0 + }) } func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) { diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go index 1adc7c29fd..1c72cc4b7c 100644 --- a/lib/dispatchcloud/worker/worker.go +++ b/lib/dispatchcloud/worker/worker.go @@ -15,6 +15,11 @@ import ( "github.com/sirupsen/logrus" ) +const ( + // TODO: configurable + maxPingFailTime = 10 * time.Minute +) + // State indicates whether a worker is available to do work, and (if // not) whether/when it is expected to become ready. type State int @@ -25,12 +30,6 @@ const ( StateIdle // instance booted, no containers are running StateRunning // instance is running one or more containers StateShutdown // worker has stopped monitoring the instance - StateHold // running, but not available to run new containers -) - -const ( - // TODO: configurable - maxPingFailTime = 10 * time.Minute ) var stateString = map[State]string{ @@ -39,7 +38,6 @@ var stateString = map[State]string{ StateIdle: "idle", StateRunning: "running", StateShutdown: "shutdown", - StateHold: "hold", } // String implements fmt.Stringer. @@ -53,26 +51,42 @@ func (s State) MarshalText() ([]byte, error) { return []byte(stateString[s]), nil } +// IdleBehavior indicates the behavior desired when a node becomes idle. +type IdleBehavior string + +const ( + IdleBehaviorRun IdleBehavior = "run" // run containers, or shutdown on idle timeout + IdleBehaviorHold = "hold" // don't shutdown or run more containers + IdleBehaviorDrain = "drain" // shutdown immediately when idle +) + +var validIdleBehavior = map[IdleBehavior]bool{ + IdleBehaviorRun: true, + IdleBehaviorHold: true, + IdleBehaviorDrain: true, +} + type worker struct { logger logrus.FieldLogger executor Executor wp *Pool - mtx sync.Locker // must be wp's Locker. - state State - instance cloud.Instance - instType arvados.InstanceType - vcpus int64 - memory int64 - appeared time.Time - probed time.Time - updated time.Time - busy time.Time - destroyed time.Time - lastUUID string - running map[string]struct{} // remember to update state idle<->running when this changes - starting map[string]struct{} // remember to update state idle<->running when this changes - probing chan struct{} + mtx sync.Locker // must be wp's Locker. + state State + idleBehavior IdleBehavior + instance cloud.Instance + instType arvados.InstanceType + vcpus int64 + memory int64 + appeared time.Time + probed time.Time + updated time.Time + busy time.Time + destroyed time.Time + lastUUID string + running map[string]struct{} // remember to update state idle<->running when this changes + starting map[string]struct{} // remember to update state idle<->running when this changes + probing chan struct{} } // caller must have lock. @@ -275,7 +289,7 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) { // caller must have lock. func (wkr *worker) shutdownIfBroken(dur time.Duration) { - if wkr.state == StateHold { + if wkr.idleBehavior == IdleBehaviorHold { return } label, threshold := "", wkr.wp.timeoutProbe @@ -295,19 +309,25 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) { // caller must have lock. func (wkr *worker) shutdownIfIdle() bool { - if wkr.state != StateIdle { + if wkr.idleBehavior == IdleBehaviorHold { + return false + } + if !(wkr.state == StateIdle || (wkr.state == StateBooting && wkr.idleBehavior == IdleBehaviorDrain)) { return false } age := time.Since(wkr.busy) - if age < wkr.wp.timeoutIdle { + if wkr.idleBehavior != IdleBehaviorDrain && age < wkr.wp.timeoutIdle { return false } - wkr.logger.WithField("Age", age).Info("shutdown idle worker") + wkr.logger.WithFields(logrus.Fields{ + "Age": age, + "IdleBehavior": wkr.idleBehavior, + }).Info("shutdown idle worker") wkr.shutdown() return true } -// caller must have lock +// caller must have lock. func (wkr *worker) shutdown() { now := time.Now() wkr.updated = now @@ -322,3 +342,27 @@ func (wkr *worker) shutdown() { } }() } + +// Save worker tags to cloud provider metadata, if they don't already +// match. Caller must have lock. +func (wkr *worker) saveTags() { + instance := wkr.instance + have := instance.Tags() + want := cloud.InstanceTags{ + tagKeyInstanceType: wkr.instType.Name, + tagKeyIdleBehavior: string(wkr.idleBehavior), + } + go func() { + for k, v := range want { + if v == have[k] { + continue + } + err := instance.SetTags(want) + if err != nil { + wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags") + } + break + + } + }() +} -- 2.39.5