services/keep-balance
services/login-sync
services/nodemanager
-services/nodemanager-integration
+services/nodemanager_integration
services/crunch-run
services/crunch-dispatch-local
services/crunch-dispatch-slurm
apps/workbench_units | apps/workbench_functionals | apps/workbench_integration)
suite=apps/workbench
;;
+ services/nodemanager | services/nodemanager_integration)
+ suite=services/nodemanager_suite
+ ;;
*)
suite="${1}"
;;
}
do_test services/login-sync login-sync
-test_nodemanager-integration() {
+test_nodemanager_integration() {
cd "$WORKSPACE/services/nodemanager" \
- && tests/integration_test.py ${testargs[services/nodemanager-integration]}
+ && tests/integration_test.py ${testargs[services/nodemanager_integration]}
}
-do_test services/nodemanager-integration nodemanager-integration
+do_test services/nodemanager_integration nodemanager_integration
for p in "${pythonstuff[@]}"
do
if self.arvrunner.trash_intermediate:
command.append("--trash-intermediate")
+ if self.arvrunner.project_uuid:
+ command.append("--project-uuid="+self.arvrunner.project_uuid)
+
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command
stubs.expect_container_request_uuid + '\n')
+ @stubs
+ def test_submit_container_project(self, stubs):
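+ # Check that --project-uuid is recorded as the container request's owner_uuid and forwarded in the container command.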
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+ capture_stdout = cStringIO.StringIO()
+ try:
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug", "--project-uuid="+project_uuid,
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 0)
+ except:
+ logging.exception("")
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["owner_uuid"] = project_uuid
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ '--enable-reuse', '--on-error=continue', '--project-uuid='+project_uuid,
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+ stubs.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher(expect_container))
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
+
@stubs
def test_submit_job_runner_image(self, stubs):
capture_stdout = cStringIO.StringIO()
Path string `json:"path"`
Content interface{} `json:"content"`
ExcludeFromOutput bool `json:"exclude_from_output"`
- Capacity int64 `json:capacity`
+ Capacity int64 `json:"capacity"`
}
// RuntimeConstraints specify a container's compute resources (RAM,
}
func (e TransactionError) Error() (s string) {
- s = fmt.Sprintf("request failed: %s", e.URL)
+ s = fmt.Sprintf("request failed: %s", e.URL.String())
if e.Status != "" {
s = s + ": " + e.Status
}
def __init__(self, *args, **kwargs):
super(pykka.ThreadingActor, self).__init__(*args, **kwargs)
self.actor_ref = TellableActorRef(self)
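+ # Allow tests to substitute a mock for os.kill.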
+ self._killfunc = kwargs.get("killfunc", os.kill)
def on_failure(self, exception_type, exception_value, tb):
lg = getattr(self, "_logger", logging)
if (exception_type in (threading.ThreadError, MemoryError) or
exception_type is OSError and exception_value.errno == errno.ENOMEM):
lg.critical("Unhandled exception is a fatal error, killing Node Manager")
- os.kill(os.getpid(), signal.SIGKILL)
+ self._killfunc(os.getpid(), signal.SIGKILL)
def ping(self):
return True
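+ # Lets tests retrieve the actor's thread so it can be joined after the actor stops.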
+ def get_thread(self):
+ return threading.current_thread()
class WatchdogActor(pykka.ThreadingActor):
def __init__(self, timeout, *args, **kwargs):
self.actors = [a.proxy() for a in args]
self.actor_ref = TellableActorRef(self)
self._later = self.actor_ref.tell_proxy()
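+ # As in BaseNodeManagerActor, killfunc is injectable for tests.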
+ self._killfunc = kwargs.get("killfunc", os.kill)
def kill_self(self, e, act):
lg = getattr(self, "_logger", logging)
lg.critical("Watchdog exception", exc_info=e)
lg.critical("Actor %s watchdog ping time out, killing Node Manager", act)
- os.kill(os.getpid(), signal.SIGKILL)
+ self._killfunc(os.getpid(), signal.SIGKILL)
def on_start(self):
self._later.run()
return super(ComputeNodeShutdownActor, self)._finished()
def cancel_shutdown(self, reason, **kwargs):
+ if self.cancel_reason is not None:
+ # already cancelled
+ return
self.cancel_reason = reason
self._logger.info("Shutdown cancelled: %s.", reason)
self._finished(success_flag=False)
@_cancel_on_exception
def shutdown_node(self):
+ if self.cancel_reason is not None:
+ # already cancelled
+ return
if self.cancellable:
self._logger.info("Checking that node is still eligible for shutdown")
eligible, reason = self._monitor.shutdown_eligible().get()
if output in ("drng\n", "alloc\n", "drng*\n", "alloc*\n"):
self._timer.schedule(time.time() + 10,
self._later.await_slurm_drain)
- elif output in ("idle\n"):
+ elif output in ("idle\n",):
# Not in "drng" but idle, don't shut down
self.cancel_shutdown("slurm state is %s" % output.strip(), try_resume=False)
else:
message at a later time. This actor runs the necessary event loop for
delivery.
"""
- def __init__(self, max_sleep=1):
+ def __init__(self, max_sleep=1, timefunc=None):
super(TimedCallBackActor, self).__init__()
self._proxy = self.actor_ref.tell_proxy()
self.messages = []
self.max_sleep = max_sleep
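+ # Default to the real clock; tests can supply a fake timefunc.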
+ if timefunc is None:
+ self._timefunc = time.time
+ else:
+ self._timefunc = timefunc
def schedule(self, delivery_time, receiver, *args, **kwargs):
if not self.messages:
def deliver(self):
if not self.messages:
return
- til_next = self.messages[0][0] - time.time()
+ til_next = self.messages[0][0] - self._timefunc()
if til_next <= 0:
t, receiver, args, kwargs = heapq.heappop(self.messages)
try:
],
install_requires=[
'apache-libcloud>=0.20',
- 'arvados-python-client>=0.1.20150206225333',
+ 'arvados-python-client>=0.1.20170731145219',
'future',
'pykka',
'python-daemon',
]
self.make_actor()
self.wait_for_assignment(self.setup_actor, 'cloud_node')
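+ # ping() returns only after the actor has handled all previously queued messages.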
+ self.setup_actor.ping().get(self.TIMEOUT)
self.assertEqual(1, self.cloud_client.post_create_node.call_count)
def test_instance_exceeded_not_retried(self):
self.api_client.nodes().create().execute.side_effect = retry_resp
self.api_client.nodes().update().execute.side_effect = retry_resp
self.wait_for_assignment(self.setup_actor, 'cloud_node')
+ self.setup_actor.ping().get(self.TIMEOUT)
self.assertEqual(self.setup_actor.actor_ref.actor_urn,
subscriber.call_args[0][0].actor_ref.actor_urn)
self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="busy"))
self.cloud_client.destroy_node.return_value = True
self.make_actor(cancellable=True)
- self.check_success_flag(False)
+ self.check_success_flag(False, 2)
self.assertFalse(self.cloud_client.destroy_node.called)
def test_uncancellable_shutdown(self, *mocks):
self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="busy"))
self.cloud_client.destroy_node.return_value = True
self.make_actor(cancellable=False)
- self.check_success_flag(True, 2)
+ self.check_success_flag(True, 4)
self.assertTrue(self.cloud_client.destroy_node.called)
def test_arvados_node_cleaned_after_shutdown(self, *mocks):
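+ # When run against the slurm shutdown actor, report the node as drained so the shutdown proceeds.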
+ if len(mocks) == 1:
+ mocks[0].return_value = "drain\n"
cloud_node = testutil.cloud_node_mock(62)
arv_node = testutil.arvados_node_mock(62)
self.make_mocks(cloud_node, arv_node)
self.assertTrue(update_mock().execute.called)
def test_arvados_node_not_cleaned_after_shutdown_cancelled(self, *mocks):
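+ # When run against the slurm shutdown actor, an "idle" response causes the shutdown to be cancelled.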
+ if len(mocks) == 1:
+ mocks[0].return_value = "idle\n"
cloud_node = testutil.cloud_node_mock(61)
arv_node = testutil.arvados_node_mock(61)
self.make_mocks(cloud_node, arv_node, shutdown_open=False)
self.cloud_client.destroy_node.return_value = False
self.make_actor(cancellable=True)
self.shutdown_actor.cancel_shutdown("test")
+ self.shutdown_actor.ping().get(self.TIMEOUT)
self.check_success_flag(False, 2)
self.assertFalse(self.arvados_client.nodes().update.called)
def test_in_state_when_no_state_available(self):
self.make_actor(arv_node=testutil.arvados_node_mock(
crunch_worker_state=None))
- print(self.node_actor.get_state().get())
self.assertTrue(self.node_state('idle'))
def test_in_state_when_no_state_available_old(self):
self.make_actor(arv_node=testutil.arvados_node_mock(
crunch_worker_state=None, age=90000))
- print(self.node_actor.get_state().get())
self.assertTrue(self.node_state('down'))
def test_in_idle_state(self):
self.timer = testutil.MockTimer(False)
self.make_actor()
self.check_success_flag(None, 0)
+ # At this point, 1st try should have happened.
+
self.timer.deliver()
self.check_success_flag(None, 0)
- self.timer.deliver()
+ # At this point, 2nd try should have happened.
+
# Order is critical here: if the mock gets called when no return value
# or side effect is set, we may invoke a real subprocess.
proc_mock.return_value = end_state
proc_mock.side_effect = None
+
+ # 3rd try
+ self.timer.deliver()
+
self.check_success_flag(True, 3)
self.check_slurm_got_args(proc_mock, 'NodeName=compute63')
self.check_success_flag(True)
self.assertFalse(proc_mock.called)
- def test_node_undrained_when_shutdown_cancelled(self, proc_mock):
+ def test_node_resumed_when_shutdown_cancelled(self, proc_mock):
try:
proc_mock.side_effect = iter(['', 'drng\n', 'drng\n', ''])
self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
self.timer = testutil.MockTimer(False)
self.make_actor()
self.busywait(lambda: proc_mock.call_args is not None)
- self.shutdown_actor.cancel_shutdown("test").get(self.TIMEOUT)
+ self.shutdown_actor.cancel_shutdown("test")
self.check_success_flag(False, 2)
- self.assertEqual(proc_mock.call_args_list,
- [mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=DRAIN', 'Reason=Node Manager shutdown']),
- mock.call(['sinfo', '--noheader', '-o', '%t', '-n', 'compute99']),
- mock.call(['sinfo', '--noheader', '-o', '%t', '-n', 'compute99']),
- mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=RESUME'])])
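+ # The number of sinfo polls can vary, so only check the first (DRAIN) and last (RESUME) scontrol calls.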
+ self.assertEqual(proc_mock.call_args_list[0], mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=DRAIN', 'Reason=Node Manager shutdown']))
+ self.assertEqual(proc_mock.call_args_list[-1], mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=RESUME']))
+
finally:
self.shutdown_actor.actor_ref.stop()
proc_mock.side_effect = iter([OSError, 'drain\n', OSError, 'idle\n', 'idle\n'])
self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
self.make_actor()
- self.check_success_flag(False, 2)
+ self.check_success_flag(False, 5)
def test_issue_slurm_drain_retry(self, proc_mock):
- proc_mock.side_effect = iter([OSError, '', OSError, 'drng\n'])
+ proc_mock.side_effect = iter([OSError, OSError, 'drng\n', 'drain\n'])
self.check_success_after_reset(proc_mock, timer=False)
def test_arvados_node_cleaned_after_shutdown(self, proc_mock):
class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
unittest.TestCase):
+
+ def busywait(self, f):
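+ # Poll until f() is true, pinging the daemon between checks so its mailbox keeps draining.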
+ n = 0
+ while not f() and n < 200:
+ time.sleep(.1)
+ self.daemon.ping().get(self.TIMEOUT)
+ n += 1
+ self.assertTrue(f())
+
def mock_node_start(self, **kwargs):
# Make sure that every time the daemon starts a setup actor,
# it gets a new mock object back.
self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
def monitor_list(self):
- return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor)
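+ # List only the monitors tracked in this daemon's cloud node record, not every monitor in the global actor registry.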
+ return [c.actor.actor_ref for c in self.daemon.cloud_nodes.get(self.TIMEOUT).nodes.values() if c.actor]
- def monitored_arvados_nodes(self):
+ def monitored_arvados_nodes(self, include_unpaired=True):
pairings = []
for future in [actor.proxy().arvados_node
for actor in self.monitor_list()]:
try:
- pairings.append(future.get(self.TIMEOUT))
+ g = future.get(self.TIMEOUT)
+ if g or include_unpaired:
+ pairings.append(g)
except pykka.ActorDeadError:
pass
return pairings
def alive_monitor_count(self):
return len(self.monitored_arvados_nodes())
+ def paired_monitor_count(self):
+ return len(self.monitored_arvados_nodes(False))
+
def assertShutdownCancellable(self, expected=True):
self.assertTrue(self.node_shutdown.start.called)
self.assertIs(expected,
def test_easy_node_creation(self):
size = testutil.MockSize(1)
self.make_daemon(want_sizes=[size])
- self.stop_proxy(self.daemon)
- self.assertTrue(self.node_setup.start.called)
+ self.busywait(lambda: self.node_setup.start.called)
def check_monitors_arvados_nodes(self, *arv_nodes):
+ self.busywait(lambda: len(arv_nodes) == len(self.monitored_arvados_nodes()))
self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
def test_node_pairing(self):
cloud_node = testutil.cloud_node_mock(1)
arv_node = testutil.arvados_node_mock(1)
self.make_daemon([cloud_node], [arv_node])
- self.stop_proxy(self.daemon)
self.check_monitors_arvados_nodes(arv_node)
def test_node_pairing_after_arvados_update(self):
[testutil.arvados_node_mock(1, ip_address=None)])
arv_node = testutil.arvados_node_mock(2)
self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
self.check_monitors_arvados_nodes(arv_node)
def test_arvados_node_un_and_re_paired(self):
self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
self.check_monitors_arvados_nodes(arv_node)
self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
- self.assertEqual(0, self.alive_monitor_count())
+ self.busywait(lambda: 0 == self.alive_monitor_count())
self.daemon.update_cloud_nodes([testutil.cloud_node_mock(3)])
- self.stop_proxy(self.daemon)
self.check_monitors_arvados_nodes(arv_node)
def test_old_arvados_node_not_double_assigned(self):
def test_node_count_satisfied(self):
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1)],
want_sizes=[testutil.MockSize(1)])
- self.stop_proxy(self.daemon)
- self.assertFalse(self.node_setup.start.called)
+ self.busywait(lambda: not self.node_setup.start.called)
def test_dont_count_missing_as_busy(self):
size = testutil.MockSize(1)
2,
last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size, size])
- self.stop_proxy(self.daemon)
- self.assertTrue(self.node_setup.start.called)
+ self.busywait(lambda: 2 == self.alive_monitor_count())
+ self.busywait(lambda: self.node_setup.start.called)
def test_missing_counts_towards_max(self):
size = testutil.MockSize(1)
testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size, size],
max_nodes=2)
- self.stop_proxy(self.daemon)
- self.assertFalse(self.node_setup.start.called)
+ self.busywait(lambda: not self.node_setup.start.called)
def test_excess_counts_missing(self):
size = testutil.MockSize(1)
arvados_nodes=[testutil.arvados_node_mock(1),
testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size])
- self.assertEqual(2, self.alive_monitor_count())
+ self.busywait(lambda: 2 == self.paired_monitor_count())
for mon_ref in self.monitor_list():
self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
self.assertEqual(1, self.node_shutdown.start.call_count)
arvados_nodes=[testutil.arvados_node_mock(1),
testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size])
-
+ self.busywait(lambda: 2 == self.paired_monitor_count())
get_cloud_node = mock.MagicMock(name="get_cloud_node")
get_cloud_node.get.return_value = cloud_nodes[1]
mock_node_monitor = mock.MagicMock()
self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
- self.assertEqual(2, self.alive_monitor_count())
+ self.busywait(lambda: 2 == self.alive_monitor_count())
for mon_ref in self.monitor_list():
self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
- self.assertEqual(1, self.node_shutdown.start.call_count)
+ self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
def test_booting_nodes_counted(self):
cloud_node = testutil.cloud_node_mock(1)
self.daemon.max_nodes.get(self.TIMEOUT)
self.assertTrue(self.node_setup.start.called)
self.daemon.update_server_wishlist(server_wishlist).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertEqual(1, self.node_setup.start.call_count)
+ self.busywait(lambda: 1 == self.node_setup.start.call_count)
def test_boot_new_node_when_all_nodes_busy(self):
size = testutil.MockSize(2)
arv_node = testutil.arvados_node_mock(2, job_uuid=True)
self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
[size], avail_sizes=[(size, {"cores":1})])
+ self.busywait(lambda: 1 == self.paired_monitor_count())
self.busywait(lambda: self.node_setup.start.called)
- self.stop_proxy(self.daemon)
- self.assertTrue(self.node_setup.start.called)
def test_boot_new_node_below_min_nodes(self):
min_size = testutil.MockSize(1)
now = time.time()
self.monitor_list()[0].tell_proxy().consider_shutdown()
self.busywait(lambda: self.node_shutdown.start.called)
- self.stop_proxy(self.daemon)
self.assertShutdownCancellable(False)
def test_booted_node_shut_down_when_never_paired(self):
self.daemon.update_cloud_nodes([cloud_node])
self.monitor_list()[0].tell_proxy().consider_shutdown()
self.busywait(lambda: self.node_shutdown.start.called)
- self.stop_proxy(self.daemon)
self.assertShutdownCancellable(False)
def test_booted_node_shut_down_when_never_working(self):
self.monitor_list()[0].proxy().cloud_node_start_time = time.time()-3601
self.daemon.update_cloud_nodes([cloud_node])
self.busywait(lambda: self.node_shutdown.start.called)
- self.stop_proxy(self.daemon)
self.assertShutdownCancellable(False)
def test_node_that_pairs_not_considered_failed_boot(self):
def test_booting_nodes_shut_down(self):
self.make_daemon(want_sizes=[testutil.MockSize(1)])
self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertTrue(self.last_setup.stop_if_no_cloud_node.called)
+ self.busywait(lambda: self.last_setup.stop_if_no_cloud_node.called)
def test_all_booting_nodes_tried_to_shut_down(self):
size = testutil.MockSize(2)
arv_node = testutil.arvados_node_mock(1)
size = testutil.MockSize(1)
self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
- self.assertEqual(1, self.alive_monitor_count())
+ self.busywait(lambda: 1 == self.paired_monitor_count())
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
cloud_node = testutil.cloud_node_mock(1)
arv_node = testutil.arvados_node_mock(1)
self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
- self.assertEqual(1, self.alive_monitor_count())
+ self.busywait(lambda: 1 == self.paired_monitor_count())
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
def test_shutdown_accepted_below_capacity(self):
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
- self.assertEqual(1, self.alive_monitor_count())
+ self.busywait(lambda: 1 == self.alive_monitor_count())
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertTrue(self.node_shutdown.start.called)
+ self.busywait(lambda: self.node_shutdown.start.called)
def test_shutdown_declined_when_idle_and_job_queued(self):
size = testutil.MockSize(1)
arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
testutil.arvados_node_mock(4, job_uuid=None)]
self.make_daemon(cloud_nodes, arv_nodes, [size])
- self.assertEqual(2, self.alive_monitor_count())
+ self.busywait(lambda: 2 == self.paired_monitor_count())
for mon_ref in self.monitor_list():
monitor = mon_ref.proxy()
if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.last_shutdown.success.get.return_value = False
self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
- self.assertEqual(1, self.alive_monitor_count())
+ self.busywait(lambda: 1 == self.paired_monitor_count())
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.last_shutdown.success.get.return_value = True
self.last_shutdown.stop.side_effect = lambda: monitor.stop()
self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
- self.assertEqual(0, self.alive_monitor_count())
+ self.busywait(lambda: 0 == self.paired_monitor_count())
def test_nodes_shutting_down_replaced_below_max_nodes(self):
size = testutil.MockSize(6)
self.assertTrue(self.node_shutdown.start.called)
self.daemon.update_server_wishlist(
[testutil.MockSize(6)]).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertTrue(self.node_setup.start.called)
+ self.busywait(lambda: self.node_setup.start.called)
def test_nodes_shutting_down_not_replaced_at_max_nodes(self):
cloud_node = testutil.cloud_node_mock(7)
self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
max_nodes=1)
- self.assertEqual(1, self.alive_monitor_count())
+ self.busywait(lambda: 1 == self.paired_monitor_count())
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.assertTrue(self.node_shutdown.start.called)
self.daemon.update_server_wishlist(
[testutil.MockSize(7)]).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertFalse(self.node_setup.start.called)
+ self.busywait(lambda: not self.node_setup.start.called)
def test_nodes_shutting_down_count_against_excess(self):
size = testutil.MockSize(8)
arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
self.make_daemon(cloud_nodes, arv_nodes, [size],
avail_sizes=[(size, {"cores":1})])
- self.assertEqual(2, self.alive_monitor_count())
+ self.busywait(lambda: 2 == self.paired_monitor_count())
for mon_ref in self.monitor_list():
self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
self.assertEqual(1, self.node_shutdown.start.call_count)
size = testutil.MockSize(2)
self.daemon.update_server_wishlist([size] * 2).get(self.TIMEOUT)
self.timer.deliver()
- self.stop_proxy(self.daemon)
- self.assertEqual(1, self.node_setup.start.call_count)
+ self.busywait(lambda: 1 == self.node_setup.start.call_count)
def test_shutdown_actor_stopped_when_cloud_node_delisted(self):
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertEqual(
- 1, self.last_shutdown.stop.call_count)
+ self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
# the ActorDeadError.
self.last_shutdown.stop.side_effect = pykka.ActorDeadError
self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- self.assertEqual(1, self.last_shutdown.stop.call_count)
+ self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
def test_node_create_two_sizes(self):
small = testutil.MockSize(1)
testutil.arvados_node_mock(3)],
want_sizes=[small, small, big],
avail_sizes=avail_sizes)
-
+ self.busywait(lambda: 3 == self.paired_monitor_count())
self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
self.assertEqual(0, self.node_shutdown.start.call_count)
booting = self.daemon.booting.get()
cloud_nodes = self.daemon.cloud_nodes.get()
- self.stop_proxy(self.daemon)
- self.assertEqual(1, self.node_setup.start.call_count)
- self.assertEqual(1, self.node_shutdown.start.call_count)
+ self.busywait(lambda: 1 == self.node_setup.start.call_count)
+ self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
+ self.stop_proxy(self.daemon)
# booting a new big node
sizecounts = {a[0].id: 0 for a in avail_sizes}
import arvnodeman.baseactor
class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
- def __init__(self, e):
- super(BogusActor, self).__init__()
+ def __init__(self, e, killfunc=None):
+ super(BogusActor, self).__init__(killfunc=killfunc)
self.exp = e
def doStuff(self):
def ping(self):
# Called by WatchdogActorTest, this delay is longer than the test timeout
# of 1 second, which should cause the watchdog ping to fail.
- time.sleep(4)
+ time.sleep(2)
return True
class ActorUnhandledExceptionTest(testutil.ActorTestMixin, unittest.TestCase):
def test_fatal_error(self):
for e in (MemoryError(), threading.ThreadError(), OSError(errno.ENOMEM, "")):
- with mock.patch('os.kill') as kill_mock:
- act = BogusActor.start(e).tell_proxy()
- act.doStuff()
- act.actor_ref.stop(block=True)
- self.assertTrue(kill_mock.called)
-
- @mock.patch('os.kill')
- def test_nonfatal_error(self, kill_mock):
- act = BogusActor.start(OSError(errno.ENOENT, "")).tell_proxy()
+ kill_mock = mock.Mock('os.kill')
+ bgact = BogusActor.start(e, killfunc=kill_mock)
+ act_thread = bgact.proxy().get_thread().get()
+ act = bgact.tell_proxy()
+ act.doStuff()
+ act.actor_ref.stop(block=True)
+ act_thread.join()
+ self.assertTrue(kill_mock.called)
+
+ def test_nonfatal_error(self):
+ kill_mock = mock.Mock('os.kill')
+ act = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock).tell_proxy()
act.doStuff()
act.actor_ref.stop(block=True)
self.assertFalse(kill_mock.called)
class WatchdogActorTest(testutil.ActorTestMixin, unittest.TestCase):
- @mock.patch('os.kill')
- def test_time_timout(self, kill_mock):
+
+ def test_time_timeout(self):
+ kill_mock = mock.Mock('os.kill')
act = BogusActor.start(OSError(errno.ENOENT, ""))
- watch = arvnodeman.baseactor.WatchdogActor.start(1, act)
+ watch = arvnodeman.baseactor.WatchdogActor.start(1, act, killfunc=kill_mock)
+ time.sleep(1)
watch.stop(block=True)
act.stop(block=True)
self.assertTrue(kill_mock.called)
@mock.patch("subprocess.check_call")
@mock.patch("subprocess.check_output")
def test_unsatisfiable_jobs(self, mock_squeue, mock_scancel):
- #mock_scancel.return_value = ""
job_uuid = 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'
container_uuid = 'yyyyy-dz642-yyyyyyyyyyyyyyy'
mock_squeue.return_value = "1|1024|0|Resources|" + container_uuid + "\n"
self.build_monitor([{'items': [{'uuid': job_uuid}]}],
self.MockCalculatorUnsatisfiableJobs(), True, True)
self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+ self.monitor.ping().get(self.TIMEOUT)
self.stop_proxy(self.monitor)
self.client.jobs().cancel.assert_called_with(uuid=job_uuid)
mock_scancel.assert_called_with(['scancel', '--name='+container_uuid])
def test_delayed_turnaround(self):
receiver = mock.Mock()
- with mock.patch('time.time', return_value=0) as mock_now:
- deliverer = timedcallback.TimedCallBackActor.start().proxy()
- deliverer.schedule(1, receiver, 'delayed')
- deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
- self.assertFalse(receiver.called)
- mock_now.return_value = 2
- deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
- self.stop_proxy(deliverer)
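+ # Use an injected clock instead of patching time.time globally.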
+ mock_now = mock.Mock()
+ mock_now.return_value = 0
+ deliverer = timedcallback.TimedCallBackActor.start(timefunc=mock_now).proxy()
+ deliverer.schedule(1, receiver, 'delayed')
+ deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+ self.assertFalse(receiver.called)
+ mock_now.return_value = 2
+ deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+ self.stop_proxy(deliverer)
receiver.assert_called_with('delayed')
def test_out_of_order_scheduling(self):
receiver = mock.Mock()
- with mock.patch('time.time', return_value=1.5) as mock_now:
- deliverer = timedcallback.TimedCallBackActor.start().proxy()
- deliverer.schedule(2, receiver, 'second')
- deliverer.schedule(1, receiver, 'first')
- deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
- receiver.assert_called_with('first')
- mock_now.return_value = 2.5
- deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
- self.stop_proxy(deliverer)
+ mock_now = mock.Mock()
+ mock_now.return_value = 1.5
+ deliverer = timedcallback.TimedCallBackActor.start(timefunc=mock_now).proxy()
+ deliverer.schedule(2, receiver, 'second')
+ deliverer.schedule(1, receiver, 'first')
+ deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+ receiver.assert_called_with('first')
+ mock_now.return_value = 2.5
+ deliverer.schedule(3, receiver, 'failure').get(self.TIMEOUT)
+ self.stop_proxy(deliverer)
receiver.assert_called_with('second')
def test_dead_actors_ignored(self):
if __name__ == '__main__':
unittest.main()
-
pykka.ActorRegistry.stop_all()
def stop_proxy(self, proxy):
- return proxy.actor_ref.stop(timeout=self.TIMEOUT)
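+ # Join the actor's thread so it has fully exited before returning.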
+ th = proxy.get_thread().get()
+ t = proxy.actor_ref.stop(timeout=self.TIMEOUT)
+ th.join()
+ return t
def wait_for_assignment(self, proxy, attr_name, unassigned=None,
timeout=TIMEOUT):
if result is not unassigned:
return result
- def busywait(self, f):
+ def busywait(self, f, finalize=None):
n = 0
- while not f() and n < 10:
+ while not f() and n < 20:
time.sleep(.1)
n += 1
+ if finalize is not None:
+ finalize()
self.assertTrue(f())
}
func (ps *pgEventSource) DBHealth() error {
- ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
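+ // Keep the cancel func and defer it so the deadline's resources are released.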
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+ defer cancel()
var i int
return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
}
// Receive websocket frames from the client and pass them to
// sess.Receive().
go func() {
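+ // Cancel the session context when this goroutine exits, instead of calling cancel() at each error return.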
+ defer cancel()
buf := make([]byte, 2<<20)
for {
select {
err = errFrameTooBig
}
if err != nil {
- if err != io.EOF {
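+ // Don't log read errors that are caused by our own context being cancelled.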
+ if err != io.EOF && ctx.Err() == nil {
log.WithError(err).Info("read error")
}
- cancel()
return
}
err = sess.Receive(buf)
if err != nil {
log.WithError(err).Error("sess.Receive() failed")
- cancel()
return
}
}
// sess.EventMessage() as needed, and send them to the client
// as websocket frames.
go func() {
+ defer cancel()
for {
var ok bool
var data interface{}
buf, err = sess.EventMessage(e)
if err != nil {
log.WithError(err).Error("EventMessage failed")
- cancel()
- break
+ return
} else if len(buf) == 0 {
log.Debug("skip")
continue
t0 := time.Now()
_, err = ws.Write(buf)
if err != nil {
- log.WithError(err).Error("write failed")
- cancel()
- break
+ if ctx.Err() == nil {
+ log.WithError(err).Error("write failed")
+ }
+ return
}
log.Debug("sent")
// is done/cancelled or the incoming event stream ends. Shut
// down the handler if the outgoing queue fills up.
go func() {
+ defer cancel()
ticker := time.NewTicker(h.PingTimeout)
defer ticker.Stop()
default:
}
}
- continue
case e, ok := <-incoming.Channel():
if !ok {
- cancel()
return
}
if !sess.Filter(e) {
case queue <- e:
default:
log.WithError(errQueueFull).Error("terminate")
- cancel()
return
}
}
// client will probably reconnect and do the
// same thing all over again.
time.Sleep(100 * time.Millisecond)
+ if sess.ws.Request().Context().Err() != nil {
+ // Session terminated while we were sleeping
+ return
+ }
}
now := time.Now()
e := &event{