X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/bd6b334c80cea328a51a8612d40ef16bdd6ab2e2..aaa964ac44f923985d6a6eb40c179f62c13ca8ce:/sdk/cwl/arvados_cwl/arvcontainer.py diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index 08da4ca16e..cea2b1091d 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -2,6 +2,8 @@ import logging import json import os +import ruamel.yaml as yaml + from cwltool.errors import WorkflowException from cwltool.process import get_feature, UnsupportedRequirement, shortname from cwltool.pathmapper import adjustFiles @@ -12,6 +14,7 @@ import arvados.collection from .arvdocker import arv_docker_get_image from . import done from .runner import Runner, arvados_jobs_image +from .fsaccess import CollectionFetcher logger = logging.getLogger('arvados.cwl-runner') @@ -177,28 +180,9 @@ class RunnerContainer(Runner): json.dump(self.job_order, f, sort_keys=True, indent=4) jobobj.save_new(owner_uuid=self.arvrunner.project_uuid) - workflowname = os.path.basename(self.tool.tool["id"]) - workflowpath = "/var/lib/cwl/workflow/%s" % workflowname - workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1] - workflowcollection = workflowcollection[5:workflowcollection.index('/')] jobpath = "/var/lib/cwl/job/cwl.input.json" - command = ["arvados-cwl-runner", "--local", "--api=containers"] - if self.output_name: - command.append("--output-name=" + self.output_name) - - if self.output_tags: - command.append("--output-tags=" + self.output_tags) - - if self.enable_reuse: - command.append("--enable-reuse") - else: - command.append("--disable-reuse") - - command.extend([workflowpath, jobpath]) - - return { - "command": command, + container_req = { "owner_uuid": self.arvrunner.project_uuid, "name": self.name, "output_path": "/var/spool/cwl", @@ -207,10 +191,6 @@ class RunnerContainer(Runner): "state": "Committed", "container_image": arvados_jobs_image(self.arvrunner), "mounts": { - "/var/lib/cwl/workflow": { - "kind": "collection", - "portable_data_hash": "%s" % workflowcollection - }, jobpath: { "kind": "collection", "portable_data_hash": "%s/cwl.input.json" % jobobj.portable_data_hash() @@ -231,6 +211,45 @@ class RunnerContainer(Runner): } } + workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1] + if workflowcollection.startswith("keep:"): + workflowcollection = workflowcollection[5:workflowcollection.index('/')] + workflowname = os.path.basename(self.tool.tool["id"]) + workflowpath = "/var/lib/cwl/workflow/%s" % workflowname + container_req["mounts"]["/var/lib/cwl/workflow"] = { + "kind": "collection", + "portable_data_hash": "%s" % workflowcollection + } + elif workflowcollection.startswith("arvwf:"): + workflowpath = "/var/lib/cwl/workflow.json#main" + fetcher = CollectionFetcher({}, None, + api_client=self.arvrunner.api, + keep_client=self.arvrunner.keep_client) + wfobj = yaml.safe_load(fetcher.fetch_text(workflowcollection)) + container_req["mounts"]["/var/lib/cwl/workflow.json"] = { + "kind": "json", + "json": wfobj + } + + command = ["arvados-cwl-runner", "--local", "--api=containers"] + if self.output_name: + command.append("--output-name=" + self.output_name) + + if self.output_tags: + command.append("--output-tags=" + self.output_tags) + + if self.enable_reuse: + command.append("--enable-reuse") + else: + command.append("--disable-reuse") + + command.extend([workflowpath, jobpath]) + + container_req["command"] = command + + return container_req + + def run(self, *args, **kwargs): kwargs["keepprefix"] = "keep:" job_spec = self.arvados_job_spec(*args, **kwargs)