11167: Merge branch 'master' into 11167-wb-remove-arvget
author Lucas Di Pentima <lucas@curoverse.com>
Wed, 9 Aug 2017 23:49:39 +0000 (20:49 -0300)
committer Lucas Di Pentima <lucas@curoverse.com>
Wed, 9 Aug 2017 23:49:39 +0000 (20:49 -0300)
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <lucas@curoverse.com>

20 files changed:
build/run-tests.sh
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/tests/test_submit.py
sdk/go/arvados/container.go
sdk/go/arvados/error.go
services/nodemanager/arvnodeman/baseactor.py
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
services/nodemanager/arvnodeman/timedcallback.py
services/nodemanager/setup.py
services/nodemanager/tests/test_computenode_dispatch.py
services/nodemanager/tests/test_computenode_dispatch_slurm.py
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/test_failure.py
services/nodemanager/tests/test_jobqueue.py
services/nodemanager/tests/test_timedcallback.py
services/nodemanager/tests/testutil.py
services/ws/event_source.go
services/ws/handler.go
services/ws/session_v0.go

diff --git a/build/run-tests.sh b/build/run-tests.sh
index 3952b36604102cf6f3fd78fceb95eac915d8c2d4..20780811a58e5ecd59ce9c4b399f3b914c462480 100755 (executable)
@@ -81,7 +81,7 @@ services/keepstore
 services/keep-balance
 services/login-sync
 services/nodemanager
-services/nodemanager-integration
+services/nodemanager_integration
 services/crunch-run
 services/crunch-dispatch-local
 services/crunch-dispatch-slurm
@@ -545,6 +545,9 @@ do_test() {
         apps/workbench_units | apps/workbench_functionals | apps/workbench_integration)
             suite=apps/workbench
             ;;
+        services/nodemanager | services/nodemanager_integration)
+            suite=services/nodemanager_suite
+            ;;
         *)
             suite="${1}"
             ;;
@@ -860,11 +863,11 @@ test_login-sync() {
 }
 do_test services/login-sync login-sync
 
-test_nodemanager-integration() {
+test_nodemanager_integration() {
     cd "$WORKSPACE/services/nodemanager" \
-        && tests/integration_test.py ${testargs[services/nodemanager-integration]}
+        && tests/integration_test.py ${testargs[services/nodemanager_integration]}
 }
-do_test services/nodemanager-integration nodemanager-integration
+do_test services/nodemanager_integration nodemanager_integration
 
 for p in "${pythonstuff[@]}"
 do
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 4ab65d9d8774708613787b3a694f64bf876004da..769a63bce3f56763e7fa1767317d5af9828a03d0 100644 (file)
@@ -363,6 +363,9 @@ class RunnerContainer(Runner):
         if self.arvrunner.trash_intermediate:
             command.append("--trash-intermediate")
 
+        if self.arvrunner.project_uuid:
+            command.append("--project-uuid="+self.arvrunner.project_uuid)
+
         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
 
         container_req["command"] = command
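
Note on the hunk above: it conditionally forwards the submitter's --project-uuid to the arvados-cwl-runner command executed inside the runner container. A minimal sketch of the same pattern, assuming illustrative names (build_runner_command is hypothetical, not part of arvados-cwl-runner):

    def build_runner_command(workflow_path, project_uuid=None):
        # Base invocation of the runner inside the container.
        command = ["arvados-cwl-runner", "--local", "--api=containers"]
        # Forward the flag only when the caller actually set one, so
        # the default behavior is unchanged for other submissions.
        if project_uuid:
            command.append("--project-uuid=" + project_uuid)
        command.extend([workflow_path, "/var/lib/cwl/cwl.input.json"])
        return command
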
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 8ab0a8de9c2448e999475b7ac8735dcedc928099..49545a83dc7ac34eea9acc11dc3e022f2839f22c 100644 (file)
@@ -860,6 +860,31 @@ class TestSubmit(unittest.TestCase):
                          stubs.expect_container_request_uuid + '\n')
 
 
+    @stubs
+    def test_submit_container_project(self, stubs):
+        project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+        capture_stdout = cStringIO.StringIO()
+        try:
+            exited = arvados_cwl.main(
+                ["--submit", "--no-wait", "--api=containers", "--debug", "--project-uuid="+project_uuid,
+                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+            self.assertEqual(exited, 0)
+        except:
+            logging.exception("")
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        expect_container["owner_uuid"] = project_uuid
+        expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+                                       '--enable-reuse', '--on-error=continue', '--project-uuid='+project_uuid,
+                                       '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+        stubs.api.container_requests().create.assert_called_with(
+            body=JsonDiffMatcher(expect_container))
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
+
     @stubs
     def test_submit_job_runner_image(self, stubs):
         capture_stdout = cStringIO.StringIO()
diff --git a/sdk/go/arvados/container.go b/sdk/go/arvados/container.go
index 7d39d678f8ec02c52f6446461c4de9f8e95c142a..7e588be17bb16c04cdbd6098b8dbff8f7c599d18 100644 (file)
@@ -31,7 +31,7 @@ type Mount struct {
        Path              string      `json:"path"`
        Content           interface{} `json:"content"`
        ExcludeFromOutput bool        `json:"exclude_from_output"`
-       Capacity          int64       `json:capacity`
+       Capacity          int64       `json:"capacity"`
 }
 
 // RuntimeConstraints specify a container's compute resources (RAM,
diff --git a/sdk/go/arvados/error.go b/sdk/go/arvados/error.go
index 29eebdbf729d557a88e121b582cdd78171e31bdd..773a2e6f9c7d787406511f85a6a5585596153738 100644 (file)
@@ -21,7 +21,7 @@ type TransactionError struct {
 }
 
 func (e TransactionError) Error() (s string) {
-       s = fmt.Sprintf("request failed: %s", e.URL)
+       s = fmt.Sprintf("request failed: %s", e.URL.String())
        if e.Status != "" {
                s = s + ": " + e.Status
        }
diff --git a/services/nodemanager/arvnodeman/baseactor.py b/services/nodemanager/arvnodeman/baseactor.py
index 988b83c142b15d6a62058ec05ee0863d03476503..565db6601f18e68f5f621e0838ee06e051038028 100644 (file)
@@ -82,17 +82,20 @@ class BaseNodeManagerActor(pykka.ThreadingActor):
     def __init__(self, *args, **kwargs):
          super(pykka.ThreadingActor, self).__init__(*args, **kwargs)
          self.actor_ref = TellableActorRef(self)
+         self._killfunc = kwargs.get("killfunc", os.kill)
 
     def on_failure(self, exception_type, exception_value, tb):
         lg = getattr(self, "_logger", logging)
         if (exception_type in (threading.ThreadError, MemoryError) or
             exception_type is OSError and exception_value.errno == errno.ENOMEM):
             lg.critical("Unhandled exception is a fatal error, killing Node Manager")
-            os.kill(os.getpid(), signal.SIGKILL)
+            self._killfunc(os.getpid(), signal.SIGKILL)
 
     def ping(self):
         return True
 
+    def get_thread(self):
+        return threading.current_thread()
 
 class WatchdogActor(pykka.ThreadingActor):
     def __init__(self, timeout, *args, **kwargs):
@@ -101,12 +104,13 @@ class WatchdogActor(pykka.ThreadingActor):
          self.actors = [a.proxy() for a in args]
          self.actor_ref = TellableActorRef(self)
          self._later = self.actor_ref.tell_proxy()
+         self._killfunc = kwargs.get("killfunc", os.kill)
 
     def kill_self(self, e, act):
         lg = getattr(self, "_logger", logging)
         lg.critical("Watchdog exception", exc_info=e)
         lg.critical("Actor %s watchdog ping time out, killing Node Manager", act)
-        os.kill(os.getpid(), signal.SIGKILL)
+        self._killfunc(os.getpid(), signal.SIGKILL)
 
     def on_start(self):
         self._later.run()
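
Note on the two hunks above: the hard-coded os.kill is replaced by an injectable killfunc so tests can observe the fatal-error path without actually SIGKILLing the test process. A minimal sketch of the injection pattern, with a hypothetical class name:

    import os
    import signal

    class FatalErrorSketch(object):
        def __init__(self, killfunc=None):
            # Default to the real os.kill; tests pass a mock.Mock() instead.
            self._killfunc = killfunc if killfunc is not None else os.kill

        def die(self):
            # In production this terminates the whole process with SIGKILL;
            # in tests it merely records the call on the injected mock.
            self._killfunc(os.getpid(), signal.SIGKILL)

The get_thread() addition serves the same testability goal: it lets test helpers join the actor's thread after stopping it (see the stop_proxy change in testutil.py below).
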
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index fb9a6bf2142d58a63e0eba8d993e4fa2cb5e8ddb..c5dd1adef1f3173446d7c5efb3d8fbfc31d9d771 100644 (file)
@@ -240,6 +240,9 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         return super(ComputeNodeShutdownActor, self)._finished()
 
     def cancel_shutdown(self, reason, **kwargs):
+        if self.cancel_reason is not None:
+            # already cancelled
+            return
         self.cancel_reason = reason
         self._logger.info("Shutdown cancelled: %s.", reason)
         self._finished(success_flag=False)
@@ -257,6 +260,9 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
 
     @_cancel_on_exception
     def shutdown_node(self):
+        if self.cancel_reason is not None:
+            # already cancelled
+            return
         if self.cancellable:
             self._logger.info("Checking that node is still eligible for shutdown")
             eligible, reason = self._monitor.shutdown_eligible().get()
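
Note on the guards above: cancel_shutdown and shutdown_node now check cancel_reason first, making cancellation idempotent; a racing timer callback or retry can no longer cancel twice, or destroy a node whose shutdown has already been called off. A reduced sketch of the idea (ShutdownSketch is illustrative, not the real actor):

    class ShutdownSketch(object):
        def __init__(self):
            self.cancel_reason = None

        def cancel_shutdown(self, reason):
            if self.cancel_reason is not None:
                return  # already cancelled: later requests are no-ops
            self.cancel_reason = reason
            # ... log and report failure to subscribers ...

        def shutdown_node(self):
            if self.cancel_reason is not None:
                return  # a cancel won the race: leave the node alone
            # ... check eligibility and destroy the cloud node ...
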
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
index fa56578cffa1108526584ded9730a9cb5ffbbda9..c8883c3ae70f6614b7bd9063030c14e264dfe543 100644 (file)
@@ -73,7 +73,7 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
         if output in ("drng\n", "alloc\n", "drng*\n", "alloc*\n"):
             self._timer.schedule(time.time() + 10,
                                  self._later.await_slurm_drain)
-        elif output in ("idle\n"):
+        elif output in ("idle\n",):
             # Not in "drng" but idle, don't shut down
             self.cancel_shutdown("slurm state is %s" % output.strip(), try_resume=False)
         else:
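
Why the one-character fix above matters: with a bare parenthesized string, `in` performs a substring test, because ("idle\n") is just the string "idle\n"; only the trailing comma makes it a one-element tuple with exact-match membership. The old code happened to accept the intended value, but it would also have accepted any substring of it:

    >>> "idle\n" in ("idle\n")    # parentheses alone: still a string
    True
    >>> "dl" in ("idle\n")        # substring test: surprising hit
    True
    >>> "dl" in ("idle\n",)       # trailing comma: tuple membership
    False
    >>> "idle\n" in ("idle\n",)
    True
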
diff --git a/services/nodemanager/arvnodeman/timedcallback.py b/services/nodemanager/arvnodeman/timedcallback.py
index 4d2a1394df2b69b3823b8120524e06af7bcb7cb5..e7e3f25fe383239c310f72e2bd234cefc46f9619 100644 (file)
@@ -19,11 +19,15 @@ class TimedCallBackActor(actor_class):
     message at a later time.  This actor runs the necessary event loop for
     delivery.
     """
-    def __init__(self, max_sleep=1):
+    def __init__(self, max_sleep=1, timefunc=None):
         super(TimedCallBackActor, self).__init__()
         self._proxy = self.actor_ref.tell_proxy()
         self.messages = []
         self.max_sleep = max_sleep
+        if timefunc is None:
+            self._timefunc = time.time
+        else:
+            self._timefunc = timefunc
 
     def schedule(self, delivery_time, receiver, *args, **kwargs):
         if not self.messages:
@@ -33,7 +37,7 @@ class TimedCallBackActor(actor_class):
     def deliver(self):
         if not self.messages:
             return
-        til_next = self.messages[0][0] - time.time()
+        til_next = self.messages[0][0] - self._timefunc()
         if til_next <= 0:
             t, receiver, args, kwargs = heapq.heappop(self.messages)
             try:
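
Note on the change above: deliver() previously read the global time.time() directly; accepting a timefunc in the constructor lets tests drive a deterministic fake clock instead of patching the time module, which is racy once the actor runs on its own thread (see test_timedcallback.py below). A minimal sketch of the injectable-clock pattern, with hypothetical names:

    import time

    class ClockSketch(object):
        def __init__(self, timefunc=None):
            self._timefunc = timefunc if timefunc is not None else time.time

        def seconds_until(self, delivery_time):
            return delivery_time - self._timefunc()

    fake_now = lambda: 100.0               # deterministic "now" for a test
    assert ClockSketch(timefunc=fake_now).seconds_until(101.5) == 1.5
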
diff --git a/services/nodemanager/setup.py b/services/nodemanager/setup.py
index 59d95e3d22f1231030359141298d79f2a547ad8b..d083bf168b50a54087ad398d925e007eab972713 100644 (file)
@@ -33,7 +33,7 @@ setup(name='arvados-node-manager',
       ],
       install_requires=[
           'apache-libcloud>=0.20',
-          'arvados-python-client>=0.1.20150206225333',
+          'arvados-python-client>=0.1.20170731145219',
           'future',
           'pykka',
           'python-daemon',
diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py
index a8aa2e38fb46ce2c3e3b0d2ae7f35f01b12e4952..c44305d2b96a66a4cf6ddedf45556f3c58085532 100644 (file)
@@ -100,6 +100,7 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
             ]
         self.make_actor()
         self.wait_for_assignment(self.setup_actor, 'cloud_node')
+        self.setup_actor.ping().get(self.TIMEOUT)
         self.assertEqual(1, self.cloud_client.post_create_node.call_count)
 
     def test_instance_exceeded_not_retried(self):
@@ -151,6 +152,7 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
         self.api_client.nodes().create().execute.side_effect = retry_resp
         self.api_client.nodes().update().execute.side_effect = retry_resp
         self.wait_for_assignment(self.setup_actor, 'cloud_node')
+        self.setup_actor.ping().get(self.TIMEOUT)
         self.assertEqual(self.setup_actor.actor_ref.actor_urn,
                          subscriber.call_args[0][0].actor_ref.actor_urn)
 
@@ -207,17 +209,19 @@ class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
         self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="busy"))
         self.cloud_client.destroy_node.return_value = True
         self.make_actor(cancellable=True)
-        self.check_success_flag(False)
+        self.check_success_flag(False, 2)
         self.assertFalse(self.cloud_client.destroy_node.called)
 
     def test_uncancellable_shutdown(self, *mocks):
         self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="busy"))
         self.cloud_client.destroy_node.return_value = True
         self.make_actor(cancellable=False)
-        self.check_success_flag(True, 2)
+        self.check_success_flag(True, 4)
         self.assertTrue(self.cloud_client.destroy_node.called)
 
     def test_arvados_node_cleaned_after_shutdown(self, *mocks):
+        if len(mocks) == 1:
+            mocks[0].return_value = "drain\n"
         cloud_node = testutil.cloud_node_mock(62)
         arv_node = testutil.arvados_node_mock(62)
         self.make_mocks(cloud_node, arv_node)
@@ -235,12 +239,15 @@ class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
         self.assertTrue(update_mock().execute.called)
 
     def test_arvados_node_not_cleaned_after_shutdown_cancelled(self, *mocks):
+        if len(mocks) == 1:
+            mocks[0].return_value = "idle\n"
         cloud_node = testutil.cloud_node_mock(61)
         arv_node = testutil.arvados_node_mock(61)
         self.make_mocks(cloud_node, arv_node, shutdown_open=False)
         self.cloud_client.destroy_node.return_value = False
         self.make_actor(cancellable=True)
         self.shutdown_actor.cancel_shutdown("test")
+        self.shutdown_actor.ping().get(self.TIMEOUT)
         self.check_success_flag(False, 2)
         self.assertFalse(self.arvados_client.nodes().update.called)
 
@@ -338,13 +345,11 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
     def test_in_state_when_no_state_available(self):
         self.make_actor(arv_node=testutil.arvados_node_mock(
                 crunch_worker_state=None))
-        print(self.node_actor.get_state().get())
         self.assertTrue(self.node_state('idle'))
 
     def test_in_state_when_no_state_available_old(self):
         self.make_actor(arv_node=testutil.arvados_node_mock(
                 crunch_worker_state=None, age=90000))
-        print(self.node_actor.get_state().get())
         self.assertTrue(self.node_state('down'))
 
     def test_in_idle_state(self):
diff --git a/services/nodemanager/tests/test_computenode_dispatch_slurm.py b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
index c7eb7afc631cd2b06e2c7e6753b2e696ab27b520..0b6162dfaa64405df53794bb575bfffd2420bbff 100644 (file)
@@ -32,13 +32,20 @@ class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
             self.timer = testutil.MockTimer(False)
         self.make_actor()
         self.check_success_flag(None, 0)
+        # At this point, 1st try should have happened.
+
         self.timer.deliver()
         self.check_success_flag(None, 0)
-        self.timer.deliver()
+        # At this point, 2nd try should have happened.
+
         # Order is critical here: if the mock gets called when no return value
         # or side effect is set, we may invoke a real subprocess.
         proc_mock.return_value = end_state
         proc_mock.side_effect = None
+
+        # 3rd try
+        self.timer.deliver()
+
         self.check_success_flag(True, 3)
         self.check_slurm_got_args(proc_mock, 'NodeName=compute63')
 
@@ -67,20 +74,18 @@ class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
         self.check_success_flag(True)
         self.assertFalse(proc_mock.called)
 
-    def test_node_undrained_when_shutdown_cancelled(self, proc_mock):
+    def test_node_resumed_when_shutdown_cancelled(self, proc_mock):
         try:
             proc_mock.side_effect = iter(['', 'drng\n', 'drng\n', ''])
             self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
             self.timer = testutil.MockTimer(False)
             self.make_actor()
             self.busywait(lambda: proc_mock.call_args is not None)
-            self.shutdown_actor.cancel_shutdown("test").get(self.TIMEOUT)
+            self.shutdown_actor.cancel_shutdown("test")
             self.check_success_flag(False, 2)
-            self.assertEqual(proc_mock.call_args_list,
-                             [mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=DRAIN', 'Reason=Node Manager shutdown']),
-                              mock.call(['sinfo', '--noheader', '-o', '%t', '-n', 'compute99']),
-                              mock.call(['sinfo', '--noheader', '-o', '%t', '-n', 'compute99']),
-                              mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=RESUME'])])
+            self.assertEqual(proc_mock.call_args_list[0], mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=DRAIN', 'Reason=Node Manager shutdown']))
+            self.assertEqual(proc_mock.call_args_list[-1], mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=RESUME']))
+
         finally:
             self.shutdown_actor.actor_ref.stop()
 
@@ -88,10 +93,10 @@ class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
         proc_mock.side_effect = iter([OSError, 'drain\n', OSError, 'idle\n', 'idle\n'])
         self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
         self.make_actor()
-        self.check_success_flag(False, 2)
+        self.check_success_flag(False, 5)
 
     def test_issue_slurm_drain_retry(self, proc_mock):
-        proc_mock.side_effect = iter([OSError, '', OSError, 'drng\n'])
+        proc_mock.side_effect = iter([OSError, OSError, 'drng\n', 'drain\n'])
         self.check_success_after_reset(proc_mock, timer=False)
 
     def test_arvados_node_cleaned_after_shutdown(self, proc_mock):
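
Note on the proc_mock sequences above: assigning an iterator to side_effect makes each simulated subprocess call (scontrol/sinfo) return the next canned output, or raise the next exception, in order; the reordered sequences track the extra state checks introduced by the cancellation guards. A standalone sketch of the technique with illustrative values:

    import mock  # the "mock" package on Python 2; unittest.mock on Python 3

    run_command = mock.Mock()
    # Each call consumes the next item; exception classes are raised.
    run_command.side_effect = iter([OSError, 'drng\n', 'drain\n'])

    try:
        run_command()                        # 1st call: raises OSError
    except OSError:
        pass
    assert run_command() == 'drng\n'         # 2nd call: still draining
    assert run_command() == 'drain\n'        # 3rd call: drain complete
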
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index f714c3c8b38761c0f38e46b70d7a12fd5124bc80..1efa1ffeb35199c251d13e217f2cb37c146c4622 100644 (file)
@@ -21,6 +21,15 @@ import logging
 
 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                      unittest.TestCase):
+
+    def busywait(self, f):
+        n = 0
+        while not f() and n < 200:
+            time.sleep(.1)
+            self.daemon.ping().get(self.TIMEOUT)
+            n += 1
+        self.assertTrue(f())
+
     def mock_node_start(self, **kwargs):
         # Make sure that every time the daemon starts a setup actor,
         # it gets a new mock object back.
@@ -102,14 +111,16 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
             self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
 
     def monitor_list(self):
-        return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor)
+        return [c.actor.actor_ref for c in self.daemon.cloud_nodes.get(self.TIMEOUT).nodes.values() if c.actor]
 
-    def monitored_arvados_nodes(self):
+    def monitored_arvados_nodes(self, include_unpaired=True):
         pairings = []
         for future in [actor.proxy().arvados_node
                        for actor in self.monitor_list()]:
             try:
-                pairings.append(future.get(self.TIMEOUT))
+                g = future.get(self.TIMEOUT)
+                if g or include_unpaired:
+                    pairings.append(g)
             except pykka.ActorDeadError:
                 pass
         return pairings
@@ -117,6 +128,9 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def alive_monitor_count(self):
         return len(self.monitored_arvados_nodes())
 
+    def paired_monitor_count(self):
+        return len(self.monitored_arvados_nodes(False))
+
     def assertShutdownCancellable(self, expected=True):
         self.assertTrue(self.node_shutdown.start.called)
         self.assertIs(expected,
@@ -126,17 +140,16 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def test_easy_node_creation(self):
         size = testutil.MockSize(1)
         self.make_daemon(want_sizes=[size])
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_setup.start.called)
+        self.busywait(lambda: self.node_setup.start.called)
 
     def check_monitors_arvados_nodes(self, *arv_nodes):
+        self.busywait(lambda: len(arv_nodes) == len(self.monitored_arvados_nodes()))
         self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
 
     def test_node_pairing(self):
         cloud_node = testutil.cloud_node_mock(1)
         arv_node = testutil.arvados_node_mock(1)
         self.make_daemon([cloud_node], [arv_node])
-        self.stop_proxy(self.daemon)
         self.check_monitors_arvados_nodes(arv_node)
 
     def test_node_pairing_after_arvados_update(self):
@@ -145,7 +158,6 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          [testutil.arvados_node_mock(1, ip_address=None)])
         arv_node = testutil.arvados_node_mock(2)
         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
         self.check_monitors_arvados_nodes(arv_node)
 
     def test_arvados_node_un_and_re_paired(self):
@@ -157,9 +169,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
         self.check_monitors_arvados_nodes(arv_node)
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
-        self.assertEqual(0, self.alive_monitor_count())
+        self.busywait(lambda: 0 == self.alive_monitor_count())
         self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
-        self.stop_proxy(self.daemon)
         self.check_monitors_arvados_nodes(arv_node)
 
     def test_old_arvados_node_not_double_assigned(self):
@@ -179,8 +190,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def test_node_count_satisfied(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1)],
                          want_sizes=[testutil.MockSize(1)])
-        self.stop_proxy(self.daemon)
-        self.assertFalse(self.node_setup.start.called)
+        self.busywait(lambda: not self.node_setup.start.called)
 
     def test_dont_count_missing_as_busy(self):
         size = testutil.MockSize(1)
@@ -191,8 +201,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                             2,
                                             last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size, size])
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_setup.start.called)
+        self.busywait(lambda: 2 == self.alive_monitor_count())
+        self.busywait(lambda: self.node_setup.start.called)
 
     def test_missing_counts_towards_max(self):
         size = testutil.MockSize(1)
@@ -202,8 +212,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size, size],
                          max_nodes=2)
-        self.stop_proxy(self.daemon)
-        self.assertFalse(self.node_setup.start.called)
+        self.busywait(lambda: not self.node_setup.start.called)
 
     def test_excess_counts_missing(self):
         size = testutil.MockSize(1)
@@ -212,7 +221,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          arvados_nodes=[testutil.arvados_node_mock(1),
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size])
-        self.assertEqual(2, self.alive_monitor_count())
+        self.busywait(lambda: 2 == self.paired_monitor_count())
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
         self.assertEqual(1, self.node_shutdown.start.call_count)
@@ -224,7 +233,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          arvados_nodes=[testutil.arvados_node_mock(1),
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size])
-
+        self.busywait(lambda: 2 == self.paired_monitor_count())
         get_cloud_node = mock.MagicMock(name="get_cloud_node")
         get_cloud_node.get.return_value = cloud_nodes[1]
         mock_node_monitor = mock.MagicMock()
@@ -233,10 +242,10 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
         self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
 
-        self.assertEqual(2, self.alive_monitor_count())
+        self.busywait(lambda: 2 == self.alive_monitor_count())
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
-        self.assertEqual(1, self.node_shutdown.start.call_count)
+        self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
 
     def test_booting_nodes_counted(self):
         cloud_node = testutil.cloud_node_mock(1)
@@ -246,17 +255,15 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.max_nodes.get(self.TIMEOUT)
         self.assertTrue(self.node_setup.start.called)
         self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertEqual(1, self.node_setup.start.call_count)
+        self.busywait(lambda: 1 == self.node_setup.start.call_count)
 
     def test_boot_new_node_when_all_nodes_busy(self):
         size = testutil.MockSize(2)
         arv_node = testutil.arvados_node_mock(2, job_uuid=True)
         self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
                          [size], avail_sizes=[(size, {"cores":1})])
+        self.busywait(lambda: 1 == self.paired_monitor_count())
         self.busywait(lambda: self.node_setup.start.called)
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_setup.start.called)
 
     def test_boot_new_node_below_min_nodes(self):
         min_size = testutil.MockSize(1)
@@ -402,7 +409,6 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         now = time.time()
         self.monitor_list()[0].tell_proxy().consider_shutdown()
         self.busywait(lambda: self.node_shutdown.start.called)
-        self.stop_proxy(self.daemon)
         self.assertShutdownCancellable(False)
 
     def test_booted_node_shut_down_when_never_paired(self):
@@ -414,7 +420,6 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.update_cloud_nodes([cloud_node])
         self.monitor_list()[0].tell_proxy().consider_shutdown()
         self.busywait(lambda: self.node_shutdown.start.called)
-        self.stop_proxy(self.daemon)
         self.assertShutdownCancellable(False)
 
     def test_booted_node_shut_down_when_never_working(self):
@@ -427,7 +432,6 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
         self.daemon.update_cloud_nodes([cloud_node])
         self.busywait(lambda: self.node_shutdown.start.called)
-        self.stop_proxy(self.daemon)
         self.assertShutdownCancellable(False)
 
     def test_node_that_pairs_not_considered_failed_boot(self):
@@ -457,8 +461,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def test_booting_nodes_shut_down(self):
         self.make_daemon(want_sizes=[testutil.MockSize(1)])
         self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.last_setup.stop_if_no_cloud_node.called)
+        self.busywait(lambda: self.last_setup.stop_if_no_cloud_node.called)
 
     def test_all_booting_nodes_tried_to_shut_down(self):
         size = testutil.MockSize(2)
@@ -483,7 +486,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_node = testutil.arvados_node_mock(1)
         size = testutil.MockSize(1)
         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
-        self.assertEqual(1, self.alive_monitor_count())
+        self.busywait(lambda: 1 == self.paired_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -493,7 +496,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(1)
         arv_node = testutil.arvados_node_mock(1)
         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
-        self.assertEqual(1, self.alive_monitor_count())
+        self.busywait(lambda: 1 == self.paired_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -501,11 +504,10 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
     def test_shutdown_accepted_below_capacity(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
-        self.assertEqual(1, self.alive_monitor_count())
+        self.busywait(lambda: 1 == self.alive_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_shutdown.start.called)
+        self.busywait(lambda: self.node_shutdown.start.called)
 
     def test_shutdown_declined_when_idle_and_job_queued(self):
         size = testutil.MockSize(1)
@@ -513,7 +515,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
                      testutil.arvados_node_mock(4, job_uuid=None)]
         self.make_daemon(cloud_nodes, arv_nodes, [size])
-        self.assertEqual(2, self.alive_monitor_count())
+        self.busywait(lambda: 2 == self.paired_monitor_count())
         for mon_ref in self.monitor_list():
             monitor = mon_ref.proxy()
             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
@@ -532,13 +534,13 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.last_shutdown.success.get.return_value = False
         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
-        self.assertEqual(1, self.alive_monitor_count())
+        self.busywait(lambda: 1 == self.paired_monitor_count())
 
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.last_shutdown.success.get.return_value = True
         self.last_shutdown.stop.side_effect = lambda: monitor.stop()
         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
-        self.assertEqual(0, self.alive_monitor_count())
+        self.busywait(lambda: 0 == self.paired_monitor_count())
 
     def test_nodes_shutting_down_replaced_below_max_nodes(self):
         size = testutil.MockSize(6)
@@ -551,21 +553,19 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertTrue(self.node_shutdown.start.called)
         self.daemon.update_server_wishlist(
             [testutil.MockSize(6)]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertTrue(self.node_setup.start.called)
+        self.busywait(lambda: self.node_setup.start.called)
 
     def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
         cloud_node = testutil.cloud_node_mock(7)
         self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
                          max_nodes=1)
-        self.assertEqual(1, self.alive_monitor_count())
+        self.busywait(lambda: 1 == self.paired_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.assertTrue(self.node_shutdown.start.called)
         self.daemon.update_server_wishlist(
             [testutil.MockSize(7)]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertFalse(self.node_setup.start.called)
+        self.busywait(lambda: not self.node_setup.start.called)
 
     def test_nodes_shutting_down_count_against_excess(self):
         size = testutil.MockSize(8)
@@ -573,7 +573,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
         self.make_daemon(cloud_nodes, arv_nodes, [size],
                          avail_sizes=[(size, {"cores":1})])
-        self.assertEqual(2, self.alive_monitor_count())
+        self.busywait(lambda: 2 == self.paired_monitor_count())
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
         self.assertEqual(1, self.node_shutdown.start.call_count)
@@ -598,8 +598,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         size = testutil.MockSize(2)
         self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
         self.timer.deliver()
-        self.stop_proxy(self.daemon)
-        self.assertEqual(1, self.node_setup.start.call_count)
+        self.busywait(lambda: 1 == self.node_setup.start.call_count)
 
     def test_shutdown_actor_stopped_when_cloud_node_delisted(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
@@ -607,9 +606,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertEqual(
-            1, self.last_shutdown.stop.call_count)
+        self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
 
     def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
@@ -620,8 +617,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         # the ActorDeadError.
         self.last_shutdown.stop.side_effect = pykka.ActorDeadError
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
-        self.stop_proxy(self.daemon)
-        self.assertEqual(1, self.last_shutdown.stop.call_count)
+        self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
 
     def test_node_create_two_sizes(self):
         small = testutil.MockSize(1)
@@ -675,7 +671,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                         testutil.arvados_node_mock(3)],
                          want_sizes=[small, small, big],
                          avail_sizes=avail_sizes)
-
+        self.busywait(lambda: 3 == self.paired_monitor_count())
         self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
 
         self.assertEqual(0, self.node_shutdown.start.call_count)
@@ -686,10 +682,10 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         booting = self.daemon.booting.get()
         cloud_nodes = self.daemon.cloud_nodes.get()
 
-        self.stop_proxy(self.daemon)
+        self.busywait(lambda: 1 == self.node_setup.start.call_count)
+        self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
 
-        self.assertEqual(1, self.node_setup.start.call_count)
-        self.assertEqual(1, self.node_shutdown.start.call_count)
+        self.stop_proxy(self.daemon)
 
         # booting a new big node
         sizecounts = {a[0].id: 0 for a in avail_sizes}
diff --git a/services/nodemanager/tests/test_failure.py b/services/nodemanager/tests/test_failure.py
index cfac61ba2eaf64c67cf44aeb298f2caf4d9b1f86..ef4423dafaf7762b5d8a8c95fdbaacf630156917 100644 (file)
@@ -19,8 +19,8 @@ from . import testutil
 import arvnodeman.baseactor
 
 class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
-    def __init__(self, e):
-        super(BogusActor, self).__init__()
+    def __init__(self, e, killfunc=None):
+        super(BogusActor, self).__init__(killfunc=killfunc)
         self.exp = e
 
     def doStuff(self):
@@ -29,30 +29,35 @@ class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
     def ping(self):
         # Called by WatchdogActorTest, this delay is longer than the test timeout
         # of 1 second, which should cause the watchdog ping to fail.
-        time.sleep(4)
+        time.sleep(2)
         return True
 
 class ActorUnhandledExceptionTest(testutil.ActorTestMixin, unittest.TestCase):
     def test_fatal_error(self):
         for e in (MemoryError(), threading.ThreadError(), OSError(errno.ENOMEM, "")):
-            with mock.patch('os.kill') as kill_mock:
-                act = BogusActor.start(e).tell_proxy()
-                act.doStuff()
-                act.actor_ref.stop(block=True)
-                self.assertTrue(kill_mock.called)
-
-    @mock.patch('os.kill')
-    def test_nonfatal_error(self, kill_mock):
-        act = BogusActor.start(OSError(errno.ENOENT, "")).tell_proxy()
+            kill_mock = mock.Mock('os.kill')
+            bgact = BogusActor.start(e, killfunc=kill_mock)
+            act_thread = bgact.proxy().get_thread().get()
+            act = bgact.tell_proxy()
+            act.doStuff()
+            act.actor_ref.stop(block=True)
+            act_thread.join()
+            self.assertTrue(kill_mock.called)
+
+    def test_nonfatal_error(self):
+        kill_mock = mock.Mock('os.kill')
+        act = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock).tell_proxy()
         act.doStuff()
         act.actor_ref.stop(block=True)
         self.assertFalse(kill_mock.called)
 
 class WatchdogActorTest(testutil.ActorTestMixin, unittest.TestCase):
-    @mock.patch('os.kill')
-    def test_time_timout(self, kill_mock):
+
+    def test_time_timout(self):
+        kill_mock = mock.Mock('os.kill')
         act = BogusActor.start(OSError(errno.ENOENT, ""))
-        watch = arvnodeman.baseactor.WatchdogActor.start(1, act)
+        watch = arvnodeman.baseactor.WatchdogActor.start(1, act, killfunc=kill_mock)
+        time.sleep(1)
         watch.stop(block=True)
         act.stop(block=True)
         self.assertTrue(kill_mock.called)
diff --git a/services/nodemanager/tests/test_jobqueue.py b/services/nodemanager/tests/test_jobqueue.py
index 669b6247114c0f4843f5c2dd51eb9f9d4c00d4a9..b1d5e002767a000d7487aa82c8ee5bb9c312e320 100644 (file)
@@ -157,7 +157,6 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
     @mock.patch("subprocess.check_call")
     @mock.patch("subprocess.check_output")
     def test_unsatisfiable_jobs(self, mock_squeue, mock_scancel):
-        #mock_scancel.return_value = ""
         job_uuid = 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'
         container_uuid = 'yyyyy-dz642-yyyyyyyyyyyyyyy'
         mock_squeue.return_value = "1|1024|0|Resources|" + container_uuid + "\n"
@@ -165,6 +164,7 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
         self.build_monitor([{'items': [{'uuid': job_uuid}]}],
                            self.MockCalculatorUnsatisfiableJobs(), True, True)
         self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+        self.monitor.ping().get(self.TIMEOUT)
         self.stop_proxy(self.monitor)
         self.client.jobs().cancel.assert_called_with(uuid=job_uuid)
         mock_scancel.assert_called_with(['scancel', '--name='+container_uuid])
diff --git a/services/nodemanager/tests/test_timedcallback.py b/services/nodemanager/tests/test_timedcallback.py
index cee7fe1c335247b6aba73df9abed3362b53c402c..21a9b5ac778651c58084b31001bda8c56a9ef9ed 100644 (file)
@@ -26,27 +26,29 @@ class TimedCallBackActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
 
     def test_delayed_turnaround(self):
         receiver = mock.Mock()
-        with mock.patch('time.time', return_value=0) as mock_now:
-            deliverer = timedcallback.TimedCallBackActor.start().proxy()
-            deliverer.schedule(1, receiver, 'delayed')
-            deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
-            self.assertFalse(receiver.called)
-            mock_now.return_value = 2
-            deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
-            self.stop_proxy(deliverer)
+        mock_now = mock.Mock()
+        mock_now.return_value = 0
+        deliverer = timedcallback.TimedCallBackActor.start(timefunc=mock_now).proxy()
+        deliverer.schedule(1, receiver, 'delayed')
+        deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+        self.assertFalse(receiver.called)
+        mock_now.return_value = 2
+        deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+        self.stop_proxy(deliverer)
         receiver.assert_called_with('delayed')
 
     def test_out_of_order_scheduling(self):
         receiver = mock.Mock()
-        with mock.patch('time.time', return_value=1.5) as mock_now:
-            deliverer = timedcallback.TimedCallBackActor.start().proxy()
-            deliverer.schedule(2, receiver, 'second')
-            deliverer.schedule(1, receiver, 'first')
-            deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
-            receiver.assert_called_with('first')
-            mock_now.return_value = 2.5
-            deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
-            self.stop_proxy(deliverer)
+        mock_now = mock.Mock()
+        mock_now.return_value = 1.5
+        deliverer = timedcallback.TimedCallBackActor.start(timefunc=mock_now).proxy()
+        deliverer.schedule(2, receiver, 'second')
+        deliverer.schedule(1, receiver, 'first')
+        deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+        receiver.assert_called_with('first')
+        mock_now.return_value = 2.5
+        deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+        self.stop_proxy(deliverer)
         receiver.assert_called_with('second')
 
     def test_dead_actors_ignored(self):
@@ -61,4 +63,3 @@ class TimedCallBackActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
 
 if __name__ == '__main__':
     unittest.main()
-
diff --git a/services/nodemanager/tests/testutil.py b/services/nodemanager/tests/testutil.py
index 0a483709adf5b69a8fd13647f9e9bd866fa836cf..6e134375bb8aec05bdd71f830e28f277d3cff5b5 100644 (file)
@@ -123,7 +123,10 @@ class ActorTestMixin(object):
         pykka.ActorRegistry.stop_all()
 
     def stop_proxy(self, proxy):
-        return proxy.actor_ref.stop(timeout=self.TIMEOUT)
+        th = proxy.get_thread().get()
+        t = proxy.actor_ref.stop(timeout=self.TIMEOUT)
+        th.join()
+        return t
 
     def wait_for_assignment(self, proxy, attr_name, unassigned=None,
                             timeout=TIMEOUT):
@@ -136,11 +139,13 @@ class ActorTestMixin(object):
             if result is not unassigned:
                 return result
 
-    def busywait(self, f):
+    def busywait(self, f, finalize=None):
         n = 0
-        while not f() and n < 10:
+        while not f() and n < 20:
             time.sleep(.1)
             n += 1
+        if finalize is not None:
+            finalize()
         self.assertTrue(f())
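
Note on busywait above: it polls a predicate up to 20 times at 0.1-second intervals (roughly a 2-second budget, doubled from 10 tries) before asserting, and the new finalize hook runs once just before the assertion, e.g. to ping or stop an actor so its final state is visible. A standalone sketch of the polling idea, with threading.Timer standing in for an actor finishing work:

    import threading
    import time

    done = threading.Event()
    threading.Timer(0.3, done.set).start()   # background work completing later

    # Equivalent of self.busywait(lambda: done.is_set()) from the mixin:
    n = 0
    while not done.is_set() and n < 20:
        time.sleep(.1)
        n += 1
    assert done.is_set()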
 
 
diff --git a/services/ws/event_source.go b/services/ws/event_source.go
index edeb647e4628e675be696cb68f4b61892b4cc606..cfb828b2a5d84c6d16407866374e1f4900185f84 100644 (file)
@@ -248,7 +248,8 @@ func (ps *pgEventSource) DB() *sql.DB {
 }
 
 func (ps *pgEventSource) DBHealth() error {
-       ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+       defer cancel()
        var i int
        return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
 }
diff --git a/services/ws/handler.go b/services/ws/handler.go
index f9f7f53edc58430f231e9a52d5d95bb1a025084a..d527c39ba1c4eeb12c0cbae63526150da27f096d 100644 (file)
@@ -60,6 +60,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
        // Receive websocket frames from the client and pass them to
        // sess.Receive().
        go func() {
+               defer cancel()
                buf := make([]byte, 2<<20)
                for {
                        select {
@@ -75,16 +76,14 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                err = errFrameTooBig
                        }
                        if err != nil {
-                               if err != io.EOF {
+                               if err != io.EOF && ctx.Err() == nil {
                                        log.WithError(err).Info("read error")
                                }
-                               cancel()
                                return
                        }
                        err = sess.Receive(buf)
                        if err != nil {
                                log.WithError(err).Error("sess.Receive() failed")
-                               cancel()
                                return
                        }
                }
@@ -94,6 +93,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
        // sess.EventMessage() as needed, and send them to the client
        // as websocket frames.
        go func() {
+               defer cancel()
                for {
                        var ok bool
                        var data interface{}
@@ -119,8 +119,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                buf, err = sess.EventMessage(e)
                                if err != nil {
                                        log.WithError(err).Error("EventMessage failed")
-                                       cancel()
-                                       break
+                                       return
                                } else if len(buf) == 0 {
                                        log.Debug("skip")
                                        continue
@@ -135,9 +134,10 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                        t0 := time.Now()
                        _, err = ws.Write(buf)
                        if err != nil {
-                               log.WithError(err).Error("write failed")
-                               cancel()
-                               break
+                               if ctx.Err() == nil {
+                                       log.WithError(err).Error("write failed")
+                               }
+                               return
                        }
                        log.Debug("sent")
 
@@ -159,6 +159,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
        // is done/cancelled or the incoming event stream ends. Shut
        // down the handler if the outgoing queue fills up.
        go func() {
+               defer cancel()
                ticker := time.NewTicker(h.PingTimeout)
                defer ticker.Stop()
 
@@ -178,10 +179,8 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                        default:
                                        }
                                }
-                               continue
                        case e, ok := <-incoming.Channel():
                                if !ok {
-                                       cancel()
                                        return
                                }
                                if !sess.Filter(e) {
@@ -191,7 +190,6 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                case queue <- e:
                                default:
                                        log.WithError(errQueueFull).Error("terminate")
-                                       cancel()
                                        return
                                }
                        }
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 58c64231cb53c1204ceed70b0ea030a7050ebb95..4fbfc489cf30fe0e56425e37d909c250f83d967d 100644 (file)
@@ -205,6 +205,10 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                        // client will probably reconnect and do the
                        // same thing all over again.
                        time.Sleep(100 * time.Millisecond)
+                       if sess.ws.Request().Context().Err() != nil {
+                               // Session terminated while we were sleeping
+                               return
+                       }
                }
                now := time.Now()
                e := &event{