X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b205525d0b7c7b9042513fe77d2e8061534208ae..e838828374ceed5ef6da260939251e86f72b6f27:/lib/dispatchcloud/scheduler/scheduler.go diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go index 9a5fb10d51..eb82c48839 100644 --- a/lib/dispatchcloud/scheduler/scheduler.go +++ b/lib/dispatchcloud/scheduler/scheduler.go @@ -7,10 +7,12 @@ package scheduler import ( + "context" "sync" "time" - "github.com/Sirupsen/logrus" + "git.curoverse.com/arvados.git/sdk/go/ctxlog" + "github.com/sirupsen/logrus" ) // A Scheduler maps queued containers onto unallocated workers in @@ -32,26 +34,30 @@ type Scheduler struct { staleLockTimeout time.Duration queueUpdateInterval time.Duration - locking map[string]bool - mtx sync.Mutex + uuidOp map[string]string // operation in progress: "lock", "cancel", ... + mtx sync.Mutex + wakeup *time.Timer runOnce sync.Once stop chan struct{} + stopped chan struct{} } // New returns a new unstarted Scheduler. // // Any given queue and pool should not be used by more than one // scheduler at a time. -func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler { +func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler { return &Scheduler{ - logger: logger, + logger: ctxlog.FromContext(ctx), queue: queue, pool: pool, staleLockTimeout: staleLockTimeout, queueUpdateInterval: queueUpdateInterval, + wakeup: time.NewTimer(time.Second), stop: make(chan struct{}), - locking: map[string]bool{}, + stopped: make(chan struct{}), + uuidOp: map[string]string{}, } } @@ -64,13 +70,19 @@ func (sch *Scheduler) Start() { // Stop. func (sch *Scheduler) Stop() { close(sch.stop) + <-sch.stopped } func (sch *Scheduler) run() { + defer close(sch.stopped) + // Ensure the queue is fetched once before attempting anything. for err := sch.queue.Update(); err != nil; err = sch.queue.Update() { sch.logger.Errorf("error updating queue: %s", err) - d := sch.queueUpdateInterval / 60 + d := sch.queueUpdateInterval / 10 + if d < time.Second { + d = time.Second + } sch.logger.Infof("waiting %s before retry", d) time.Sleep(d) } @@ -106,6 +118,7 @@ func (sch *Scheduler) run() { return case <-queueNotify: case <-poolNotify: + case <-sch.wakeup.C: } } }