aafe7c3dd4560d7c3e48c4bbeba99012159b36b4
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 import os
2 import urlparse
3 from functools import partial
4 import logging
5 import json
6 import re
7 from cStringIO import StringIO
8
9 import cwltool.draft2tool
10 from cwltool.draft2tool import CommandLineTool
11 import cwltool.workflow
12 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
13 from cwltool.load_tool import fetch_document
14 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
15
16 import arvados.collection
17 import ruamel.yaml as yaml
18
19 from .arvdocker import arv_docker_get_image
20 from .pathmapper import ArvPathMapper
21
logger = logging.getLogger('arvados.cwl-runner')

# Monkey-patch cwltool's filename accept-list so tool/workflow file names may
# contain dots, underscores, plus and hyphen characters in addition to
# alphanumerics.  NOTE(review): this overrides the pattern for every user of
# cwltool.draft2tool in this process — confirm that is intended.
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
25
def del_listing(obj):
    """Strip redundant fields from a File/Directory object, in place.

    Drops the "listing" of a Directory already stored in Keep (the listing
    can be recovered from the collection itself) and drops anonymous "_:"
    locations entirely.
    """
    location = obj.get("location", "")
    if location.startswith("keep:") and "listing" in obj:
        del obj["listing"]
    if location.startswith("_:"):
        del obj["location"]
31
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        # Resolve the reference against its base and drop any fragment so each
        # underlying document is fetched and scanned at most once.
        joined = urlparse.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            # Already visited: return an empty document so scandeps does not
            # traverse the same file again.
            return {}

    if loadref_run:
        # Also follow "run" references (embedded workflow steps).
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref)

    # Put the discovered File/Directory objects into canonical form before
    # handing them to the path mapper.
    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        # Include the top-level document itself among the files to upload.
        sc.append({"class": "File", "location": workflowobj["id"]})

    # Uploads everything in sc to Keep and records the resulting locations.
    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        # Rewrite local references to their mapped Keep locations; leave
        # anonymous ("_:") and already-uploaded ("keep:") references alone.
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper
95
96
def upload_docker(arvrunner, tool):
    """Make sure the Docker image for "tool" is available in Arvados.

    Recurses into workflow steps so every embedded tool's DockerRequirement
    is uploaded via arv_docker_get_image.
    """
    if isinstance(tool, CommandLineTool):
        docker_req, _ = get_feature(tool, "DockerRequirement")
        if docker_req:
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for step in tool.steps:
            upload_docker(arvrunner, step.embedded_tool)
105
106
class Runner(object):
    """Base class for submitting a CWL runner process to Arvados.

    Holds the tool, job order and reuse flag, uploads their dependencies to
    Keep, and translates the finished record into a CWL output object.
    """

    def __init__(self, runner, tool, job_order, enable_reuse):
        # Handle to the owning ArvCwlRunner (provides api, callbacks, etc.).
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None

    def update_pipeline_component(self, record):
        # Subclasses may override to reflect state in a pipeline instance.
        pass

    def arvados_job_spec(self, *args, **kwargs):
        """Upload Docker images and document dependencies to Keep.

        Returns the pathmapper for the workflow document; also uploads the
        job order's dependencies and strips its "id" field in place.
        """
        upload_docker(self.arvrunner, self.tool)

        self.name = os.path.basename(self.tool.tool["id"])

        workflowmapper = upload_dependencies(self.arvrunner,
                                             self.name,
                                             self.tool.doc_loader,
                                             self.tool.tool,
                                             self.tool.tool["id"],
                                             True)

        # Upload the job order's own file/directory references as well.
        upload_dependencies(self.arvrunner,
                            os.path.basename(self.job_order.get("id", "#")),
                            self.tool.doc_loader,
                            self.job_order,
                            self.job_order.get("id", "#"),
                            False)

        adjustDirObjs(self.job_order, del_listing)

        if "id" in self.job_order:
            del self.job_order["id"]

        return workflowmapper

    def done(self, record):
        """Translate a finished job/container record into a CWL outcome.

        Reads cwl.output.json from the output collection, rewrites relative
        paths to keep: references, and invokes the runner's output callback.
        Always removes this process from the runner's process table.
        """
        if record["state"] != "Complete":
            processStatus = "permanentFail"
        elif record.get("exit_code") is None:
            processStatus = "success"
        elif record["exit_code"] == 33:
            # Exit code 33 is the conventional "unsupported requirement" signal.
            processStatus = "UnsupportedRequirement"
        elif record["exit_code"] == 0:
            processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)

                def keepify(fileobj):
                    # Anchor relative output paths inside the output collection.
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)

                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                # Best effort: report the failure but still fire the callback.
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.processes[record["uuid"]]