10081: Rename cwl.input.json to cwl.input.yml, fix test.
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 import os
2 import urlparse
3 from functools import partial
4 import logging
5 import json
6 import re
7 from cStringIO import StringIO
8
9 import cwltool.draft2tool
10 from cwltool.draft2tool import CommandLineTool
11 import cwltool.workflow
12 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
13 from cwltool.load_tool import fetch_document
14 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
15
16 import arvados.collection
17 import ruamel.yaml as yaml
18
19 from .arvdocker import arv_docker_get_image
20 from .pathmapper import ArvPathMapper
21
# Module-level logger; messages from this module are emitted under the
# 'arvados.cwl-runner' logger name.
logger = logging.getLogger('arvados.cwl-runner')

# Replace cwltool.draft2tool's ACCEPTLIST_RE with a pattern that matches
# non-empty strings made only of ASCII letters, digits, '.', '_', '+' and '-'.
# NOTE(review): presumably this controls which file names cwltool accepts;
# confirm against cwltool.draft2tool before changing the pattern.
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
25
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.

    Arguments:
      arvrunner -- passed through to ArvPathMapper.
      name -- passed to ArvPathMapper as its "name" keyword argument.
      document_loader -- object whose fetch_text(url) returns the raw text
        of a document.
      workflowobj -- mapping to scan; File/Directory "location" values that
        do not already start with "keep:" are rewritten in place.
      uri -- base URI handed to scandeps.
      loadref_run -- when true, "run" fields are followed in addition to
        "$import" fields.
    """

    # URLs (without fragment) that have already been fetched, so that each
    # referenced document is loaded at most once.
    loaded = set()

    def loadref(b, u):
        defrg, _ = urlparse.urldefrag(urlparse.urljoin(b, u))
        if defrg in loaded:
            return {}
        loaded.add(defrg)
        # Use fetch_text to get raw file (before preprocessing).
        text = document_loader.fetch_text(defrg)
        # Give the text straight to the YAML parser: it accepts both byte
        # strings and unicode strings.  Wrapping decoded text in
        # cStringIO.StringIO fails for documents containing non-ASCII
        # characters, because cStringIO cannot hold such unicode strings.
        return yaml.safe_load(text)

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    has_id = "id" in workflowobj

    scanobj = workflowobj
    if has_id:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref)

    # Collect every File and Directory object found among the dependencies.
    files = []
    adjustFileObjs(sc, files.append)
    adjustDirObjs(sc, files.append)

    normalizeFilesDirs(files)

    if has_id:
        # The document itself is uploaded along with its dependencies.
        files.append({"class": "File", "location": workflowobj["id"]})

    mapper = ArvPathMapper(arvrunner, files, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        # Leave references that already point into Keep untouched.
        if not p["location"].startswith("keep:"):
            p["location"] = mapper.mapper(p["location"]).resolved

    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper
96
97
def upload_docker(arvrunner, tool):
    """Fetch the Docker image required by a tool, recursing into workflows.

    For a CommandLineTool, look up its "DockerRequirement" feature and, if
    one is present, pass it to arv_docker_get_image together with the
    runner's API client and project UUID.  For a Workflow, apply the same
    procedure to the embedded tool of every step.  Any other kind of tool
    is ignored.
    """
    if isinstance(tool, CommandLineTool):
        requirement, _ = get_feature(tool, "DockerRequirement")
        if requirement:
            arv_docker_get_image(arvrunner.api, requirement, True,
                                 arvrunner.project_uuid)
        return

    if isinstance(tool, cwltool.workflow.Workflow):
        for step in tool.steps:
            upload_docker(arvrunner, step.embedded_tool)
106
107
class Runner(object):
    """Prepare a CWL tool and its job order for a run and report the result.

    Attributes set by the constructor:
      arvrunner -- the runner object; this class uses its "api",
        "project_uuid", "output_callback" and "processes" attributes.
      tool -- the tool object; its "tool" mapping and "doc_loader" are used.
      job_order -- mapping holding the input object for the run.
      running -- initialised to False.
      enable_reuse -- stored as given; not otherwise used in this class.
      uuid -- initialised to None.
    """

    def __init__(self, runner, tool, job_order, enable_reuse):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None

    def update_pipeline_component(self, record):
        """Do nothing; this implementation ignores the record."""
        pass

    def arvados_job_spec(self, *args, **kwargs):
        """Upload everything the run depends on and return the workflow mapper.

        Uploads Docker images for the tool, then the dependencies of the
        tool document and of the job order.  As side effects this sets
        self.name to the basename of the tool id and removes the "id" key
        from self.job_order.  Positional and keyword arguments are accepted
        but not used here.

        Returns the path mapper produced for the tool document.
        """
        upload_docker(self.arvrunner, self.tool)

        tool_id = self.tool.tool["id"]
        self.name = os.path.basename(tool_id)

        workflowmapper = upload_dependencies(self.arvrunner,
                                             self.name,
                                             self.tool.doc_loader,
                                             self.tool.tool,
                                             tool_id,
                                             True)

        # Called for its side effects only (upload plus in-place rewrite of
        # the job order's locations); the returned mapper is not needed.
        job_order_id = self.job_order.get("id", "#")
        upload_dependencies(self.arvrunner,
                            os.path.basename(job_order_id),
                            self.tool.doc_loader,
                            self.job_order,
                            job_order_id,
                            False)

        if "id" in self.job_order:
            del self.job_order["id"]

        return workflowmapper

    @staticmethod
    def _process_status(record):
        """Map a finished record's state and exit code to a status string."""
        if record["state"] != "Complete":
            return "permanentFail"
        exit_code = record.get("exit_code")
        if exit_code is None or exit_code == 0:
            return "success"
        if exit_code == 33:
            return "UnsupportedRequirement"
        return "permanentFail"

    def done(self, record):
        """Report the outcome of a finished run to the runner.

        Loads "cwl.output.json" from the record's output collection,
        rewrites relative File/Directory locations into "keep:" references
        under that collection, and passes the result and a status string to
        arvrunner.output_callback.  The record's uuid is always removed
        from arvrunner.processes, even if the callback raises.
        """
        processStatus = self._process_status(record)

        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)

                def keepify(fileobj):
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
                # Without a usable output object the run must not be
                # reported as successful, and a partially rewritten output
                # object must not be handed to the callback.
                outputs = None
                if processStatus == "success":
                    processStatus = "permanentFail"
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.processes[record["uuid"]]