From cc9aed221c506cfc28f1b6ca37675dba6543d824 Mon Sep 17 00:00:00 2001
From: Peter Amstutz
Date: Wed, 12 Oct 2016 16:58:25 -0400
Subject: [PATCH] 10259: When using --submit with jobs API, create
 RunningOnServer pipeline instance.

This is more similar to the way it works when you run a workflow from
workbench, and ensures that the pipeline will be marked completed even if
the arvados-cwl-runner client goes away.
---
 sdk/cwl/arvados_cwl/arvjob.py | 37 ++++++++++++-----------
 sdk/cwl/tests/test_submit.py  | 57 +++++++++++++++++++++++++++--------
 2 files changed, 63 insertions(+), 31 deletions(-)

diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 8b1a934683..8b6642f1d0 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -2,6 +2,7 @@ import logging
 import re
 import copy
 import json
+import time
 
 from cwltool.process import get_feature, shortname
 from cwltool.errors import WorkflowException
@@ -248,27 +249,27 @@ class RunnerJob(Runner):
     def run(self, *args, **kwargs):
         job_spec = self.arvados_job_spec(*args, **kwargs)
 
-        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
-        response = self.arvrunner.api.jobs().create(
-            body=job_spec,
-            find_or_create=self.enable_reuse
-        ).execute(num_retries=self.arvrunner.num_retries)
-
-        self.uuid = response["uuid"]
+        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
+            body={
+                "owner_uuid": self.arvrunner.project_uuid,
+                "name": shortname(self.tool.tool["id"]),
+                "components": {"cwl-runner": job_spec},
+                "state": "RunningOnServer"}).execute(num_retries=self.arvrunner.num_retries)
+        logger.info("Created pipeline %s", self.arvrunner.pipeline["uuid"])
+
+        job = None
+        while not job:
+            time.sleep(2)
+            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().get(
+                uuid=self.arvrunner.pipeline["uuid"]).execute(
+                    num_retries=self.arvrunner.num_retries)
+            job = self.arvrunner.pipeline["components"]["cwl-runner"].get("job")
+
+        self.uuid = job["uuid"]
         self.arvrunner.processes[self.uuid] = self
 
-        logger.info("Submitted job %s", response["uuid"])
-
-        if kwargs.get("submit"):
-            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
-                body={
-                    "owner_uuid": self.arvrunner.project_uuid,
-                    "name": shortname(self.tool.tool["id"]),
-                    "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]}}},
-                    "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
-
-        if response["state"] in ("Complete", "Failed", "Cancelled"):
-            self.done(response)
+        if job["state"] in ("Complete", "Failed", "Cancelled"):
+            self.done(job)
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 6674efb8c4..f5fe2c0e00 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -112,6 +112,37 @@ def stubs(func):
             'script_version': 'master',
             'script': 'cwl-runner'
         }
+        stubs.pipeline_component = stubs.expect_job_spec.copy()
+        stubs.expect_pipeline_instance = {
+            'name': 'submit_wf.cwl',
+            'state': 'RunningOnServer',
+            "components": {
+                "cwl-runner": {
+                    'runtime_constraints': {'docker_image': 'arvados/jobs'},
+                    'script_parameters': {
+                        'y': {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'},
+                        'x': {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999994+99/blorp.txt'},
+                        'z': {'basename': 'anonymous', 'class': 'Directory',
+                              'listing': [
+                                  {'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
+                              ]},
+                        'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+                    },
+                    'repository': 'arvados',
+                    'script_version': 'master',
+                    'script': 'cwl-runner'
+                }
+            }
+        }
+        stubs.pipeline_create = copy.deepcopy(stubs.expect_pipeline_instance)
+        stubs.pipeline_create["uuid"] = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
+        stubs.pipeline_with_job = copy.deepcopy(stubs.pipeline_create)
+        stubs.pipeline_with_job["components"]["cwl-runner"]["job"] = {
+            "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
+            "state": "Queued"
+        }
+        stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
+        stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
 
         stubs.expect_container_spec = {
             'priority': 1,
@@ -157,11 +188,12 @@ def stubs(func):
 
 
 class TestSubmit(unittest.TestCase):
+    @mock.patch("time.sleep")
     @stubs
-    def test_submit(self, stubs):
+    def test_submit(self, stubs, tm):
         capture_stdout = cStringIO.StringIO()
         exited = arvados_cwl.main(
-            ["--submit", "--no-wait",
+            ["--submit", "--no-wait", "--debug",
             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
            capture_stdout, sys.stderr, api_client=stubs.api)
         self.assertEqual(exited, 0)
@@ -192,16 +224,16 @@ class TestSubmit(unittest.TestCase):
                 }, ensure_unique_name=True),
             mock.call().execute()])
 
-        expect_job = copy.deepcopy(stubs.expect_job_spec)
-        expect_job["owner_uuid"] = stubs.fake_user_uuid
-        stubs.api.jobs().create.assert_called_with(
-            body=expect_job,
-            find_or_create=True)
+        expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+        expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
+        stubs.api.pipeline_instances().create.assert_called_with(
+            body=expect_pipeline)
         self.assertEqual(capture_stdout.getvalue(),
                          stubs.expect_job_uuid + '\n')
 
+    @mock.patch("time.sleep")
     @stubs
-    def test_submit_with_project_uuid(self, stubs):
+    def test_submit_with_project_uuid(self, stubs, tm):
         project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
 
         exited = arvados_cwl.main(
@@ -211,11 +243,10 @@ class TestSubmit(unittest.TestCase):
             sys.stdout, sys.stderr, api_client=stubs.api)
         self.assertEqual(exited, 0)
 
-        expect_body = copy.deepcopy(stubs.expect_job_spec)
-        expect_body["owner_uuid"] = project_uuid
-        stubs.api.jobs().create.assert_called_with(
-            body=expect_body,
-            find_or_create=True)
+        expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+        expect_pipeline["owner_uuid"] = project_uuid
+        stubs.api.pipeline_instances().create.assert_called_with(
+            body=expect_pipeline)
 
     @stubs
     def test_submit_container(self, stubs):
-- 
2.30.2
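
For reference, the submit-and-poll pattern this patch introduces can be sketched outside of arvados-cwl-runner. This is a minimal illustration only, assuming the Arvados Python SDK is installed and ARVADOS_API_HOST/ARVADOS_API_TOKEN are set; the job_spec dict is a placeholder for what RunnerJob.arvados_job_spec() builds, and the "cwl-runner" component name and two-second poll interval simply mirror the patch.

import time

import arvados

api = arvados.api("v1")

# Illustrative job spec only; the real one is produced by arvados_job_spec().
job_spec = {
    "script": "cwl-runner",
    "script_version": "master",
    "repository": "arvados",
    "runtime_constraints": {"docker_image": "arvados/jobs"},
    "script_parameters": {"cwl:tool": "99999999999999999999999999999991+99/wf/submit_wf.cwl"},
}

# Create the pipeline instance in RunningOnServer state so the server, not
# this client, is responsible for driving the runner job to completion.
pipeline = api.pipeline_instances().create(body={
    "name": "submit_wf.cwl",
    "components": {"cwl-runner": job_spec},
    "state": "RunningOnServer",
}).execute()
print("Created pipeline %s" % pipeline["uuid"])

# Poll until the server has assigned a job to the cwl-runner component,
# as run() does before recording the job uuid and registering the process.
job = None
while not job:
    time.sleep(2)
    pipeline = api.pipeline_instances().get(uuid=pipeline["uuid"]).execute()
    job = pipeline["components"]["cwl-runner"].get("job")

print("Runner job %s is %s" % (job["uuid"], job["state"]))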