From a0d554e7b4c22d7602caea036c157f50f3deb7e5 Mon Sep 17 00:00:00 2001
From: Peter Amstutz
Date: Mon, 3 Jun 2024 14:59:39 -0400
Subject: [PATCH] 9964: Extract outputBinding->glob and set 'output_glob'

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz
---
 sdk/cwl/arvados_cwl/arvcontainer.py | 17 +++++-
 sdk/cwl/arvados_cwl/arvtool.py      | 33 ++++++++++-
 sdk/cwl/tests/test_container.py     | 90 +++++++++++++++++++++++++++++
 3 files changed, 138 insertions(+), 2 deletions(-)

diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 34b79d67b4..c887c12fb3 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -43,7 +43,7 @@ def cleanup_name_for_collection(name):
 class ArvadosContainer(JobBase):
     """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
 
-    def __init__(self, runner, job_runtime,
+    def __init__(self, runner, job_runtime, globpatterns,
                  builder,   # type: Builder
                  joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                  make_path_mapper,  # type: Callable[..., PathMapper]
@@ -57,6 +57,7 @@ class ArvadosContainer(JobBase):
         self.running = False
         self.uuid = None
         self.attempt_count = 0
+        self.globpatterns = globpatterns
 
     def update_pipeline_component(self, r):
         pass
@@ -366,6 +367,20 @@ class ArvadosContainer(JobBase):
                 logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
                                self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
 
+        # Only send output_glob when the API server revision is at least
+        # 20240502 and the tool declared at least one glob pattern.
+        if self.arvrunner.api._rootDesc["revision"] >= "20240502" and self.globpatterns:
+            output_glob = []
+            for gb in self.globpatterns:
+                gb = self.builder.do_eval(gb)
+                if not gb:
+                    continue
+                for gbeval in aslist(gb):
+                    output_glob.append(gbeval)
+                    output_glob.append(gbeval + "/**")
+            if output_glob:
+                container_request["output_glob"] = output_glob
+
         ram_multiplier = [1]
 
         oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
diff --git a/sdk/cwl/arvados_cwl/arvtool.py b/sdk/cwl/arvados_cwl/arvtool.py
index 86fecc0a1d..1a24f6470e 100644
--- a/sdk/cwl/arvados_cwl/arvtool.py
+++ b/sdk/cwl/arvados_cwl/arvtool.py
@@ -11,6 +11,9 @@ from functools import partial
 from schema_salad.sourceline import SourceLine
 from cwltool.errors import WorkflowException
 from arvados.util import portable_data_hash_pattern
+from cwltool.utils import aslist
+
+from typing import Sequence, Mapping
 
 def validate_cluster_target(arvrunner, runtimeContext):
     if (runtimeContext.submit_runner_cluster and
@@ -70,10 +73,38 @@ class ArvadosCommandTool(CommandLineTool):
                                    "dockerPull": loadingContext.default_docker_image})
 
         self.arvrunner = arvrunner
+        self.globpatterns = []
+        self._collect_globs(toolpath_object["outputs"])
+
+    def _collect_globs(self, inputschema):
+        """Recursively collect outputBinding glob patterns into self.globpatterns.
+
+        Descends through lists, mappings, nested "type" entries and record
+        fields of the given output schema fragment.
+        """
+        if isinstance(inputschema, str):
+            # str is itself a Sequence; return before the check below so
+            # that we do not recurse over individual characters.
+            return
+
+        if isinstance(inputschema, Sequence):
+            for i in inputschema:
+                self._collect_globs(i)
+
+        if isinstance(inputschema, Mapping):
+            if "type" in inputschema:
+                self._collect_globs(inputschema["type"])
+                if inputschema["type"] == "record":
+                    for field in inputschema["fields"]:
+                        self._collect_globs(field)
+
+            if "outputBinding" in inputschema and "glob" in inputschema["outputBinding"]:
+                for gb in aslist(inputschema["outputBinding"]["glob"]):
+                    self.globpatterns.append(gb)
 
     def make_job_runner(self, runtimeContext):
         if runtimeContext.work_api == "containers":
-            return partial(ArvadosContainer, self.arvrunner, runtimeContext)
+            return partial(ArvadosContainer, self.arvrunner, runtimeContext, self.globpatterns)
         else:
             raise Exception("Unsupported work_api %s", runtimeContext.work_api)
 
diff --git a/sdk/cwl/tests/test_container.py b/sdk/cwl/tests/test_container.py
index 885ee165b0..b4a1a49fa8 100644
--- a/sdk/cwl/tests/test_container.py
+++ b/sdk/cwl/tests/test_container.py
@@ -563,6 +563,7 @@ class TestContainer(unittest.TestCase):
 
         arvjob = arvados_cwl.ArvadosContainer(runner,
                                               runtimeContext,
+                                              [],
                                               mock.MagicMock(),
                                               {},
                                               None,
@@ -667,6 +668,7 @@ class TestContainer(unittest.TestCase):
 
         arvjob = arvados_cwl.ArvadosContainer(runner,
                                               runtimeContext,
+                                              [],
                                               mock.MagicMock(),
                                               {},
                                               None,
@@ -1483,6 +1485,94 @@ class TestContainer(unittest.TestCase):
             else:
                 self.assertEqual(None, kwargs['body'].get('output_properties'))
 
+    @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+    def test_output_glob(self, keepdocker):
+        """output_glob is set from outputBinding globs for API revision 20240502 but not 20231117."""
+        arvados_cwl.add_arv_hints()
+        for rev in ["20231117", "20240502"]:
+            runner = mock.MagicMock()
+            runner.ignore_docker_for_reuse = False
+            runner.intermediate_output_ttl = 0
+            runner.secret_store = cwltool.secrets.SecretStore()
+            runner.api._rootDesc = {"revision": rev}
+            runner.api.config.return_value = {"Containers": {"DefaultKeepCacheRAM": 256<<20}}
+
+            keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+            runner.api.collections().get().execute.return_value = {
+                "portable_data_hash": "99999999999999999999999999999993+99"}
+
+            tool = cmap({
+                "inputs": [{
+                    "id": "inp",
+                    "type": "string"
+                }],
+                "outputs": [
+                    {
+                        "id": "o1",
+                        "type": "File",
+                        "outputBinding": {
+                            "glob": "*.txt"
+                        }
+                    },
+                    {
+                        "id": "o2",
+                        "type": "File",
+                        "outputBinding": {
+                            "glob": ["*.dat", "*.bat"]
+                        }
+                    },
+                    {
+                        "id": "o3",
+                        "type": {
+                            "type": "record",
+                            "fields": [
+                                {
+                                    "name": "f1",
+                                    "type": "File",
+                                    "outputBinding": {
+                                        "glob": ["*.cat"]
+                                    }
+                                }
+                            ]
+                        }
+                    },
+                    {
+                        "id": "o4",
+                        "type": "File",
+                        "outputBinding": {
+                            "glob": "$(inputs.inp)"
+                        }
+                    },
+
+                ],
+                "baseCommand": "ls",
+                "arguments": [{"valueFrom": "$(runtime.outdir)"}],
+                "id": "",
+                "cwlVersion": "v1.2",
+                "class": "CommandLineTool",
+                "hints": [ ]
+            })
+
+            loadingContext, runtimeContext = self.helper(runner)
+            runtimeContext.name = "test_output_glob"
+
+            arvtool = cwltool.load_tool.load_tool(tool, loadingContext)
+            arvtool.formatgraph = None
+
+            for j in arvtool.job({"inp": "quux"}, mock.MagicMock(), runtimeContext):
+                j.run(runtimeContext)
+
+            _, kwargs = runner.api.container_requests().create.call_args
+            if rev == "20240502":
+                self.assertEqual(['*.txt', '*.txt/**',
+                                  '*.dat', '*.dat/**',
+                                  '*.bat', '*.bat/**',
+                                  '*.cat', '*.cat/**',
+                                  'quux', 'quux/**',
+                                  ], kwargs['body'].get('output_glob'))
+            else:
+                self.assertEqual(None, kwargs['body'].get('output_glob'))
+
 
 class TestWorkflow(unittest.TestCase):
     def setUp(self):
-- 
2.30.2