From 17479bd75a29c52470abe0049cb447e114eb39e9 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Tue, 30 Oct 2018 13:50:30 -0400 Subject: [PATCH] 14360: Encapsulate scheduler object. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/dispatchcloud/container/queue.go | 59 ++++- lib/dispatchcloud/dispatcher.go | 62 ++--- lib/dispatchcloud/dispatcher_test.go | 1 + .../scheduler/fix_stale_locks.go | 24 +- lib/dispatchcloud/scheduler/interfaces.go | 3 + .../scheduler/{map.go => run_queue.go} | 60 ++--- .../{map_test.go => run_queue_test.go} | 232 ++++++++++-------- lib/dispatchcloud/scheduler/scheduler.go | 107 ++++++++ lib/dispatchcloud/scheduler/sync.go | 29 +-- lib/dispatchcloud/test/queue.go | 48 +++- 10 files changed, 404 insertions(+), 221 deletions(-) rename lib/dispatchcloud/scheduler/{map.go => run_queue.go} (65%) rename lib/dispatchcloud/scheduler/{map_test.go => run_queue_test.go} (53%) create mode 100644 lib/dispatchcloud/scheduler/scheduler.go diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go index e0e1cd0cd1..432f4d4884 100644 --- a/lib/dispatchcloud/container/queue.go +++ b/lib/dispatchcloud/container/queue.go @@ -69,6 +69,9 @@ type Queue struct { // When no network update is in progress, this protection is // not needed, and dontupdate is nil. dontupdate map[string]struct{} + + // active notification subscribers (see Subscribe) + subscribers map[<-chan struct{}]chan struct{} } // NewQueue returns a new Queue. When a new container appears in the @@ -76,11 +79,46 @@ type Queue struct { // assign an appropriate arvados.InstanceType for the queue entry. func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue { return &Queue{ - logger: logger, - reg: reg, - chooseType: chooseType, - client: client, - current: map[string]QueueEnt{}, + logger: logger, + reg: reg, + chooseType: chooseType, + client: client, + current: map[string]QueueEnt{}, + subscribers: map[<-chan struct{}]chan struct{}{}, + } +} + +// Subscribe returns a channel that becomes ready to receive when an +// entry in the Queue is updated. +// +// ch := q.Subscribe() +// defer q.Unsubscribe(ch) +// for range ch { +// // ... +// } +func (cq *Queue) Subscribe() <-chan struct{} { + cq.mtx.Lock() + defer cq.mtx.Unlock() + ch := make(chan struct{}, 1) + cq.subscribers[ch] = ch + return ch +} + +// Unsubscribe stops sending updates to the given channel. See +// Subscribe. +func (cq *Queue) Unsubscribe(ch <-chan struct{}) { + cq.mtx.Lock() + defer cq.mtx.Unlock() + delete(cq.subscribers, ch) +} + +// Caller must have lock. +func (cq *Queue) notify() { + for _, ch := range cq.subscribers { + select { + case ch <- struct{}{}: + default: + } } } @@ -167,6 +205,7 @@ func (cq *Queue) Update() error { } cq.dontupdate = nil cq.updated = updateStarted + cq.notify() return nil } @@ -192,9 +231,16 @@ func (cq *Queue) Unlock(uuid string) error { // Cancel cancels the given container. 
func (cq *Queue) Cancel(uuid string) error { - return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{ + err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{ "container": {"state": arvados.ContainerStateCancelled}, }) + if err != nil { + return err + } + cq.mtx.Lock() + defer cq.mtx.Unlock() + cq.notify() + return nil } func (cq *Queue) apiUpdate(uuid, action string) error { @@ -215,6 +261,7 @@ func (cq *Queue) apiUpdate(uuid, action string) error { ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID cq.current[uuid] = ent } + cq.notify() return nil } diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go index c441dc6fa7..97aacf6044 100644 --- a/lib/dispatchcloud/dispatcher.go +++ b/lib/dispatchcloud/dispatcher.go @@ -28,14 +28,10 @@ import ( ) const ( - defaultPollInterval = time.Second + defaultPollInterval = time.Second + defaultStaleLockTimeout = time.Minute ) -type containerQueue interface { - scheduler.ContainerQueue - Update() error -} - type pool interface { scheduler.WorkerPool Instances() []worker.InstanceView @@ -45,14 +41,13 @@ type dispatcher struct { Cluster *arvados.Cluster InstanceSetID cloud.InstanceSetID - logger logrus.FieldLogger - reg *prometheus.Registry - instanceSet cloud.InstanceSet - pool pool - queue containerQueue - httpHandler http.Handler - pollInterval time.Duration - sshKey ssh.Signer + logger logrus.FieldLogger + reg *prometheus.Registry + instanceSet cloud.InstanceSet + pool pool + queue scheduler.ContainerQueue + httpHandler http.Handler + sshKey ssh.Signer setupOnce sync.Once stop chan struct{} @@ -140,39 +135,24 @@ func (disp *dispatcher) initialize() { mux.Handle("/metrics", metricsH) mux.Handle("/metrics.json", metricsH) disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux) - - if d := disp.Cluster.Dispatch.PollInterval; d > 0 { - disp.pollInterval = time.Duration(d) - } else { - disp.pollInterval = defaultPollInterval - } } func (disp *dispatcher) run() { defer disp.instanceSet.Stop() - t0 := time.Now() - disp.logger.Infof("FixStaleLocks starting.") - scheduler.FixStaleLocks(disp.logger, disp.queue, disp.pool, time.Duration(disp.Cluster.Dispatch.StaleLockTimeout)) - disp.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0)) - - wp := disp.pool.Subscribe() - defer disp.pool.Unsubscribe(wp) - poll := time.NewTicker(disp.pollInterval) - for { - scheduler.Map(disp.logger, disp.queue, disp.pool) - scheduler.Sync(disp.logger, disp.queue, disp.pool) - select { - case <-disp.stop: - return - case <-wp: - case <-poll.C: - err := disp.queue.Update() - if err != nil { - disp.logger.Errorf("error updating queue: %s", err) - } - } + staleLockTimeout := time.Duration(disp.Cluster.Dispatch.StaleLockTimeout) + if staleLockTimeout == 0 { + staleLockTimeout = defaultStaleLockTimeout } + pollInterval := time.Duration(disp.Cluster.Dispatch.PollInterval) + if pollInterval <= 0 { + pollInterval = defaultPollInterval + } + sched := scheduler.New(disp.logger, disp.queue, disp.pool, staleLockTimeout, pollInterval) + sched.Start() + defer sched.Stop() + + <-disp.stop } // Management API: all active and queued containers. 
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go index 362fb4942e..248986bf3c 100644 --- a/lib/dispatchcloud/dispatcher_test.go +++ b/lib/dispatchcloud/dispatcher_test.go @@ -199,6 +199,7 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) { PrivateKey: dispatchprivraw, PollInterval: arvados.Duration(5 * time.Millisecond), ProbeInterval: arvados.Duration(5 * time.Millisecond), + StaleLockTimeout: arvados.Duration(5 * time.Millisecond), MaxProbesPerSecond: 1000, }, InstanceTypes: arvados.InstanceTypeMap{ diff --git a/lib/dispatchcloud/scheduler/fix_stale_locks.go b/lib/dispatchcloud/scheduler/fix_stale_locks.go index e9644aed21..985941090d 100644 --- a/lib/dispatchcloud/scheduler/fix_stale_locks.go +++ b/lib/dispatchcloud/scheduler/fix_stale_locks.go @@ -9,17 +9,17 @@ import ( "git.curoverse.com/arvados.git/lib/dispatchcloud/worker" "git.curoverse.com/arvados.git/sdk/go/arvados" - "github.com/Sirupsen/logrus" ) -// FixStaleLocks waits for any already-locked containers (i.e., locked +// fixStaleLocks waits for any already-locked containers (i.e., locked // by a prior dispatcher process) to appear on workers as the worker // pool recovers its state. It unlocks any that still remain when all -// workers are recovered or shutdown, or its timer expires. -func FixStaleLocks(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, limit time.Duration) { - wp := pool.Subscribe() - defer pool.Unsubscribe(wp) - timeout := time.NewTimer(limit) +// workers are recovered or shutdown, or its timer +// (sch.staleLockTimeout) expires. +func (sch *Scheduler) fixStaleLocks() { + wp := sch.pool.Subscribe() + defer sch.pool.Unsubscribe(wp) + timeout := time.NewTimer(sch.staleLockTimeout) waiting: for { unlock := false @@ -28,15 +28,15 @@ waiting: // If all workers have been contacted, unlock // containers that aren't claimed by any // worker. - unlock = pool.Workers()[worker.StateUnknown] == 0 + unlock = sch.pool.Workers()[worker.StateUnknown] == 0 case <-timeout.C: // Give up and unlock the containers, even // though they might be working. 
unlock = true } - running := pool.Running() - qEntries, _ := queue.Entries() + running := sch.pool.Running() + qEntries, _ := sch.queue.Entries() for uuid, ent := range qEntries { if ent.Container.State != arvados.ContainerStateLocked { continue @@ -47,9 +47,9 @@ waiting: if !unlock { continue waiting } - err := queue.Unlock(uuid) + err := sch.queue.Unlock(uuid) if err != nil { - logger.Warnf("Unlock %s: %s", uuid, err) + sch.logger.Warnf("Unlock %s: %s", uuid, err) } } return diff --git a/lib/dispatchcloud/scheduler/interfaces.go b/lib/dispatchcloud/scheduler/interfaces.go index bdb8678e9f..467247ad9b 100644 --- a/lib/dispatchcloud/scheduler/interfaces.go +++ b/lib/dispatchcloud/scheduler/interfaces.go @@ -21,6 +21,9 @@ type ContainerQueue interface { Cancel(uuid string) error Forget(uuid string) Get(uuid string) (arvados.Container, bool) + Subscribe() <-chan struct{} + Unsubscribe(<-chan struct{}) + Update() error } // A WorkerPool asynchronously starts and stops worker VMs, and starts diff --git a/lib/dispatchcloud/scheduler/map.go b/lib/dispatchcloud/scheduler/run_queue.go similarity index 65% rename from lib/dispatchcloud/scheduler/map.go rename to lib/dispatchcloud/scheduler/run_queue.go index aa92d04339..9fc1a16580 100644 --- a/lib/dispatchcloud/scheduler/map.go +++ b/lib/dispatchcloud/scheduler/run_queue.go @@ -2,11 +2,6 @@ // // SPDX-License-Identifier: AGPL-3.0 -// Package scheduler uses a resizable worker pool to execute -// containers in priority order. -// -// Scheduler functions must not be called concurrently using the same -// queue or pool. package scheduler import ( @@ -18,28 +13,8 @@ import ( "github.com/Sirupsen/logrus" ) -// Map maps queued containers onto unallocated workers in priority -// order, creating new workers if needed. It locks containers that can -// be mapped onto existing/pending workers, and starts them if -// possible. -// -// Map unlocks any containers that are locked but can't be -// mapped. (For example, this happens when the cloud provider reaches -// quota/capacity and a previously mappable container's priority is -// surpassed by a newer container.) -// -// If it encounters errors while creating new workers, Map shuts down -// idle workers, in case they are consuming quota. -// -// Map should not be called without first calling FixStaleLocks. 
-// -// FixStaleLocks() -// for { -// Map() -// Sync() -// } -func Map(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) { - unsorted, _ := queue.Entries() +func (sch *Scheduler) runQueue() { + unsorted, _ := sch.queue.Entries() sorted := make([]container.QueueEnt, 0, len(unsorted)) for _, ent := range unsorted { sorted = append(sorted, ent) @@ -48,20 +23,20 @@ func Map(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) { return sorted[i].Container.Priority > sorted[j].Container.Priority }) - running := pool.Running() - unalloc := pool.Unallocated() + running := sch.pool.Running() + unalloc := sch.pool.Unallocated() - logger.WithFields(logrus.Fields{ + sch.logger.WithFields(logrus.Fields{ "Containers": len(sorted), "Processes": len(running), - }).Debug("mapping") + }).Debug("runQueue") dontstart := map[arvados.InstanceType]bool{} var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota for i, ctr := range sorted { ctr, it := ctr.Container, ctr.InstanceType - logger := logger.WithFields(logrus.Fields{ + logger := sch.logger.WithFields(logrus.Fields{ "ContainerUUID": ctr.UUID, "InstanceType": it.Name, }) @@ -69,19 +44,20 @@ func Map(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) { continue } if ctr.State == arvados.ContainerStateQueued { - logger.Debugf("locking") - if unalloc[it] < 1 && pool.AtQuota() { + if unalloc[it] < 1 && sch.pool.AtQuota() { + logger.Debugf("not locking: AtQuota and no unalloc workers") overquota = sorted[i:] break } - err := queue.Lock(ctr.UUID) + logger.Debugf("locking") + err := sch.queue.Lock(ctr.UUID) if err != nil { logger.WithError(err).Warnf("lock error") unalloc[it]++ continue } var ok bool - ctr, ok = queue.Get(ctr.UUID) + ctr, ok = sch.queue.Get(ctr.UUID) if !ok { logger.Error("(BUG?) container disappeared from queue after Lock succeeded") continue @@ -95,12 +71,12 @@ func Map(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) { } if unalloc[it] < 1 { logger.Info("creating new instance") - err := pool.Create(it) + err := sch.pool.Create(it) if err != nil { if _, ok := err.(cloud.QuotaError); !ok { logger.WithError(err).Warn("error creating worker") } - queue.Unlock(ctr.UUID) + sch.queue.Unlock(ctr.UUID) // Don't let lower-priority containers // starve this one by using keeping // idle workers alive on different @@ -117,7 +93,7 @@ func Map(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) { // higher-priority container on the same // instance type. Don't let this one sneak in // ahead of it. 
- } else if pool.StartContainer(it, ctr) { + } else if sch.pool.StartContainer(it, ctr) { unalloc[it]-- } else { dontstart[it] = true @@ -130,9 +106,9 @@ func Map(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) { for _, ctr := range overquota { ctr := ctr.Container if ctr.State == arvados.ContainerStateLocked { - logger := logger.WithField("ContainerUUID", ctr.UUID) + logger := sch.logger.WithField("ContainerUUID", ctr.UUID) logger.Debug("unlock because pool capacity is used by higher priority containers") - err := queue.Unlock(ctr.UUID) + err := sch.queue.Unlock(ctr.UUID) if err != nil { logger.WithError(err).Warn("error unlocking") } @@ -144,7 +120,7 @@ func Map(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) { if n < 1 { continue } - pool.Shutdown(it) + sch.pool.Shutdown(it) } } } diff --git a/lib/dispatchcloud/scheduler/map_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go similarity index 53% rename from lib/dispatchcloud/scheduler/map_test.go rename to lib/dispatchcloud/scheduler/run_queue_test.go index f30520053b..e5e8c7ecfa 100644 --- a/lib/dispatchcloud/scheduler/map_test.go +++ b/lib/dispatchcloud/scheduler/run_queue_test.go @@ -6,10 +6,8 @@ package scheduler import ( "errors" - "fmt" "time" - "git.curoverse.com/arvados.git/lib/dispatchcloud/container" "git.curoverse.com/arvados.git/lib/dispatchcloud/test" "git.curoverse.com/arvados.git/lib/dispatchcloud/worker" "git.curoverse.com/arvados.git/sdk/go/arvados" @@ -20,14 +18,6 @@ import ( var ( logger = logrus.StandardLogger() - // arbitrary example instance types - types = func() (r []arvados.InstanceType) { - for i := 0; i < 16; i++ { - r = append(r, test.InstanceType(i)) - } - return - }() - // arbitrary example container UUIDs uuids = func() (r []string) { for i := 0; i < 16; i++ { @@ -37,38 +27,6 @@ var ( }() ) -type stubQueue struct { - ents map[string]container.QueueEnt -} - -func (q *stubQueue) Entries() (map[string]container.QueueEnt, time.Time) { - return q.ents, time.Now() -} -func (q *stubQueue) Lock(uuid string) error { - return q.setState(uuid, arvados.ContainerStateLocked) -} -func (q *stubQueue) Unlock(uuid string) error { - return q.setState(uuid, arvados.ContainerStateQueued) -} -func (q *stubQueue) Cancel(uuid string) error { - return q.setState(uuid, arvados.ContainerStateCancelled) -} -func (q *stubQueue) Forget(uuid string) { -} -func (q *stubQueue) Get(uuid string) (arvados.Container, bool) { - ent, ok := q.ents[uuid] - return ent.Container, ok -} -func (q *stubQueue) setState(uuid string, state arvados.ContainerState) error { - ent, ok := q.ents[uuid] - if !ok { - return fmt.Errorf("no such ent: %q", uuid) - } - ent.Container.State = state - q.ents[uuid] = ent - return nil -} - type stubQuotaError struct { error } @@ -135,47 +93,71 @@ var _ = check.Suite(&SchedulerSuite{}) type SchedulerSuite struct{} -// Map priority=4 container to idle node. Create a new instance for +// Assign priority=4 container to idle node. Create a new instance for // the priority=3 container. Don't try to start any priority<3 // containers because priority=3 container didn't start // immediately. Don't try to create any other nodes after the failed // create. 
-func (*SchedulerSuite) TestMapIdle(c *check.C) { - queue := stubQueue{ - ents: map[string]container.QueueEnt{ - uuids[1]: { - Container: arvados.Container{UUID: uuids[1], Priority: 1, State: arvados.ContainerStateQueued}, - InstanceType: types[1], +func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) { + queue := test.Queue{ + ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) { + return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil + }, + Containers: []arvados.Container{ + { + UUID: test.ContainerUUID(1), + Priority: 1, + State: arvados.ContainerStateQueued, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 1, + RAM: 1 << 30, + }, }, - uuids[2]: { - Container: arvados.Container{UUID: uuids[2], Priority: 2, State: arvados.ContainerStateQueued}, - InstanceType: types[1], + { + UUID: test.ContainerUUID(2), + Priority: 2, + State: arvados.ContainerStateQueued, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 1, + RAM: 1 << 30, + }, }, - uuids[3]: { - Container: arvados.Container{UUID: uuids[3], Priority: 3, State: arvados.ContainerStateQueued}, - InstanceType: types[1], + { + UUID: test.ContainerUUID(3), + Priority: 3, + State: arvados.ContainerStateQueued, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 1, + RAM: 1 << 30, + }, }, - uuids[4]: { - Container: arvados.Container{UUID: uuids[4], Priority: 4, State: arvados.ContainerStateQueued}, - InstanceType: types[1], + { + UUID: test.ContainerUUID(4), + Priority: 4, + State: arvados.ContainerStateQueued, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 1, + RAM: 1 << 30, + }, }, }, } + queue.Update() pool := stubPool{ unalloc: map[arvados.InstanceType]int{ - types[1]: 1, - types[2]: 2, + test.InstanceType(1): 1, + test.InstanceType(2): 2, }, idle: map[arvados.InstanceType]int{ - types[1]: 1, - types[2]: 2, + test.InstanceType(1): 1, + test.InstanceType(2): 2, }, running: map[string]time.Time{}, canCreate: 1, } - Map(logger, &queue, &pool) - c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{types[1]}) - c.Check(pool.starts, check.DeepEquals, []string{uuids[4], uuids[3]}) + New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue() + c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)}) + c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(3)}) c.Check(pool.running, check.HasLen, 1) for uuid := range pool.running { c.Check(uuid, check.Equals, uuids[4]) @@ -184,31 +166,43 @@ func (*SchedulerSuite) TestMapIdle(c *check.C) { // Shutdown some nodes if Create() fails -- and without even calling // Create(), if AtQuota() is true. 
-func (*SchedulerSuite) TestMapShutdownAtQuota(c *check.C) { +func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) { for quota := 0; quota < 2; quota++ { - shouldCreate := types[1 : 1+quota] - queue := stubQueue{ - ents: map[string]container.QueueEnt{ - uuids[1]: { - Container: arvados.Container{UUID: uuids[1], Priority: 1, State: arvados.ContainerStateQueued}, - InstanceType: types[1], + shouldCreate := []arvados.InstanceType{} + for i := 1; i < 1+quota; i++ { + shouldCreate = append(shouldCreate, test.InstanceType(i)) + } + queue := test.Queue{ + ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) { + return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil + }, + Containers: []arvados.Container{ + { + UUID: test.ContainerUUID(1), + Priority: 1, + State: arvados.ContainerStateQueued, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 1, + RAM: 1 << 30, + }, }, }, } + queue.Update() pool := stubPool{ atQuota: quota == 0, unalloc: map[arvados.InstanceType]int{ - types[2]: 2, + test.InstanceType(2): 2, }, idle: map[arvados.InstanceType]int{ - types[2]: 2, + test.InstanceType(2): 2, }, running: map[string]time.Time{}, creates: []arvados.InstanceType{}, starts: []string{}, canCreate: 0, } - Map(logger, &queue, &pool) + New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue() c.Check(pool.creates, check.DeepEquals, shouldCreate) c.Check(pool.starts, check.DeepEquals, []string{}) c.Check(pool.shutdowns, check.Not(check.Equals), 0) @@ -217,55 +211,89 @@ func (*SchedulerSuite) TestMapShutdownAtQuota(c *check.C) { // Start lower-priority containers while waiting for new/existing // workers to come up for higher-priority containers. -func (*SchedulerSuite) TestMapStartWhileCreating(c *check.C) { +func (*SchedulerSuite) TestStartWhileCreating(c *check.C) { pool := stubPool{ unalloc: map[arvados.InstanceType]int{ - types[1]: 1, - types[2]: 1, + test.InstanceType(1): 1, + test.InstanceType(2): 1, }, idle: map[arvados.InstanceType]int{ - types[1]: 1, - types[2]: 1, + test.InstanceType(1): 1, + test.InstanceType(2): 1, }, running: map[string]time.Time{}, canCreate: 2, } - queue := stubQueue{ - ents: map[string]container.QueueEnt{ - uuids[1]: { + queue := test.Queue{ + ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) { + return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil + }, + Containers: []arvados.Container{ + { // create a new worker - Container: arvados.Container{UUID: uuids[1], Priority: 1, State: arvados.ContainerStateQueued}, - InstanceType: types[1], + UUID: test.ContainerUUID(1), + Priority: 1, + State: arvados.ContainerStateQueued, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 1, + RAM: 1 << 30, + }, }, - uuids[2]: { + { // tentatively map to unalloc worker - Container: arvados.Container{UUID: uuids[2], Priority: 2, State: arvados.ContainerStateQueued}, - InstanceType: types[1], + UUID: test.ContainerUUID(2), + Priority: 2, + State: arvados.ContainerStateQueued, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 1, + RAM: 1 << 30, + }, }, - uuids[3]: { + { // start now on idle worker - Container: arvados.Container{UUID: uuids[3], Priority: 3, State: arvados.ContainerStateQueued}, - InstanceType: types[1], + UUID: test.ContainerUUID(3), + Priority: 3, + State: arvados.ContainerStateQueued, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 1, + RAM: 1 << 30, + }, }, - uuids[4]: { + { // create a new worker - Container: arvados.Container{UUID: uuids[4], Priority: 4, State: 
arvados.ContainerStateQueued}, - InstanceType: types[2], + UUID: test.ContainerUUID(4), + Priority: 4, + State: arvados.ContainerStateQueued, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 2, + RAM: 2 << 30, + }, }, - uuids[5]: { + { // tentatively map to unalloc worker - Container: arvados.Container{UUID: uuids[5], Priority: 5, State: arvados.ContainerStateQueued}, - InstanceType: types[2], + UUID: test.ContainerUUID(5), + Priority: 5, + State: arvados.ContainerStateQueued, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 2, + RAM: 2 << 30, + }, }, - uuids[6]: { + { // start now on idle worker - Container: arvados.Container{UUID: uuids[6], Priority: 6, State: arvados.ContainerStateQueued}, - InstanceType: types[2], + UUID: test.ContainerUUID(6), + Priority: 6, + State: arvados.ContainerStateQueued, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 2, + RAM: 2 << 30, + }, }, }, } - Map(logger, &queue, &pool) - c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{types[2], types[1]}) + queue.Update() + New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue() + c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)}) c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]}) running := map[string]bool{} for uuid, t := range pool.running { diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go new file mode 100644 index 0000000000..0be8edb7b6 --- /dev/null +++ b/lib/dispatchcloud/scheduler/scheduler.go @@ -0,0 +1,107 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +// Package scheduler uses a resizable worker pool to execute +// containers in priority order. +package scheduler + +import ( + "sync" + "time" + + "github.com/Sirupsen/logrus" +) + +// A Scheduler maps queued containers onto unallocated workers in +// priority order, creating new workers if needed. It locks containers +// that can be mapped onto existing/pending workers, and starts them +// if possible. +// +// A Scheduler unlocks any containers that are locked but can't be +// mapped. (For example, this happens when the cloud provider reaches +// quota/capacity and a previously mappable container's priority is +// surpassed by a newer container.) +// +// If it encounters errors while creating new workers, a Scheduler +// shuts down idle workers, in case they are consuming quota. +type Scheduler struct { + logger logrus.FieldLogger + queue ContainerQueue + pool WorkerPool + staleLockTimeout time.Duration + queueUpdateInterval time.Duration + + runOnce sync.Once + stop chan struct{} +} + +// New returns a new unstarted Scheduler. +// +// Any given queue and pool should not be used by more than one +// scheduler at a time. +func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler { + return &Scheduler{ + logger: logger, + queue: queue, + pool: pool, + staleLockTimeout: staleLockTimeout, + queueUpdateInterval: queueUpdateInterval, + stop: make(chan struct{}), + } +} + +// Start starts the scheduler. +func (sch *Scheduler) Start() { + go sch.runOnce.Do(sch.run) +} + +// Stop stops the scheduler. No other method should be called after +// Stop. +func (sch *Scheduler) Stop() { + close(sch.stop) +} + +func (sch *Scheduler) run() { + // Ensure the queue is fetched once before attempting anything. 
+ for err := sch.queue.Update(); err != nil; err = sch.queue.Update() { + sch.logger.Errorf("error updating queue: %s", err) + d := sch.queueUpdateInterval / 60 + sch.logger.Infof("waiting %s before retry", d) + time.Sleep(d) + } + + // Keep the queue up to date. + poll := time.NewTicker(sch.queueUpdateInterval) + defer poll.Stop() + go func() { + for range poll.C { + err := sch.queue.Update() + if err != nil { + sch.logger.Errorf("error updating queue: %s", err) + } + } + }() + + t0 := time.Now() + sch.logger.Infof("FixStaleLocks starting.") + sch.fixStaleLocks() + sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0)) + + poolNotify := sch.pool.Subscribe() + defer sch.pool.Unsubscribe(poolNotify) + + queueNotify := sch.queue.Subscribe() + defer sch.queue.Unsubscribe(queueNotify) + + for { + sch.runQueue() + sch.sync() + select { + case <-sch.stop: + return + case <-queueNotify: + case <-poolNotify: + } + } +} diff --git a/lib/dispatchcloud/scheduler/sync.go b/lib/dispatchcloud/scheduler/sync.go index bd0e9b309e..a85162debe 100644 --- a/lib/dispatchcloud/scheduler/sync.go +++ b/lib/dispatchcloud/scheduler/sync.go @@ -12,7 +12,7 @@ import ( "github.com/Sirupsen/logrus" ) -// Sync resolves discrepancies between the queue and the pool: +// sync resolves discrepancies between the queue and the pool: // // Lingering crunch-run processes for finalized and unlocked/requeued // containers are killed. @@ -22,27 +22,24 @@ import ( // // Running containers whose crunch-run processes have exited are // cancelled. -// -// Sync must not be called concurrently with other calls to Map or -// Sync using the same queue or pool. -func Sync(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) { - running := pool.Running() +func (sch *Scheduler) sync() { + running := sch.pool.Running() cancel := func(ent container.QueueEnt, reason string) { uuid := ent.Container.UUID - logger := logger.WithField("ContainerUUID", uuid) + logger := sch.logger.WithField("ContainerUUID", uuid) logger.Infof("cancelling container because %s", reason) - err := queue.Cancel(uuid) + err := sch.queue.Cancel(uuid) if err != nil { logger.WithError(err).Print("error cancelling container") } } kill := func(ent container.QueueEnt) { uuid := ent.Container.UUID - logger := logger.WithField("ContainerUUID", uuid) + logger := sch.logger.WithField("ContainerUUID", uuid) logger.Debugf("killing crunch-run process because state=%q", ent.Container.State) - pool.KillContainer(uuid) + sch.pool.KillContainer(uuid) } - qEntries, qUpdated := queue.Entries() + qEntries, qUpdated := sch.queue.Entries() for uuid, ent := range qEntries { exited, running := running[uuid] switch ent.Container.State { @@ -64,11 +61,11 @@ func Sync(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) { // container. 
go kill(ent) } else { - logger.WithFields(logrus.Fields{ + sch.logger.WithFields(logrus.Fields{ "ContainerUUID": uuid, "State": ent.Container.State, }).Info("container finished") - queue.Forget(uuid) + sch.queue.Forget(uuid) } case arvados.ContainerStateQueued: if running { @@ -80,18 +77,18 @@ func Sync(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) { } case arvados.ContainerStateLocked: if running && !exited.IsZero() && qUpdated.After(exited) { - logger = logger.WithFields(logrus.Fields{ + logger := sch.logger.WithFields(logrus.Fields{ "ContainerUUID": uuid, "Exited": time.Since(exited).Seconds(), }) logger.Infof("requeueing container because state=%q after crunch-run exited", ent.Container.State) - err := queue.Unlock(uuid) + err := sch.queue.Unlock(uuid) if err != nil { logger.WithError(err).Info("error requeueing container") } } default: - logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State) + sch.logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State) } } } diff --git a/lib/dispatchcloud/test/queue.go b/lib/dispatchcloud/test/queue.go index 909f561144..0bad5a391e 100644 --- a/lib/dispatchcloud/test/queue.go +++ b/lib/dispatchcloud/test/queue.go @@ -6,6 +6,7 @@ package test import ( "fmt" + "sync" "time" "git.curoverse.com/arvados.git/lib/dispatchcloud/container" @@ -22,13 +23,18 @@ type Queue struct { // must not be nil. ChooseType func(*arvados.Container) (arvados.InstanceType, error) - entries map[string]container.QueueEnt - updTime time.Time + entries map[string]container.QueueEnt + updTime time.Time + subscribers map[<-chan struct{}]chan struct{} + + mtx sync.Mutex } // Entries returns the containers that were queued when Update was // last called. func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) { + q.mtx.Lock() + defer q.mtx.Unlock() updTime := q.updTime r := map[string]container.QueueEnt{} for uuid, ent := range q.entries { @@ -42,11 +48,15 @@ func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) { // the state has been changed (via Lock, Unlock, or Cancel) since the // last Update, the updated state is returned. func (q *Queue) Get(uuid string) (arvados.Container, bool) { + q.mtx.Lock() + defer q.mtx.Unlock() ent, ok := q.entries[uuid] return ent.Container, ok } func (q *Queue) Forget(uuid string) { + q.mtx.Lock() + defer q.mtx.Unlock() delete(q.entries, uuid) } @@ -62,7 +72,35 @@ func (q *Queue) Cancel(uuid string) error { return q.changeState(uuid, q.entries[uuid].Container.State, arvados.ContainerStateCancelled) } +func (q *Queue) Subscribe() <-chan struct{} { + q.mtx.Lock() + defer q.mtx.Unlock() + if q.subscribers == nil { + q.subscribers = map[<-chan struct{}]chan struct{}{} + } + ch := make(chan struct{}, 1) + q.subscribers[ch] = ch + return ch +} + +func (q *Queue) Unsubscribe(ch <-chan struct{}) { + q.mtx.Lock() + defer q.mtx.Unlock() + delete(q.subscribers, ch) +} + +func (q *Queue) notify() { + for _, ch := range q.subscribers { + select { + case ch <- struct{}{}: + default: + } + } +} + func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error { + q.mtx.Lock() + defer q.mtx.Unlock() ent := q.entries[uuid] if ent.Container.State != from { return fmt.Errorf("lock failed: state=%q", ent.Container.State) @@ -75,11 +113,14 @@ func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error break } } + q.notify() return nil } // Update rebuilds the current entries from the Containers slice. 
func (q *Queue) Update() error { + q.mtx.Lock() + defer q.mtx.Unlock() updTime := time.Now() upd := map[string]container.QueueEnt{} for _, ctr := range q.Containers { @@ -95,6 +136,7 @@ func (q *Queue) Update() error { } q.entries = upd q.updTime = updTime + q.notify() return nil } @@ -106,6 +148,8 @@ func (q *Queue) Update() error { // The resulting changes are not exposed through Get() or Entries() // until the next call to Update(). func (q *Queue) Notify(upd arvados.Container) { + q.mtx.Lock() + defer q.mtx.Unlock() for i, ctr := range q.Containers { if ctr.UUID == upd.UUID { q.Containers[i] = upd -- 2.30.2
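Reviewer note (not part of the patch): the subscription mechanism this change adds to container.Queue and test.Queue is a small fan-out built on 1-buffered channels and non-blocking sends. Below is a minimal standalone sketch of that pattern; the notifier type and the main function are invented for illustration, and unlike container.Queue.notify() in the patch, which relies on the caller already holding cq.mtx, this sketch takes its own lock inside notify().

package main

import (
	"fmt"
	"sync"
	"time"
)

type notifier struct {
	mtx         sync.Mutex
	subscribers map[<-chan struct{}]chan struct{}
}

func (n *notifier) Subscribe() <-chan struct{} {
	n.mtx.Lock()
	defer n.mtx.Unlock()
	if n.subscribers == nil {
		n.subscribers = map[<-chan struct{}]chan struct{}{}
	}
	ch := make(chan struct{}, 1) // 1-slot buffer: at most one pending wakeup per subscriber
	n.subscribers[ch] = ch
	return ch
}

func (n *notifier) Unsubscribe(ch <-chan struct{}) {
	n.mtx.Lock()
	defer n.mtx.Unlock()
	delete(n.subscribers, ch)
}

// notify wakes every subscriber without ever blocking: if a subscriber
// already has a wakeup pending, the extra notification is dropped.
func (n *notifier) notify() {
	n.mtx.Lock()
	defer n.mtx.Unlock()
	for _, ch := range n.subscribers {
		select {
		case ch <- struct{}{}:
		default:
		}
	}
}

func main() {
	var n notifier
	ch := n.Subscribe()
	defer n.Unsubscribe(ch)

	go func() {
		// A burst of updates coalesces into a single pending wakeup.
		for i := 0; i < 10; i++ {
			n.notify()
		}
	}()

	select {
	case <-ch:
		fmt.Println("queue changed; run a scheduling pass")
	case <-time.After(time.Second):
		fmt.Println("timed out waiting for a notification")
	}
}

The 1-slot buffer plus the default case in notify() means a burst of queue updates collapses into a single pending wakeup, and a slow scheduler can never block Update(), Cancel(), or apiUpdate().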
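A second sketch, also illustrative only, of the loop shape that (*Scheduler).run() settles into once fixStaleLocks() returns: perform one runQueue()+sync() pass, then block until either the container queue or the worker pool reports a change, or Stop() is called. The names doPass, queueNotify, and poolNotify are placeholders, not identifiers from the patch.

package main

import (
	"fmt"
	"time"
)

func main() {
	stop := make(chan struct{})
	queueNotify := make(chan struct{}, 1)
	poolNotify := make(chan struct{}, 1)

	// Stand-in for one runQueue()+sync() pass.
	doPass := func(n int) { fmt.Println("scheduling pass", n) }

	go func() {
		// Simulate a queue update, then a pool update, then shutdown.
		queueNotify <- struct{}{}
		time.Sleep(10 * time.Millisecond)
		poolNotify <- struct{}{}
		time.Sleep(10 * time.Millisecond)
		close(stop)
	}()

	for n := 1; ; n++ {
		doPass(n)
		select {
		case <-stop:
			return
		case <-queueNotify:
		case <-poolNotify:
		}
	}
}

In the patch itself the wakeups come from queue.Subscribe() and pool.Subscribe(), and a separate goroutine calls queue.Update() every queueUpdateInterval, so the queue channel keeps firing even when nothing else changes.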