Merge branch '10088-raw-files' into 10081-cwl-run-same-job
[arvados.git] / sdk / cwl / arvados_cwl / arvworkflow.py
1 import os
2 import json
3 import copy
4 import logging
5
6 from cwltool.pack import pack
7 from cwltool.load_tool import fetch_document
8 from cwltool.process import shortname
9 from cwltool.workflow import Workflow
10
11 import ruamel.yaml as yaml
12
13 from .runner import upload_docker, upload_dependencies
14 from .arvtool import ArvadosCommandTool
15
# Module logger; logging.getLogger returns the same instance for this name everywhere.
logger = logging.getLogger(name='arvados.cwl-runner')
17
def upload_workflow(arvRunner, tool, job_order, project_uuid, update_uuid):
    """Pack a CWL process and save it as an Arvados workflow record.

    The tool's Docker image is uploaded, the document is packed, values from
    ``job_order`` become defaults of the matching ``#main`` inputs, file
    dependencies are uploaded, and the packed document is stored as YAML in
    the record's ``definition`` field.

    :param arvRunner: runner whose ``api`` client and ``num_retries`` are
        used for the API call.
    :param tool: loaded cwltool process; ``tool.tool`` is its document and
        ``tool.doc_loader`` the loader it came from.
    :param job_order: mapping of input short names to values to use as
        input defaults.
    :param project_uuid: ``owner_uuid`` for the workflow record.
    :param update_uuid: when truthy, update this existing workflow record
        instead of creating a new one.
    :returns: UUID of the created or updated workflow record.
    """
    upload_docker(arvRunner, tool)

    document_loader = tool.doc_loader
    uri = tool.tool["id"]
    workflowobj = document_loader.fetch(uri)

    packed = pack(document_loader, workflowobj, uri, tool.metadata)

    # pack() produces a "$graph" list when several processes are packed, but
    # may return the bare process document when there is only one; accept both.
    graph = packed["$graph"] if "$graph" in packed else [packed]
    main = [p for p in graph if p["id"] == "#main"][0]

    # Bake the supplied job order into the workflow as input defaults.
    for inp in main["inputs"]:
        sn = shortname(inp["id"])
        if sn in job_order:
            inp["default"] = job_order[sn]

    name = os.path.basename(tool.tool["id"])
    upload_dependencies(arvRunner, name, document_loader,
                        packed, uri, False)

    body = {
        "workflow": {
            "owner_uuid": project_uuid,
            "name": tool.tool.get("label", name),
            "description": tool.tool.get("doc", ""),
            "definition": yaml.safe_dump(packed)
        }}

    workflows = arvRunner.api.workflows()
    if update_uuid:
        request = workflows.update(uuid=update_uuid, body=body)
    else:
        request = workflows.create(body=body)
    return request.execute(num_retries=arvRunner.num_retries)["uuid"]
47
class ArvadosWorkflow(Workflow):
    """Wrap cwltool Workflow to override selected methods.

    Adds handling for the ``http://arvados.org/cwl#RunInSingleContainer``
    requirement/hint: when it applies, the whole workflow is packed and
    executed by a single ``cwltool`` command instead of step by step.
    """

    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosWorkflow, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner
        # Kept so it can be put back into kwargs on every job() call.
        self.work_api = kwargs["work_api"]

    @staticmethod
    def _merge_reqs(inherited, own):
        """Concatenate two requirement/hint lists, dropping repeated entries.

        Keeps only the last occurrence of each equal entry, in original
        order, so whichever entry of a given class came last still comes
        last. Repeated merges into the same document therefore do not grow
        the list.
        """
        merged = []
        for entry in reversed(inherited + own):
            if entry not in merged:
                merged.append(entry)
        merged.reverse()
        return merged

    def job(self, joborder, output_callback, **kwargs):
        """Run the workflow, optionally inside a single container.

        Without RunInSingleContainer this defers to cwltool's Workflow.job.
        With it, the workflow is packed and wrapped in a synthetic
        CommandLineTool that runs
        ``cwltool --no-container --move-outputs workflow.json cwl.input.json``
        with stdout captured as ``cwl.output.json``; that tool is run through
        ArvadosCommandTool.
        """
        kwargs["work_api"] = self.work_api
        req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
        if not req:
            return super(ArvadosWorkflow, self).job(joborder, output_callback, **kwargs)

        document_loader = self.doc_loader
        uri = self.tool["id"]
        # NOTE: fetch() may hand back the loader's cached document, and pack()
        # may read that same object, so the merge is written in place. Merging
        # with de-duplication keeps repeated job() calls (e.g. under scatter)
        # from stacking another copy of the requirements/hints each time.
        workflowobj = document_loader.fetch(uri)
        workflowobj["requirements"] = self._merge_reqs(
            self.requirements, workflowobj.get("requirements", []))
        workflowobj["hints"] = self._merge_reqs(
            self.hints, workflowobj.get("hints", []))
        packed = pack(document_loader, workflowobj, uri, self.metadata)

        # Escape "$(" and "${" so the embedded workflow text is not evaluated
        # as CWL parameter references / expressions when the file is staged.
        workflow_text = (json.dumps(packed, sort_keys=True, indent=4)
                         .replace('$(', '\\$(')
                         .replace('${', '\\${'))

        wf_runner = {
            "class": "CommandLineTool",
            "baseCommand": "cwltool",
            "inputs": self.tool["inputs"],
            "outputs": self.tool["outputs"],
            "stdout": "cwl.output.json",
            "requirements": workflowobj["requirements"] + [
                {"class": "InlineJavascriptRequirement"},
                {
                    "class": "InitialWorkDirRequirement",
                    "listing": [{
                        "entryname": "workflow.json",
                        "entry": workflow_text
                    }, {
                        "entryname": "cwl.input.json",
                        "entry": "$(JSON.stringify(inputs))"
                    }]
                }],
            "hints": workflowobj["hints"],
            "arguments": ["--no-container", "--move-outputs", "workflow.json", "cwl.input.json"]
        }
        kwargs["loader"] = self.doc_loader
        kwargs["avsc_names"] = self.doc_schema
        return ArvadosCommandTool(self.arvrunner, wf_runner, **kwargs).job(joborder, output_callback, **kwargs)