group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
+job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
+container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
def clear_tmpdir(path=None):
import logging
import subprocess
+import arvados.util
+
from . import clientactor
from .config import ARVADOS_ERRORS
def _got_response(self, queue):
server_list, unsatisfiable_jobs = self._calculator.servers_for_queue(queue)
- # Cancel any job with unsatisfiable requirements, emitting a log
- # explaining why.
+ # Cancel any job/container with unsatisfiable requirements, emitting
+ # a log explaining why.
for job_uuid, reason in unsatisfiable_jobs.iteritems():
self._logger.debug("Cancelling unsatisfiable job '%s'", job_uuid)
try:
'event_type': 'stderr',
'properties': {'text': reason},
}).execute()
- self._client.jobs().cancel(uuid=job_uuid).execute()
+ # Cancel the job depending on its type
+ if arvados.util.container_uuid_pattern.match(job_uuid):
+ subprocess.check_call(['scancel', '--name='+job_uuid])
+ elif arvados.util.job_uuid_pattern.match(job_uuid):
+ self._client.jobs().cancel(uuid=job_uuid).execute()
+ else:
+ raise Exception('Unknown job type')
except Exception as error:
self._logger.error("Trying to cancel job '%s': %s",
job_uuid,
class MockCalculatorUnsatisfiableJobs(object):
@staticmethod
def servers_for_queue(queue):
- return ([], {k: "Unsatisfiable job mock" for k in queue})
+ return ([], {k["uuid"]: "Unsatisfiable job mock" for k in queue})
def build_monitor(self, side_effect, *args, **kwargs):
super(JobQueueMonitorActorTestCase, self).build_monitor(*args, **kwargs)
self.client.jobs().queue().execute.side_effect = side_effect
+ @mock.patch("subprocess.check_call")
@mock.patch("subprocess.check_output")
- def test_unsatisfiable_jobs(self, mock_squeue):
- mock_squeue.return_value = ""
+ def test_unsatisfiable_jobs(self, mock_squeue, mock_scancel):
+ #mock_scancel.return_value = ""
+ job_uuid = 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'
+ container_uuid = 'yyyyy-dz642-yyyyyyyyyyyyyyy'
+ mock_squeue.return_value = "1|1024|0|Resources|" + container_uuid + "\n"
- self.build_monitor([{'items': ['job1']}],
+ self.build_monitor([{'items': [{'uuid': job_uuid}]}],
self.MockCalculatorUnsatisfiableJobs(), True, True)
self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
self.stop_proxy(self.monitor)
- self.client.jobs().cancel.assert_called_with(uuid='job1')
+ self.client.jobs().cancel.assert_called_with(uuid=job_uuid)
+ mock_scancel.assert_called_with(['scancel', '--name='+container_uuid])
@mock.patch("subprocess.check_output")
def test_subscribers_get_server_lists(self, mock_squeue):