X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e1b133b13f9ac50a87051d07c36a3904d6f01028..b9f7a8693579045f05d142dba8bffd2c5660dfce:/sdk/cwl/arvados_cwl/arvworkflow.py diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py index a9c3bd5b40..8c1db3a817 100644 --- a/sdk/cwl/arvados_cwl/arvworkflow.py +++ b/sdk/cwl/arvados_cwl/arvworkflow.py @@ -3,6 +3,8 @@ import json import copy import logging +from schema_salad.sourceline import SourceLine, cmap + from cwltool.pack import pack from cwltool.load_tool import fetch_document from cwltool.process import shortname @@ -18,7 +20,8 @@ from .perf import Perf logger = logging.getLogger('arvados.cwl-runner') metrics = logging.getLogger('arvados.cwl-runner.metrics') -def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None, submit_runner_ram=0): +def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None, + submit_runner_ram=0, name=None): upload_docker(arvRunner, tool) document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"]) @@ -33,7 +36,9 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None, submit_ if sn in job_order: inp["default"] = job_order[sn] - name = os.path.basename(tool.tool["id"]) + if not name: + name = tool.tool.get("label", os.path.basename(tool.tool["id"])) + upload_dependencies(arvRunner, name, document_loader, packed, uri, False) @@ -41,9 +46,9 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None, submit_ body = { "workflow": { - "name": tool.tool.get("label", name), + "name": name, "description": tool.tool.get("doc", ""), - "definition":yaml.safe_dump(packed) + "definition":yaml.round_trip_dump(packed) }} if project_uuid: body["workflow"]["owner_uuid"] = project_uuid @@ -91,23 +96,25 @@ class ArvadosWorkflow(Workflow): joborder_keepmount = copy.deepcopy(joborder) def keepmount(obj): - if "location" not in obj: - raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj)) - if obj["location"].startswith("keep:"): - obj["location"] = "/keep/" + obj["location"][5:] - if "listing" in obj: - del obj["listing"] - elif obj["location"].startswith("_:"): - del obj["location"] - else: - raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"]) + with SourceLine(obj, None, WorkflowException): + if "location" not in obj: + raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj)) + with SourceLine(obj, "location", WorkflowException): + if obj["location"].startswith("keep:"): + obj["location"] = "/keep/" + obj["location"][5:] + if "listing" in obj: + del obj["listing"] + elif obj["location"].startswith("_:"): + del obj["location"] + else: + raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"]) adjustFileObjs(joborder_keepmount, keepmount) adjustDirObjs(joborder_keepmount, keepmount) adjustFileObjs(packed, keepmount) adjustDirObjs(packed, keepmount) - wf_runner = { + wf_runner = cmap({ "class": "CommandLineTool", "baseCommand": "cwltool", "inputs": self.tool["inputs"], @@ -118,15 +125,15 @@ class ArvadosWorkflow(Workflow): "class": "InitialWorkDirRequirement", "listing": [{ "entryname": "workflow.cwl", - "entry": yaml.safe_dump(packed).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${') + "entry": yaml.round_trip_dump(packed).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${') }, { "entryname": "cwl.input.yml", - "entry": yaml.safe_dump(joborder_keepmount).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${') + "entry": yaml.round_trip_dump(joborder_keepmount).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${') }] }], "hints": workflowobj["hints"], "arguments": ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl#main", "cwl.input.yml"] - } + }) kwargs["loader"] = self.doc_loader kwargs["avsc_names"] = self.doc_schema return ArvadosCommandTool(self.arvrunner, wf_runner, **kwargs).job(joborder, output_callback, **kwargs)