From: Lucas Di Pentima
Date: Mon, 10 Jul 2017 13:22:44 +0000 (-0300)
Subject: 7475: Merge branch 'master' into 7475-nodemgr-unsatisfiable-job-comms
X-Git-Tag: 1.1.0~104^2~3
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/17cd77ac947e2c8f4ca51aa930ffc235051d7f72?hp=13d40db2d01ab306e171d2b5c540e51796bdfa44

7475: Merge branch 'master' into 7475-nodemgr-unsatisfiable-job-comms

Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima
---

diff --git a/sdk/python/arvados/util.py b/sdk/python/arvados/util.py
index 97e1d26d2b..1a97358605 100644
--- a/sdk/python/arvados/util.py
+++ b/sdk/python/arvados/util.py
@@ -24,6 +24,8 @@ collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
+job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
+container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
 
 def clear_tmpdir(path=None):
diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py
index ca914e1096..e60967a667 100644
--- a/services/nodemanager/arvnodeman/jobqueue.py
+++ b/services/nodemanager/arvnodeman/jobqueue.py
@@ -8,9 +8,12 @@ from __future__ import absolute_import, print_function
 
 import logging
 import subprocess
 
+import arvados.util
+
 from . import clientactor
 from .config import ARVADOS_ERRORS
 
+
 class ServerCalculator(object):
     """Generate cloud server wishlists from an Arvados job queue.
@@ -58,7 +61,6 @@ class ServerCalculator(object):
         self.max_nodes = max_nodes or float('inf')
         self.max_price = max_price or float('inf')
         self.logger = logging.getLogger('arvnodeman.jobqueue')
-        self.logged_jobs = set()
 
         self.logger.info("Using cloud node sizes:")
         for s in self.cloud_sizes:
@@ -83,20 +85,26 @@
 
     def servers_for_queue(self, queue):
         servers = []
-        seen_jobs = set()
+        unsatisfiable_jobs = {}
         for job in queue:
-            seen_jobs.add(job['uuid'])
             constraints = job['runtime_constraints']
             want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
             cloud_size = self.cloud_size_for_constraints(constraints)
             if cloud_size is None:
-                if job['uuid'] not in self.logged_jobs:
-                    self.logged_jobs.add(job['uuid'])
-                    self.logger.debug("job %s not satisfiable", job['uuid'])
-            elif (want_count <= self.max_nodes) and (want_count*cloud_size.price <= self.max_price):
+                unsatisfiable_jobs[job['uuid']] = (
+                    'Requirements for a single node exceed the available '
+                    'cloud node size')
+            elif (want_count > self.max_nodes):
+                unsatisfiable_jobs[job['uuid']] = (
+                    "Job's min_nodes constraint is greater than the configured "
+                    "max_nodes (%d)" % self.max_nodes)
+            elif (want_count*cloud_size.price <= self.max_price):
                 servers.extend([cloud_size.real] * want_count)
-        self.logged_jobs.intersection_update(seen_jobs)
-        return servers
+            else:
+                unsatisfiable_jobs[job['uuid']] = (
+                    "Job's price (%s) is above system's max_price "
+                    "limit (%s)" % (want_count*cloud_size.price, self.max_price))
+        return (servers, unsatisfiable_jobs)
 
     def cheapest_size(self):
         return self.cloud_sizes[0]
@@ -107,6 +115,7 @@ class ServerCalculator(object):
                 return s
         return None
 
+
 class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
     """Actor to generate server wishlists from the job queue.
 
@@ -165,7 +174,28 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
         return queuelist
 
     def _got_response(self, queue):
-        server_list = self._calculator.servers_for_queue(queue)
+        server_list, unsatisfiable_jobs = self._calculator.servers_for_queue(queue)
+        # Cancel any job/container with unsatisfiable requirements, emitting
+        # a log explaining why.
+        for job_uuid, reason in unsatisfiable_jobs.iteritems():
+            self._logger.debug("Cancelling unsatisfiable job '%s'", job_uuid)
+            try:
+                self._client.logs().create(body={
+                    'object_uuid': job_uuid,
+                    'event_type': 'stderr',
+                    'properties': {'text': reason},
+                }).execute()
+                # Cancel the job depending on its type
+                if arvados.util.container_uuid_pattern.match(job_uuid):
+                    subprocess.check_call(['scancel', '--name='+job_uuid])
+                elif arvados.util.job_uuid_pattern.match(job_uuid):
+                    self._client.jobs().cancel(uuid=job_uuid).execute()
+                else:
+                    raise Exception('Unknown job type')
+            except Exception as error:
+                self._logger.error("Error trying to cancel job '%s': %s",
+                                   job_uuid,
+                                   error)
         self._logger.debug("Calculated wishlist: %s",
                            ', '.join(s.name for s in server_list) or "(empty)")
         return super(JobQueueMonitorActor, self)._got_response(server_list)
diff --git a/services/nodemanager/tests/test_jobqueue.py b/services/nodemanager/tests/test_jobqueue.py
index 8aa0835aca..669b624711 100644
--- a/services/nodemanager/tests/test_jobqueue.py
+++ b/services/nodemanager/tests/test_jobqueue.py
@@ -24,63 +24,69 @@ class ServerCalculatorTestCase(unittest.TestCase):
 
     def test_empty_queue_needs_no_servers(self):
         servcalc = self.make_calculator([1])
-        self.assertEqual([], servcalc.servers_for_queue([]))
+        self.assertEqual(([], {}), servcalc.servers_for_queue([]))
 
     def test_easy_server_count(self):
         servcalc = self.make_calculator([1])
-        servlist = self.calculate(servcalc, {'min_nodes': 3})
+        servlist, _ = self.calculate(servcalc, {'min_nodes': 3})
         self.assertEqual(3, len(servlist))
 
     def test_default_5pct_ram_value_decrease(self):
         servcalc = self.make_calculator([1])
-        servlist = self.calculate(servcalc, {'min_ram_mb_per_node': 128})
+        servlist, _ = self.calculate(servcalc, {'min_ram_mb_per_node': 128})
         self.assertEqual(0, len(servlist))
-        servlist = self.calculate(servcalc, {'min_ram_mb_per_node': 121})
+        servlist, _ = self.calculate(servcalc, {'min_ram_mb_per_node': 121})
         self.assertEqual(1, len(servlist))
 
     def test_custom_node_mem_scaling_factor(self):
         # Simulate a custom 'node_mem_scaling' config parameter by passing
         # the value to ServerCalculator
         servcalc = self.make_calculator([1], node_mem_scaling=0.5)
-        servlist = self.calculate(servcalc, {'min_ram_mb_per_node': 128})
+        servlist, _ = self.calculate(servcalc, {'min_ram_mb_per_node': 128})
         self.assertEqual(0, len(servlist))
-        servlist = self.calculate(servcalc, {'min_ram_mb_per_node': 64})
+        servlist, _ = self.calculate(servcalc, {'min_ram_mb_per_node': 64})
         self.assertEqual(1, len(servlist))
 
     def test_implicit_server_count(self):
         servcalc = self.make_calculator([1])
-        servlist = self.calculate(servcalc, {}, {'min_nodes': 3})
+        servlist, _ = self.calculate(servcalc, {}, {'min_nodes': 3})
         self.assertEqual(4, len(servlist))
 
     def test_bad_min_nodes_override(self):
         servcalc = self.make_calculator([1])
-        servlist = self.calculate(servcalc,
-                                  {'min_nodes': -2}, {'min_nodes': 'foo'})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_nodes': -2}, {'min_nodes': 'foo'})
         self.assertEqual(2, len(servlist))
 
-    def test_ignore_unsatisfiable_jobs(self):
+    def test_ignore_and_return_unsatisfiable_jobs(self):
         servcalc = self.make_calculator([1], max_nodes=9)
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 2},
-                                  {'min_ram_mb_per_node': 256},
-                                  {'min_nodes': 6},
-                                  {'min_nodes': 12},
-                                  {'min_scratch_mb_per_node': 300000})
+        servlist, u_jobs = self.calculate(servcalc,
+                                          {'min_cores_per_node': 2},
+                                          {'min_ram_mb_per_node': 256},
+                                          {'min_nodes': 6},
+                                          {'min_nodes': 12},
+                                          {'min_scratch_mb_per_node': 300000})
         self.assertEqual(6, len(servlist))
+        # Only unsatisfiable jobs are returned in u_jobs
+        self.assertIn('zzzzz-jjjjj-000000000000000', u_jobs.keys())
+        self.assertIn('zzzzz-jjjjj-000000000000001', u_jobs.keys())
+        self.assertNotIn('zzzzz-jjjjj-000000000000002', u_jobs.keys())
+        self.assertIn('zzzzz-jjjjj-000000000000003', u_jobs.keys())
+        self.assertIn('zzzzz-jjjjj-000000000000004', u_jobs.keys())
 
     def test_ignore_too_expensive_jobs(self):
         servcalc = self.make_calculator([1, 2], max_nodes=12, max_price=6)
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 1, 'min_nodes': 6})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_cores_per_node': 1, 'min_nodes': 6})
         self.assertEqual(6, len(servlist))
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 2, 'min_nodes': 6})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_cores_per_node': 2, 'min_nodes': 6})
         self.assertEqual(0, len(servlist))
 
     def test_job_requesting_max_nodes_accepted(self):
         servcalc = self.make_calculator([1], max_nodes=4)
-        servlist = self.calculate(servcalc, {'min_nodes': 4})
+        servlist, _ = self.calculate(servcalc, {'min_nodes': 4})
        self.assertEqual(4, len(servlist))
 
     def test_cheapest_size(self):
@@ -89,37 +95,37 @@ class ServerCalculatorTestCase(unittest.TestCase):
 
     def test_next_biggest(self):
         servcalc = self.make_calculator([1, 2, 4, 8])
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 3},
-                                  {'min_cores_per_node': 6})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_cores_per_node': 3},
+                                     {'min_cores_per_node': 6})
         self.assertEqual([servcalc.cloud_sizes[2].id,
                           servcalc.cloud_sizes[3].id],
                          [s.id for s in servlist])
 
     def test_multiple_sizes(self):
         servcalc = self.make_calculator([1, 2])
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 2},
-                                  {'min_cores_per_node': 1},
-                                  {'min_cores_per_node': 1})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_cores_per_node': 2},
+                                     {'min_cores_per_node': 1},
+                                     {'min_cores_per_node': 1})
         self.assertEqual([servcalc.cloud_sizes[1].id,
                           servcalc.cloud_sizes[0].id,
                           servcalc.cloud_sizes[0].id],
                          [s.id for s in servlist])
 
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 1},
-                                  {'min_cores_per_node': 2},
-                                  {'min_cores_per_node': 1})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_cores_per_node': 1},
+                                     {'min_cores_per_node': 2},
+                                     {'min_cores_per_node': 1})
         self.assertEqual([servcalc.cloud_sizes[0].id,
                           servcalc.cloud_sizes[1].id,
                           servcalc.cloud_sizes[0].id],
                          [s.id for s in servlist])
 
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 1},
-                                  {'min_cores_per_node': 1},
-                                  {'min_cores_per_node': 2})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_cores_per_node': 1},
+                                     {'min_cores_per_node': 1},
+                                     {'min_cores_per_node': 2})
         self.assertEqual([servcalc.cloud_sizes[0].id,
                           servcalc.cloud_sizes[0].id,
                           servcalc.cloud_sizes[1].id],
@@ -131,16 +137,38 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
                                    unittest.TestCase):
     TEST_CLASS = jobqueue.JobQueueMonitorActor
 
+
     class MockCalculator(object):
         @staticmethod
         def servers_for_queue(queue):
-            return [testutil.MockSize(n) for n in queue]
+            return ([testutil.MockSize(n) for n in queue], {})
+
+
+    class MockCalculatorUnsatisfiableJobs(object):
+        @staticmethod
+        def servers_for_queue(queue):
+            return ([], {k["uuid"]: "Unsatisfiable job mock" for k in queue})
 
     def build_monitor(self, side_effect, *args, **kwargs):
         super(JobQueueMonitorActorTestCase, self).build_monitor(*args, **kwargs)
         self.client.jobs().queue().execute.side_effect = side_effect
 
+    @mock.patch("subprocess.check_call")
+    @mock.patch("subprocess.check_output")
+    def test_unsatisfiable_jobs(self, mock_squeue, mock_scancel):
+        job_uuid = 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'
+        container_uuid = 'yyyyy-dz642-yyyyyyyyyyyyyyy'
+        mock_squeue.return_value = "1|1024|0|Resources|" + container_uuid + "\n"
+
+        self.build_monitor([{'items': [{'uuid': job_uuid}]}],
+                           self.MockCalculatorUnsatisfiableJobs(), True, True)
+        self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+        self.stop_proxy(self.monitor)
+        self.client.jobs().cancel.assert_called_with(uuid=job_uuid)
+        mock_scancel.assert_called_with(['scancel', '--name='+container_uuid])
+
     @mock.patch("subprocess.check_output")
     def test_subscribers_get_server_lists(self, mock_squeue):
         mock_squeue.return_value = ""
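
Note on the dispatch logic above: servers_for_queue() now returns a (servers, unsatisfiable_jobs) pair, and _got_response() picks a cancellation path for each unsatisfiable entry by UUID type, using the two patterns added to sdk/python/arvados/util.py: containers (dz642) are slurm jobs cancelled with `scancel --name=<uuid>`, while crunch v1 jobs (8i9sb) are cancelled through the Arvados API. The following is a minimal, self-contained sketch of that routing, not code from the patch; cancellation_route() is a hypothetical helper, and the example UUIDs are the same fixtures used in test_unsatisfiable_jobs.

    import re

    # UUID patterns as added to sdk/python/arvados/util.py in the diff above.
    job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
    container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')

    def cancellation_route(uuid):
        # Hypothetical helper mirroring JobQueueMonitorActor._got_response():
        # containers are cancelled via slurm, jobs via the Arvados API, and
        # anything else is treated as an error.
        if container_uuid_pattern.match(uuid):
            return 'scancel'
        elif job_uuid_pattern.match(uuid):
            return 'jobs.cancel'
        raise ValueError('Unknown job type: %s' % uuid)

    assert cancellation_route('zzzzz-8i9sb-zzzzzzzzzzzzzzz') == 'jobs.cancel'
    assert cancellation_route('yyyyy-dz642-yyyyyyyyyyyyyyy') == 'scancel'

Checking the container pattern first mirrors the order in _got_response(); since the two patterns match disjoint UUID infixes, the order is not load-bearing, but keeping it consistent with the patch makes the sketch easier to compare against the real code.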