10291: Fix catching all exceptions in test case.
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 import os
2 import urlparse
3 from functools import partial
4 import logging
5 import json
6 import re
7 from cStringIO import StringIO
8
9 import cwltool.draft2tool
10 from cwltool.draft2tool import CommandLineTool
11 import cwltool.workflow
12 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
13 from cwltool.load_tool import fetch_document
14 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
15
16 import arvados.collection
17 import ruamel.yaml as yaml
18
19 from .arvdocker import arv_docker_get_image
20 from .pathmapper import ArvPathMapper
21
# Module-level logger for the arvados cwl-runner package.
logger = logging.getLogger('arvados.cwl-runner')

# Replace cwltool's filename acceptance pattern with one that matches any
# name, so no input file is rejected on the basis of its name alone.
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")

def trim_listing(obj):
    """Strip redundant fields from a Directory object, in place.

    When a Directory represents a Keep reference, it is redundant and
    potentially very expensive to pass a fully enumerated 'listing'
    between instances of cwl-runner (e.g. when submitting a job, or
    using the RunInSingleContainer feature), so the 'listing' field is
    deleted when it is safe to do so.  Anonymous '_:' locations are
    likewise dropped.
    """
    location = obj.get("location", "")
    if location.startswith("keep:") and "listing" in obj:
        del obj["listing"]
    if location.startswith("_:"):
        del obj["location"]
40
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    # Tracks fragment-free URIs already fetched so each referenced
    # document is loaded at most once.
    loaded = set()
    def loadref(b, u):
        # Resolve u against base b, drop any fragment, and parse the
        # referenced document as YAML; returns {} for already-seen URIs.
        joined = urlparse.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    # Only follow "run" references when the caller requests it (used when
    # scanning a workflow whose steps reference other tool documents).
    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref)

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        # The top-level document itself must also be uploaded.
        sc.append({"class": "File", "location": workflowobj["id"]})

    # Upload everything found; the mapper records local -> keep: paths.
    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        # Rewrite locations in place, leaving anonymous ("_:") and
        # already-mapped ("keep:") references untouched.
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper
104
105
def upload_docker(arvrunner, tool):
    """Recursively resolve Docker images referenced by *tool*.

    A CommandLineTool with a DockerRequirement has its image fetched into
    Arvados via arv_docker_get_image; a Workflow is walked step by step,
    descending into each step's embedded tool.
    """
    if isinstance(tool, CommandLineTool):
        docker_req, _ = get_feature(tool, "DockerRequirement")
        if docker_req:
            arv_docker_get_image(arvrunner.api, docker_req, True,
                                 arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for step in tool.steps:
            upload_docker(arvrunner, step.embedded_tool)
def upload_instance(arvrunner, name, tool, job_order):
    """Upload a tool document and its job order to Keep.

    Resolves Docker images, uploads the tool's dependencies (following
    "run" references) and then the job order's dependencies, and finally
    strips the transient "id" field from the job order.  Returns the
    pathmapper produced for the tool document.
    """
    upload_docker(arvrunner, tool)

    workflowmapper = upload_dependencies(
        arvrunner,
        name,
        tool.doc_loader,
        tool.tool,
        tool.tool["id"],
        True)

    # The job order's mapper is not needed afterwards; uploading is done
    # for its in-place rewriting of references.
    upload_dependencies(
        arvrunner,
        os.path.basename(job_order.get("id", "#")),
        tool.doc_loader,
        job_order,
        job_order.get("id", "#"),
        False)

    if "id" in job_order:
        del job_order["id"]

    return workflowmapper
137
class Runner(object):
    """Base class for tracking a submitted cwl-runner process in Arvados.

    Holds the tool, job order and submission state, and translates the
    process's completion record into a CWL output object delivered through
    the arvrunner's output callback.
    """

    def __init__(self, runner, tool, job_order, enable_reuse, output_name):
        self.arvrunner = runner            # coordinating runner (API/Keep clients, callbacks)
        self.tool = tool                   # cwltool tool object being run
        self.job_order = job_order         # input object for the run
        self.running = False
        self.enable_reuse = enable_reuse   # whether job reuse is permitted
        self.uuid = None                   # set after submission
        self.final_output = None           # output collection reference once complete
        self.output_name = output_name

    def update_pipeline_component(self, record):
        """No-op hook; subclasses may update pipeline state from *record*."""
        pass

    def arvados_job_spec(self, *args, **kwargs):
        """Upload the tool and job order; return the workflow pathmapper."""
        self.name = os.path.basename(self.tool.tool["id"])
        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
        # Keep-backed directories don't need enumerated listings; trim them
        # to avoid shipping large redundant structures.
        adjustDirObjs(self.job_order, trim_listing)
        return workflowmapper

    def done(self, record):
        """Process a completion *record* for the submitted runner.

        Maps the record's state/exit code to a CWL process status (exit
        code 33 is treated as UnsupportedRequirement), reads the
        cwl.output.json from the output collection, rewrites relative
        output locations into keep: references, and invokes the
        arvrunner's output callback.  The process entry is always removed
        from the arvrunner's table, even if output collection fails.
        """
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
                if record["exit_code"] == 33:
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                self.final_output = record["output"]
                outc = arvados.collection.CollectionReader(self.final_output,
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
                def keepify(fileobj):
                    # Make relative output paths absolute keep: references.
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                # Best-effort output collection: report the failure with a
                # traceback (logger.exception) instead of logger.error,
                # which discarded the stack and made debugging harder, but
                # still deliver the callback below.
                logger.exception("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.processes[record["uuid"]]