7475: Check the type of each unsatisfiable job (job/container) and cancel it using
authorLucas Di Pentima <lucas@curoverse.com>
Thu, 6 Jul 2017 21:10:07 +0000 (18:10 -0300)
committerLucas Di Pentima <lucas@curoverse.com>
Thu, 6 Jul 2017 21:10:07 +0000 (18:10 -0300)
the proper method.
Updated test to check for both cases.

Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <lucas@curoverse.com>

sdk/python/arvados/util.py
services/nodemanager/arvnodeman/jobqueue.py
services/nodemanager/tests/test_jobqueue.py

index 97e1d26d2ba2e1cf7ad503ab80b2974676c87cf8..1a973586051769e816103553e22326839a0c3670 100644 (file)
@@ -24,6 +24,8 @@ collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
+job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
+container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
 
 def clear_tmpdir(path=None):
index 895e03d5ba5ddf6b8c2762e92120e86ccdd904e6..e60967a667cada135429132024e612053749d8bb 100644 (file)
@@ -8,6 +8,8 @@ from __future__ import absolute_import, print_function
 import logging
 import subprocess
 
+import arvados.util
+
 from . import clientactor
 from .config import ARVADOS_ERRORS
 
@@ -173,8 +175,8 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
 
     def _got_response(self, queue):
         server_list, unsatisfiable_jobs = self._calculator.servers_for_queue(queue)
-        # Cancel any job with unsatisfiable requirements, emitting a log
-        # explaining why.
+        # Cancel any job/container with unsatisfiable requirements, emitting
+        # a log explaining why.
         for job_uuid, reason in unsatisfiable_jobs.iteritems():
             self._logger.debug("Cancelling unsatisfiable job '%s'", job_uuid)
             try:
@@ -183,7 +185,13 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
                     'event_type': 'stderr',
                     'properties': {'text': reason},
                 }).execute()
-                self._client.jobs().cancel(uuid=job_uuid).execute()
+                # Cancel the job depending on its type
+                if arvados.util.container_uuid_pattern.match(job_uuid):
+                    subprocess.check_call(['scancel', '--name='+job_uuid])
+                elif arvados.util.job_uuid_pattern.match(job_uuid):
+                    self._client.jobs().cancel(uuid=job_uuid).execute()
+                else:
+                    raise Exception('Unknown job type')
             except Exception as error:
                 self._logger.error("Trying to cancel job '%s': %s",
                                    job_uuid,
index ab2258dbfbd3adfbf62ec0739d47f9fb34c34645..669b6247114c0f4843f5c2dd51eb9f9d4c00d4a9 100644 (file)
@@ -147,22 +147,27 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
     class MockCalculatorUnsatisfiableJobs(object):
         @staticmethod
         def servers_for_queue(queue):
-            return ([], {k: "Unsatisfiable job mock" for k in queue})
+            return ([], {k["uuid"]: "Unsatisfiable job mock" for k in queue})
 
 
     def build_monitor(self, side_effect, *args, **kwargs):
         super(JobQueueMonitorActorTestCase, self).build_monitor(*args, **kwargs)
         self.client.jobs().queue().execute.side_effect = side_effect
 
+    @mock.patch("subprocess.check_call")
     @mock.patch("subprocess.check_output")
-    def test_unsatisfiable_jobs(self, mock_squeue):
-        mock_squeue.return_value = ""
+    def test_unsatisfiable_jobs(self, mock_squeue, mock_scancel):
+        #mock_scancel.return_value = ""
+        job_uuid = 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'
+        container_uuid = 'yyyyy-dz642-yyyyyyyyyyyyyyy'
+        mock_squeue.return_value = "1|1024|0|Resources|" + container_uuid + "\n"
 
-        self.build_monitor([{'items': ['job1']}],
+        self.build_monitor([{'items': [{'uuid': job_uuid}]}],
                            self.MockCalculatorUnsatisfiableJobs(), True, True)
         self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
         self.stop_proxy(self.monitor)
-        self.client.jobs().cancel.assert_called_with(uuid='job1')
+        self.client.jobs().cancel.assert_called_with(uuid=job_uuid)
+        mock_scancel.assert_called_with(['scancel', '--name='+container_uuid])
 
     @mock.patch("subprocess.check_output")
     def test_subscribers_get_server_lists(self, mock_squeue):