X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d084ca24b06c598271844d2ba4c8c40f251c0999..8a2035547ad8bf6abad6a4a03bb0b59211a00932:/sdk/cwl/arvados_cwl/__init__.py diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 52009dadab..71ddd17221 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -22,7 +22,6 @@ from cwltool.errors import WorkflowException import cwltool.main import cwltool.workflow import cwltool.process -import schema_salad from schema_salad.sourceline import SourceLine import arvados @@ -111,8 +110,8 @@ class ArvCwlRunner(object): kwargs["fetcher_constructor"] = partial(CollectionFetcher, api_client=self.api, fs_access=CollectionFsAccess("", collection_cache=self.collection_cache), - num_retries=self.num_retries, - overrides=kwargs.get("override_tools")) + num_retries=self.num_retries) + kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries) if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool": return ArvadosCommandTool(self, toolpath_object, **kwargs) elif "class" in toolpath_object and toolpath_object["class"] == "Workflow": @@ -225,18 +224,21 @@ class ArvCwlRunner(object): def check_features(self, obj): if isinstance(obj, dict): - if obj.get("writable"): - raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported") + if obj.get("writable") and self.work_api != "containers": + raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs") if obj.get("class") == "DockerRequirement": if obj.get("dockerOutputDirectory"): - # TODO: can be supported by containers API, but not jobs API. - raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError( - "Option 'dockerOutputDirectory' of DockerRequirement not supported.") + if self.work_api != "containers": + raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError( + "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.") + if not obj.get("dockerOutputDirectory").startswith('/'): + raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError( + "Option 'dockerOutputDirectory' must be an absolute path.") for v in obj.itervalues(): self.check_features(v) elif isinstance(obj, list): for i,v in enumerate(obj): - with SourceLine(obj, i, UnsupportedRequirement): + with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)): self.check_features(v) def make_output_collection(self, name, tagsString, outputObj): @@ -280,7 +282,7 @@ class ArvCwlRunner(object): def rewrite(fileobj): fileobj["location"] = generatemapper.mapper(fileobj["location"]).target - for k in ("basename", "listing", "contents"): + for k in ("listing", "contents", "nameext", "nameroot", "dirname"): if k in fileobj: del fileobj[k] @@ -366,8 +368,7 @@ class ArvCwlRunner(object): # Upload direct dependencies of workflow steps, get back mapping of files to keep references. # Also uploads docker images. - override_tools = {} - upload_workflow_deps(self, tool, override_tools) + merged_map = upload_workflow_deps(self, tool) # Reload tool object which may have been updated by # upload_workflow_deps @@ -375,8 +376,7 @@ class ArvCwlRunner(object): makeTool=self.arv_make_tool, loader=tool.doc_loader, avsc_names=tool.doc_schema, - metadata=tool.metadata, - override_tools=override_tools) + metadata=tool.metadata) # Upload local file references in the job order. job_order = upload_job_order(self, "%s input" % kwargs["name"], @@ -390,7 +390,8 @@ class ArvCwlRunner(object): kwargs.get("enable_reuse"), uuid=existing_uuid, submit_runner_ram=kwargs.get("submit_runner_ram"), - name=kwargs["name"]) + name=kwargs["name"], + merged_map=merged_map) tmpl.save() # cwltool.main will write our return value to stdout. return (tmpl.uuid, "success") @@ -399,7 +400,8 @@ class ArvCwlRunner(object): self.project_uuid, uuid=existing_uuid, submit_runner_ram=kwargs.get("submit_runner_ram"), - name=kwargs["name"]), + name=kwargs["name"], + merged_map=merged_map), "success") self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse") @@ -426,12 +428,6 @@ class ArvCwlRunner(object): if self.work_api == "containers": if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"): kwargs["runnerjob"] = tool.tool["id"] - upload_dependencies(self, - kwargs["name"], - tool.doc_loader, - tool.tool, - tool.tool["id"], - False) runnerjob = tool.job(job_order, self.output_callback, **kwargs).next() @@ -443,7 +439,8 @@ class ArvCwlRunner(object): name=kwargs.get("name"), on_error=kwargs.get("on_error"), submit_runner_image=kwargs.get("submit_runner_image"), - intermediate_output_ttl=kwargs.get("intermediate_output_ttl")) + intermediate_output_ttl=kwargs.get("intermediate_output_ttl"), + merged_map=merged_map) elif self.work_api == "jobs": runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, @@ -451,7 +448,8 @@ class ArvCwlRunner(object): submit_runner_ram=kwargs.get("submit_runner_ram"), name=kwargs.get("name"), on_error=kwargs.get("on_error"), - submit_runner_image=kwargs.get("submit_runner_image")) + submit_runner_image=kwargs.get("submit_runner_image"), + merged_map=merged_map) elif "cwl_runner_job" not in kwargs and self.work_api == "jobs": # Create pipeline for local run self.pipeline = self.api.pipeline_instances().create( @@ -559,7 +557,7 @@ def versionstring(): arvpkg = pkg_resources.require("arvados-python-client") cwlpkg = pkg_resources.require("cwltool") - return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version, + return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version, "arvados-python-client", arvpkg[0].version, "cwltool", cwlpkg[0].version) @@ -755,7 +753,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None): arvargs.conformance_test = None arvargs.use_container = True arvargs.relax_path_checks = True - arvargs.validate = None + arvargs.print_supported_versions = False make_fs_access = partial(CollectionFsAccess, collection_cache=runner.collection_cache)