16723: Fix stubPool KillContainer(): return false if not running.
[arvados.git] / lib / dispatchcloud / scheduler / scheduler.go
index 9a5fb10d51c22de773ace09ad9cc1eee8bb65b3a..6409ea031a4f02228118bc081891990dfcbe20f9 100644 (file)
@@ -7,10 +7,12 @@
 package scheduler
 
 import (
+       "context"
        "sync"
        "time"
 
-       "github.com/Sirupsen/logrus"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/sirupsen/logrus"
 )
 
 // A Scheduler maps queued containers onto unallocated workers in
@@ -32,26 +34,30 @@ type Scheduler struct {
        staleLockTimeout    time.Duration
        queueUpdateInterval time.Duration
 
-       locking map[string]bool
-       mtx     sync.Mutex
+       uuidOp map[string]string // operation in progress: "lock", "cancel", ...
+       mtx    sync.Mutex
+       wakeup *time.Timer
 
        runOnce sync.Once
        stop    chan struct{}
+       stopped chan struct{}
 }
 
 // New returns a new unstarted Scheduler.
 //
 // Any given queue and pool should not be used by more than one
 // scheduler at a time.
-func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
        return &Scheduler{
-               logger:              logger,
+               logger:              ctxlog.FromContext(ctx),
                queue:               queue,
                pool:                pool,
                staleLockTimeout:    staleLockTimeout,
                queueUpdateInterval: queueUpdateInterval,
+               wakeup:              time.NewTimer(time.Second),
                stop:                make(chan struct{}),
-               locking:             map[string]bool{},
+               stopped:             make(chan struct{}),
+               uuidOp:              map[string]string{},
        }
 }
 
@@ -64,13 +70,19 @@ func (sch *Scheduler) Start() {
 // Stop.
 func (sch *Scheduler) Stop() {
        close(sch.stop)
+       <-sch.stopped
 }
 
 func (sch *Scheduler) run() {
+       defer close(sch.stopped)
+
        // Ensure the queue is fetched once before attempting anything.
        for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
                sch.logger.Errorf("error updating queue: %s", err)
-               d := sch.queueUpdateInterval / 60
+               d := sch.queueUpdateInterval / 10
+               if d < time.Second {
+                       d = time.Second
+               }
                sch.logger.Infof("waiting %s before retry", d)
                time.Sleep(d)
        }
@@ -106,6 +118,7 @@ func (sch *Scheduler) run() {
                        return
                case <-queueNotify:
                case <-poolNotify:
+               case <-sch.wakeup.C:
                }
        }
 }