X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/de4ecbc700759ff22e76948a58f7d70e5d3c1464..828161a7527c535e5c657cd17b79cd562475fe82:/sdk/cwl/arvados_cwl/__init__.py

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 5d83300b41..87c3db1df6 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -11,6 +11,7 @@ import threading
 import hashlib
 import copy
 import json
+import re
 from functools import partial
 import pkg_resources  # part of setuptools

@@ -28,12 +29,12 @@ from arvados.errors import ApiError

 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from .runner import Runner, upload_instance
+from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
 from .perf import Perf
-from .pathmapper import FinalOutputPathMapper
+from .pathmapper import NoFollowPathMapper
 from ._version import __version__

 from cwltool.pack import pack
@@ -200,17 +201,13 @@ class ArvCwlRunner(object):

     def check_features(self, obj):
         if isinstance(obj, dict):
-            if obj.get("class") == "InitialWorkDirRequirement":
-                if self.work_api == "containers":
-                    raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
             if obj.get("writable"):
                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
-            if obj.get("class") == "CommandLineTool":
-                if self.work_api == "containers":
-                    if obj.get("stdin"):
-                        raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not suppported with --api=containers")
-                    if obj.get("stderr"):
-                        raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not suppported with --api=containers")
+            if obj.get("class") == "DockerRequirement":
+                if obj.get("dockerOutputDirectory"):
+                    # TODO: can be supported by containers API, but not jobs API.
+                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
+                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
             for v in obj.itervalues():
                 self.check_features(v)
         elif isinstance(obj, list):
@@ -228,7 +225,7 @@ class ArvCwlRunner(object):
         adjustDirObjs(outputObj, capture)
         adjustFileObjs(outputObj, capture)

-        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)
+        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

         final = arvados.collection.Collection(api_client=self.api,
                                               keep_client=self.keep_client,
@@ -312,6 +309,10 @@ class ArvCwlRunner(object):
                                              body={
                                                  'output': self.final_output_collection.portable_data_hash(),
                                              }).execute(num_retries=self.num_retries)
+            self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
+                                          body={
+                                              'is_trashed': True
+                                          }).execute(num_retries=self.num_retries)
         except Exception as e:
             logger.info("Setting container output: %s", e)
         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
@@ -334,23 +335,44 @@ class ArvCwlRunner(object):
                                                                  keep_client=self.keep_client)
         self.fs_access = make_fs_access(kwargs["basedir"])

+        if not kwargs.get("name"):
+            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+
+        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
+        # Also uploads docker images.
+        upload_workflow_deps(self, tool)
+
+        # Reload tool object which may have been updated by
+        # upload_workflow_deps
+        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
+                                  makeTool=self.arv_make_tool,
+                                  loader=tool.doc_loader,
+                                  avsc_names=tool.doc_schema,
+                                  metadata=tool.metadata)
+
+        # Upload local file references in the job order.
+        job_order = upload_job_order(self, "%s input" % kwargs["name"],
+                                     tool, job_order)
+
         existing_uuid = kwargs.get("update_workflow")
         if existing_uuid or kwargs.get("create_workflow"):
+            # Create a pipeline template or workflow record and exit.
             if self.work_api == "jobs":
                 tmpl = RunnerTemplate(self, tool, job_order,
                                       kwargs.get("enable_reuse"),
                                       uuid=existing_uuid,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                      name=kwargs.get("name"))
+                                      name=kwargs["name"])
                 tmpl.save()
                 # cwltool.main will write our return value to stdout.
                 return (tmpl.uuid, "success")
-            else:
+            elif self.work_api == "containers":
                 return (upload_workflow(self, tool, job_order,
-                                        self.project_uuid,
-                                        uuid=existing_uuid,
-                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                        name=kwargs.get("name")), "success")
+                                        self.project_uuid,
+                                        uuid=existing_uuid,
+                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
+                                        name=kwargs["name"]),
+                        "success")

         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

@@ -360,9 +382,6 @@ class ArvCwlRunner(object):
         kwargs["tmpdir_prefix"] = "tmp"
         kwargs["compute_checksum"] = kwargs.get("compute_checksum")

-        if not kwargs["name"]:
-            del kwargs["name"]
-
         if self.work_api == "containers":
             kwargs["outdir"] = "/var/spool/cwl"
             kwargs["docker_outdir"] = "/var/spool/cwl"
@@ -373,13 +392,18 @@ class ArvCwlRunner(object):
             kwargs["docker_outdir"] = "$(task.outdir)"
             kwargs["tmpdir"] = "$(task.tmpdir)"

-        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
-
         runnerjob = None
         if kwargs.get("submit"):
+            # Submit a runner job to run the workflow for us.
             if self.work_api == "containers":
                 if tool.tool["class"] == "CommandLineTool":
                     kwargs["runnerjob"] = tool.tool["id"]
+                    upload_dependencies(self,
+                                        kwargs["name"],
+                                        tool.doc_loader,
+                                        tool.tool,
+                                        tool.tool["id"],
+                                        False)
                     runnerjob = tool.job(job_order,
                                          self.output_callback,
                                          **kwargs).next()
@@ -388,7 +412,7 @@ class ArvCwlRunner(object):
                                                 self.output_name,
                                                 self.output_tags,
                                                 submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                                name=kwargs["name"],
+                                                name=kwargs.get("name"),
                                                 on_error=kwargs.get("on_error"),
                                                 submit_runner_image=kwargs.get("submit_runner_image"))
             elif self.work_api == "jobs":
@@ -396,11 +420,11 @@ class ArvCwlRunner(object):
                                   self.output_name,
                                   self.output_tags,
                                   submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                  name=kwargs["name"],
+                                  name=kwargs.get("name"),
                                   on_error=kwargs.get("on_error"),
                                   submit_runner_image=kwargs.get("submit_runner_image"))

-        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
+        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
             # Create pipeline for local run
             self.pipeline = self.api.pipeline_instances().create(
                 body={
@@ -600,6 +624,8 @@ def arg_parser():  # type: () -> argparse.ArgumentParser

 def add_arv_hints():
     cache = {}
+    cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
+    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
     cache["http://arvados.org/cwl"] = res.read()
     res.close()
@@ -652,9 +678,11 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):

     if arvargs.debug:
         logger.setLevel(logging.DEBUG)
+        logging.getLogger('arvados').setLevel(logging.DEBUG)

     if arvargs.quiet:
         logger.setLevel(logging.WARN)
+        logging.getLogger('arvados').setLevel(logging.WARN)
         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

     if arvargs.metrics:
@@ -685,6 +713,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
                                               keep_client=keep_client),
                       fetcher_constructor=partial(CollectionFetcher,
                                                   api_client=api_client,
-                                                  keep_client=keep_client),
-                      resolver=partial(collectionResolver, api_client),
+                                                  keep_client=keep_client,
+                                                  num_retries=runner.num_retries),
+                      resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                       logger_handler=arvados.log_handler)
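
Note on check_features: the rework drops the old containers-API bans on InitialWorkDirRequirement, stdin, and stderr, and instead rejects exactly two things while recursively walking the tool document: 'writable: true' in an InitialWorkDir listing, and 'dockerOutputDirectory' in a DockerRequirement. A minimal standalone sketch of that traversal follows; UnsupportedRequirement is stubbed and SourceLine omitted (the shipped code raises through cwltool's SourceLine for better error locations, and uses obj.itervalues() because it targets Python 2):

    class UnsupportedRequirement(Exception):
        """Stub standing in for cwltool.errors.UnsupportedRequirement."""

    def check_features(obj):
        # Recursively walk dicts and lists, rejecting the two features
        # the runner cannot support on either API.
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise UnsupportedRequirement(
                    "InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "DockerRequirement" and obj.get("dockerOutputDirectory"):
                # Supportable by the containers API someday, but not the jobs API.
                raise UnsupportedRequirement(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            for v in obj.values():
                check_features(v)
        elif isinstance(obj, list):
            for v in obj:
                check_features(v)

    # A tool requesting dockerOutputDirectory is now rejected up front:
    try:
        check_features({"requirements": [{"class": "DockerRequirement",
                                          "dockerPull": "debian:8",
                                          "dockerOutputDirectory": "/out"}]})
    except UnsupportedRequirement as e:
        print(e)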
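The new default-name logic means every run gets a human-readable name even when no name is supplied on the command line: the tool's label wins, then the metadata label, then the basename of the tool document's id. A quick illustration with hypothetical values:

    import os

    # Hypothetical stand-ins for tool.tool and tool.metadata.
    tool_doc = {"id": "file:///home/me/workflows/analysis.cwl"}
    metadata = {}

    name = (tool_doc.get("label")
            or metadata.get("label")
            or os.path.basename(tool_doc["id"]))
    print(name)  # -> analysis.cwl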
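On the containers API, the output-setting path now performs two writes instead of one: it records the output collection's portable data hash on the container record, then marks the separately named intermediate collection as trashed, presumably because the container record now carries the output and the named copy is redundant. A hedged sketch of the same two SDK calls, assuming Arvados credentials in the environment; the uuids and hash below are placeholders, where the runner uses the current container and self.final_output_collection:

    import arvados

    api = arvados.api("v1")
    num_retries = 4  # illustrative; the runner uses self.num_retries

    container_uuid = "zzzzz-dz642-xxxxxxxxxxxxxxx"       # hypothetical
    collection_uuid = "zzzzz-4zz18-xxxxxxxxxxxxxxx"      # hypothetical
    output_pdh = "99999999999999999999999999999999+99"   # hypothetical PDH

    # 1. Point the container record at the output data by portable data hash.
    api.containers().update(uuid=container_uuid,
                            body={"output": output_pdh}
                            ).execute(num_retries=num_retries)

    # 2. Trash the now-redundant named collection.
    api.collections().update(uuid=collection_uuid,
                             body={"is_trashed": True}
                             ).execute(num_retries=num_retries)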
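add_arv_hints also overwrites cwltool's draft2tool name-acceptance patterns with a match-everything regex, so tool and step names that cwltool's default pattern would reject (spaces, parentheses, and so on) now pass through. The effect in isolation; the strict pattern shown is my recollection of cwltool's default and should be treated as an assumption:

    import re

    ACCEPTLIST_EN_STRICT_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")  # assumed cwltool default
    ACCEPTLIST_RELAXED_RE = re.compile(r".*")                    # what add_arv_hints installs

    name = "my workflow step (v2)"
    print(bool(ACCEPTLIST_EN_STRICT_RE.match(name)))  # False: spaces/parens rejected
    print(bool(ACCEPTLIST_RELAXED_RE.match(name)))    # True: anything goes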