package scheduler
import (
+ "context"
"sync"
"time"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/sirupsen/logrus"
)
staleLockTimeout time.Duration
queueUpdateInterval time.Duration
- locking map[string]bool
- mtx sync.Mutex
+ uuidOp map[string]string // operation in progress: "lock", "cancel", ...
+ mtx sync.Mutex
+ wakeup *time.Timer
runOnce sync.Once
stop chan struct{}
//
// Any given queue and pool should not be used by more than one
// scheduler at a time.
-func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
// NOTE(review): this change replaces the explicit logger argument with
// a context.Context as the first parameter, so every existing caller of
// New has to be updated to pass a context instead of a logger.
return &Scheduler{
- logger: logger,
// The logger is now obtained from ctx via ctxlog.FromContext rather
// than being supplied directly by the caller.
+ logger: ctxlog.FromContext(ctx),
queue: queue,
pool: pool,
staleLockTimeout: staleLockTimeout,
queueUpdateInterval: queueUpdateInterval,
// Timer created with an initial one-second duration; its channel
// (wakeup.C) is received from in a select elsewhere in this file.
+ wakeup: time.NewTimer(time.Second),
// Unbuffered signalling channels.
// NOTE(review): presumably stop requests shutdown and stopped reports
// that the scheduler has finished -- confirm against the run loop.
stop: make(chan struct{}),
stopped: make(chan struct{}),
- locking: map[string]bool{},
// Starts empty. Replaces the boolean "locking" map with a map whose
// string value names the operation in progress ("lock", "cancel", ...).
// NOTE(review): keys are presumably container UUIDs -- confirm.
+ uuidOp: map[string]string{},
}
}
// Ensure the queue is fetched once before attempting anything.
for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
sch.logger.Errorf("error updating queue: %s", err)
- d := sch.queueUpdateInterval / 60
+ d := sch.queueUpdateInterval / 10
+ if d < time.Second {
+ d = time.Second
+ }
sch.logger.Infof("waiting %s before retry", d)
time.Sleep(d)
}
return
case <-queueNotify:
case <-poolNotify:
+ case <-sch.wakeup.C:
}
}
}