X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/74df5a58360fe6bcb273480d3ddec507a53b6b2b..e838828374ceed5ef6da260939251e86f72b6f27:/lib/dispatchcloud/scheduler/scheduler.go diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go index 83fc08a9ff..eb82c48839 100644 --- a/lib/dispatchcloud/scheduler/scheduler.go +++ b/lib/dispatchcloud/scheduler/scheduler.go @@ -7,9 +7,11 @@ package scheduler import ( + "context" "sync" "time" + "git.curoverse.com/arvados.git/sdk/go/ctxlog" "github.com/sirupsen/logrus" ) @@ -32,8 +34,9 @@ type Scheduler struct { staleLockTimeout time.Duration queueUpdateInterval time.Duration - locking map[string]bool - mtx sync.Mutex + uuidOp map[string]string // operation in progress: "lock", "cancel", ... + mtx sync.Mutex + wakeup *time.Timer runOnce sync.Once stop chan struct{} @@ -44,16 +47,17 @@ type Scheduler struct { // // Any given queue and pool should not be used by more than one // scheduler at a time. -func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler { +func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler { return &Scheduler{ - logger: logger, + logger: ctxlog.FromContext(ctx), queue: queue, pool: pool, staleLockTimeout: staleLockTimeout, queueUpdateInterval: queueUpdateInterval, + wakeup: time.NewTimer(time.Second), stop: make(chan struct{}), stopped: make(chan struct{}), - locking: map[string]bool{}, + uuidOp: map[string]string{}, } } @@ -75,7 +79,10 @@ func (sch *Scheduler) run() { // Ensure the queue is fetched once before attempting anything. for err := sch.queue.Update(); err != nil; err = sch.queue.Update() { sch.logger.Errorf("error updating queue: %s", err) - d := sch.queueUpdateInterval / 60 + d := sch.queueUpdateInterval / 10 + if d < time.Second { + d = time.Second + } sch.logger.Infof("waiting %s before retry", d) time.Sleep(d) } @@ -111,6 +118,7 @@ func (sch *Scheduler) run() { return case <-queueNotify: case <-poolNotify: + case <-sch.wakeup.C: } } }