14360: Shutdown between tests to eliminate leaking logs.
authorTom Clegg <tclegg@veritasgenetics.com>
Tue, 18 Dec 2018 07:59:05 +0000 (02:59 -0500)
committerTom Clegg <tclegg@veritasgenetics.com>
Tue, 18 Dec 2018 07:59:05 +0000 (02:59 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

lib/dispatchcloud/dispatcher.go
lib/dispatchcloud/dispatcher_test.go
lib/dispatchcloud/scheduler/scheduler.go

index 1a47e8490b48c14cade2564ad0f64a7c399bbcf1..bea6ed3cc5251cb5f73059b8a912a8506620f807 100644 (file)
@@ -51,6 +51,7 @@ type dispatcher struct {
 
        setupOnce sync.Once
        stop      chan struct{}
+       stopped   chan struct{}
 }
 
 // Start starts the dispatcher. Start can be called multiple times
@@ -79,6 +80,7 @@ func (disp *dispatcher) Close() {
        case disp.stop <- struct{}{}:
        default:
        }
+       <-disp.stopped
 }
 
 // Make a worker.Executor for the given instance.
@@ -109,6 +111,7 @@ func (disp *dispatcher) initialize() {
                }
        }
        disp.stop = make(chan struct{}, 1)
+       disp.stopped = make(chan struct{})
        disp.logger = logrus.StandardLogger()
 
        if key, err := ssh.ParsePrivateKey(disp.Cluster.Dispatch.PrivateKey); err != nil {
@@ -144,6 +147,7 @@ func (disp *dispatcher) initialize() {
 }
 
 func (disp *dispatcher) run() {
+       defer close(disp.stopped)
        defer disp.instanceSet.Stop()
 
        staleLockTimeout := time.Duration(disp.Cluster.Dispatch.StaleLockTimeout)
index 7d3ab79309567bb853d07daed11b6820dc3cdfca..42767a159a9ba1780541b2fb8dff0cb34a092385 100644 (file)
@@ -179,8 +179,12 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
 }
 
 func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
-       drivers["test"] = s.stubDriver
        s.cluster.ManagementToken = "abcdefgh"
+       drivers["test"] = s.stubDriver
+       s.disp.setupOnce.Do(s.disp.initialize)
+       s.disp.queue = &test.Queue{}
+       go s.disp.run()
+
        for _, token := range []string{"abc", ""} {
                req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
                if token != "" {
@@ -197,8 +201,12 @@ func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
 }
 
 func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
-       drivers["test"] = s.stubDriver
        s.cluster.ManagementToken = ""
+       drivers["test"] = s.stubDriver
+       s.disp.setupOnce.Do(s.disp.initialize)
+       s.disp.queue = &test.Queue{}
+       go s.disp.run()
+
        for _, token := range []string{"abc", ""} {
                req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
                if token != "" {
@@ -214,6 +222,9 @@ func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
        s.cluster.ManagementToken = "abcdefgh"
        s.cluster.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
        drivers["test"] = s.stubDriver
+       s.disp.setupOnce.Do(s.disp.initialize)
+       s.disp.queue = &test.Queue{}
+       go s.disp.run()
 
        type instance struct {
                Instance             string
index 9a5fb10d51c22de773ace09ad9cc1eee8bb65b3a..3971a5319d72135ca82d7f899a432ef8601fe677 100644 (file)
@@ -37,6 +37,7 @@ type Scheduler struct {
 
        runOnce sync.Once
        stop    chan struct{}
+       stopped chan struct{}
 }
 
 // New returns a new unstarted Scheduler.
@@ -51,6 +52,7 @@ func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, stale
                staleLockTimeout:    staleLockTimeout,
                queueUpdateInterval: queueUpdateInterval,
                stop:                make(chan struct{}),
+               stopped:             make(chan struct{}),
                locking:             map[string]bool{},
        }
 }
@@ -64,9 +66,12 @@ func (sch *Scheduler) Start() {
 // Stop.
 func (sch *Scheduler) Stop() {
        close(sch.stop)
+       <-sch.stopped
 }
 
 func (sch *Scheduler) run() {
+       defer close(sch.stopped)
+
        // Ensure the queue is fetched once before attempting anything.
        for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
                sch.logger.Errorf("error updating queue: %s", err)