setupOnce sync.Once
stop chan struct{}
+ stopped chan struct{}
}
// Start starts the dispatcher. Start can be called multiple times
case disp.stop <- struct{}{}:
default:
}
+ <-disp.stopped
}
// Make a worker.Executor for the given instance.
}
}
disp.stop = make(chan struct{}, 1)
+ disp.stopped = make(chan struct{})
disp.logger = logrus.StandardLogger()
if key, err := ssh.ParsePrivateKey(disp.Cluster.Dispatch.PrivateKey); err != nil {
}
func (disp *dispatcher) run() {
+ defer close(disp.stopped)
defer disp.instanceSet.Stop()
staleLockTimeout := time.Duration(disp.Cluster.Dispatch.StaleLockTimeout)
}
func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
- drivers["test"] = s.stubDriver
s.cluster.ManagementToken = "abcdefgh"
+ drivers["test"] = s.stubDriver
+ s.disp.setupOnce.Do(s.disp.initialize)
+ s.disp.queue = &test.Queue{}
+ go s.disp.run()
+
for _, token := range []string{"abc", ""} {
req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
if token != "" {
}
func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
- drivers["test"] = s.stubDriver
s.cluster.ManagementToken = ""
+ drivers["test"] = s.stubDriver
+ s.disp.setupOnce.Do(s.disp.initialize)
+ s.disp.queue = &test.Queue{}
+ go s.disp.run()
+
for _, token := range []string{"abc", ""} {
req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
if token != "" {
s.cluster.ManagementToken = "abcdefgh"
s.cluster.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
drivers["test"] = s.stubDriver
+ s.disp.setupOnce.Do(s.disp.initialize)
+ s.disp.queue = &test.Queue{}
+ go s.disp.run()
type instance struct {
Instance string
runOnce sync.Once
stop chan struct{}
+ stopped chan struct{}
}
// New returns a new unstarted Scheduler.
staleLockTimeout: staleLockTimeout,
queueUpdateInterval: queueUpdateInterval,
stop: make(chan struct{}),
+ stopped: make(chan struct{}),
locking: map[string]bool{},
}
}
// Stop.
func (sch *Scheduler) Stop() {
close(sch.stop)
+ <-sch.stopped
}
func (sch *Scheduler) run() {
+ defer close(sch.stopped)
+
// Ensure the queue is fetched once before attempting anything.
for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
sch.logger.Errorf("error updating queue: %s", err)