From: Peter Amstutz Date: Wed, 23 Mar 2016 18:00:21 +0000 (-0400) Subject: Merge branch 'master' into 8766-cwl-collection-project X-Git-Tag: 1.1.0~1036^2 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/a3d43ada117ee7e0535579fc3f1b66503f95b4c7?hp=4447bfa5edac893079894484244318a872a0d3ba Merge branch 'master' into 8766-cwl-collection-project Conflicts: sdk/cwl/tests/test_job.py --- diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index bb884c5b3d..ab0e48debc 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -213,7 +213,15 @@ class ArvadosJob(object): tmpdir = None outdir = None keepdir = None - for l in log.readlines(): + for l in log: + # Determine the tmpdir, outdir and keepdir paths from + # the job run. Unfortunately, we can't take the first + # values we find (which are expected to be near the + # top) and stop scanning because if the node fails and + # the job restarts on a different node these values + # will differ between runs, and we need to know about the + # final run that actually produced output. + g = tmpdirre.match(l) if g: tmpdir = g.group(1) @@ -224,13 +232,39 @@ class ArvadosJob(object): if g: keepdir = g.group(1) - # It turns out if the job fails and restarts it can - # come up on a different compute node, so we have to - # read the log to the end to be sure instead of taking the - # easy way out. - # - #if tmpdir and outdir and keepdir: - # break + colname = "Output %s of %s" % (record["output"][0:7], self.name) + + # check if collection already exists with same owner, name and content + collection_exists = self.arvrunner.api.collections().list( + filters=[["owner_uuid", "=", self.arvrunner.project_uuid], + ['portable_data_hash', '=', record["output"]], + ["name", "=", colname]] + ).execute(num_retries=self.arvrunner.num_retries) + + if not collection_exists["items"]: + # Create a collection located in the same project as the + # pipeline with the contents of the output.
+ # First, get output record. + collections = self.arvrunner.api.collections().list( + limit=1, + filters=[['portable_data_hash', '=', record["output"]]], + select=["manifest_text"] + ).execute(num_retries=self.arvrunner.num_retries) + + if not collections["items"]: + raise WorkflowException( + "Job output '%s' cannot be found on API server" % ( + record["output"])) + + # Create new collection in the parent project + # with the output contents. + self.arvrunner.api.collections().create(body={ + "owner_uuid": self.arvrunner.project_uuid, + "name": colname, + "portable_data_hash": record["output"], + "manifest_text": collections["items"][0]["manifest_text"] + }, ensure_unique_name=True).execute( + num_retries=self.arvrunner.num_retries) self.builder.outdir = outdir self.builder.pathmapper.keepdir = keepdir diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_job.py index 5731d088cb..4351b1abfe 100644 --- a/sdk/cwl/tests/test_job.py +++ b/sdk/cwl/tests/test_job.py @@ -26,7 +26,7 @@ class TestJob(unittest.TestCase): 'task.env': {'TMPDIR': '$(task.tmpdir)'}, 'command': ['ls'] }], - 'crunchrunner': 'ff6fc71e593081ef9733afacaeee15ea+140/crunchrunner' + 'crunchrunner': arvados_cwl.crunchrunner_pdh + '/crunchrunner' }, 'script_version': 'master', 'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6', @@ -67,7 +67,11 @@ class TestJob(unittest.TestCase): 'task.env': {'TMPDIR': '$(task.tmpdir)'}, 'command': ['ls'] }], +<<<<<<< HEAD + 'crunchrunner': arvados_cwl.crunchrunner_pdh + '/crunchrunner' +======= 'crunchrunner': 'ff6fc71e593081ef9733afacaeee15ea+140/crunchrunner' +>>>>>>> master }, 'script_version': 'master', 'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6', @@ -79,3 +83,80 @@ class TestJob(unittest.TestCase): 'min_scratch_mb_per_node': 5024 # tmpdirSize + outdirSize } }, find_or_create=True) + + @mock.patch("arvados.collection.Collection") + def test_done(self, col): + api = mock.MagicMock() + + runner = mock.MagicMock() + 
runner.api = api + runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz" + runner.num_retries = 0 + + col().open.return_value = [] + api.collections().list().execute.side_effect = ({"items": []}, + {"items": [{"manifest_text": "XYZ"}]}) + + arvjob = arvados_cwl.ArvadosJob(runner) + arvjob.name = "testjob" + arvjob.builder = mock.MagicMock() + arvjob.output_callback = mock.MagicMock() + arvjob.collect_outputs = mock.MagicMock() + + arvjob.done({ + "state": "Complete", + "output": "99999999999999999999999999999993+99", + "log": "99999999999999999999999999999994+99", + "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz" + }) + + api.collections().list.assert_has_calls([ + mock.call(), + mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'], + ['portable_data_hash', '=', '99999999999999999999999999999993+99'], + ['name', '=', 'Output 9999999 of testjob']]), + mock.call().execute(num_retries=0), + mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999993+99']], + select=['manifest_text']), + mock.call().execute(num_retries=0)]) + + api.collections().create.assert_called_with( + ensure_unique_name=True, + body={'portable_data_hash': '99999999999999999999999999999993+99', + 'manifest_text': 'XYZ', + 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz', + 'name': 'Output 9999999 of testjob'}) + + @mock.patch("arvados.collection.Collection") + def test_done_use_existing_collection(self, col): + api = mock.MagicMock() + + runner = mock.MagicMock() + runner.api = api + runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz" + runner.num_retries = 0 + + col().open.return_value = [] + api.collections().list().execute.side_effect = ({"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},) + + arvjob = arvados_cwl.ArvadosJob(runner) + arvjob.name = "testjob" + arvjob.builder = mock.MagicMock() + arvjob.output_callback = mock.MagicMock() + arvjob.collect_outputs = mock.MagicMock() + + arvjob.done({ + "state": "Complete", + "output": 
"99999999999999999999999999999993+99", + "log": "99999999999999999999999999999994+99", + "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz" + }) + + api.collections().list.assert_has_calls([ + mock.call(), + mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'], + ['portable_data_hash', '=', '99999999999999999999999999999993+99'], + ['name', '=', 'Output 9999999 of testjob']]), + mock.call().execute(num_retries=0)]) + + self.assertFalse(api.collections().create.called)