package scheduler
import (
+ "context"
"sync"
"time"
- "github.com/Sirupsen/logrus"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ "github.com/sirupsen/logrus"
)
// A Scheduler maps queued containers onto unallocated workers in
staleLockTimeout time.Duration
queueUpdateInterval time.Duration
+ uuidOp map[string]string // operation in progress: "lock", "cancel", ...
+ mtx sync.Mutex
+ wakeup *time.Timer
+
runOnce sync.Once
stop chan struct{}
+ stopped chan struct{}
}
// New returns a new unstarted Scheduler.
//
// Any given queue and pool should not be used by more than one
// scheduler at a time.
-func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
return &Scheduler{
- logger: logger,
+ logger: ctxlog.FromContext(ctx),
queue: queue,
pool: pool,
staleLockTimeout: staleLockTimeout,
queueUpdateInterval: queueUpdateInterval,
+ wakeup: time.NewTimer(time.Second),
stop: make(chan struct{}),
+ stopped: make(chan struct{}),
+ uuidOp: map[string]string{},
}
}
// Stop.
func (sch *Scheduler) Stop() {
close(sch.stop)
+ <-sch.stopped
}
func (sch *Scheduler) run() {
+ defer close(sch.stopped)
+
// Ensure the queue is fetched once before attempting anything.
for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
sch.logger.Errorf("error updating queue: %s", err)
- d := sch.queueUpdateInterval / 60
+ d := sch.queueUpdateInterval / 10
+ if d < time.Second {
+ d = time.Second
+ }
sch.logger.Infof("waiting %s before retry", d)
time.Sleep(d)
}
return
case <-queueNotify:
case <-poolNotify:
+ case <-sch.wakeup.C:
}
}
}