import os
import urlparse
from functools import partial
import logging
import json
import re
from cStringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
from cwltool.builder import substitute

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

# Accept any file name: relax cwltool's default whitelist of permitted
# input file names.
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")

def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or when
    using the RunInSingleContainer feature), so delete the 'listing' field
    when it is safe to do so.
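
    Illustrative example (hypothetical collection hash):

        >>> d = {"class": "Directory",
        ...      "location": "keep:99999999999999999999999999999999+99",
        ...      "listing": [{"class": "File", "location": "foo.txt"}]}
        >>> trim_listing(d)
        >>> "listing" in d
        False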
40     """
41
42     if obj.get("location", "").startswith("keep:") and "listing" in obj:
43         del obj["listing"]
44     if obj.get("location", "").startswith("_:"):
45         del obj["location"]
46
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
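
    Illustrative call, mirroring how upload_instance invokes it (arguments
    hypothetical):

        mapper = upload_dependencies(arvrunner, "my-workflow",
                                     tool.doc_loader, tool.tool,
                                     tool.tool["id"], True)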
59     """
60
61     loaded = set()
62     def loadref(b, u):
63         joined = document_loader.fetcher.urljoin(b, u)
64         defrg, _ = urlparse.urldefrag(joined)
65         if defrg not in loaded:
66             loaded.add(defrg)
67             # Use fetch_text to get raw file (before preprocessing).
68             text = document_loader.fetch_text(defrg)
69             if isinstance(text, bytes):
70                 textIO = StringIO(text.decode('utf-8'))
71             else:
72                 textIO = StringIO(text)
73             return yaml.safe_load(textIO)
74         else:
75             return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper


def upload_docker(arvrunner, tool):
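    """Ensure Docker images referenced by the tool are available in Arvados.

    Handles CommandLineTools directly and recurses into the steps of
    Workflows.
    """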
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


def upload_instance(arvrunner, name, tool, job_order):
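    """Upload the Docker images and file dependencies for a workflow instance.

    Returns the pathmapper for the workflow document itself.  job_order is
    updated in place: references are rewritten to keep locations and any
    secondaryFiles implied by the tool's input schema are filled in.
    """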
    upload_docker(arvrunner, tool)

    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"}
                                                 for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    workflowmapper = upload_dependencies(arvrunner,
                                         name,
                                         tool.doc_loader,
                                         tool.tool,
                                         tool.tool["id"],
                                         True)
    jobmapper = upload_dependencies(arvrunner,
                                    os.path.basename(job_order.get("id", "#")),
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    return workflowmapper


def arvados_jobs_image(arvrunner):
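    """Determine the arvados/jobs image matching the current SDK version.

    Verifies that the image is available in Arvados before returning its
    name.
    """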
    img = "arvados/jobs:"+__version__
    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img


class Runner(object):
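    """Base class for processes that submit and monitor a workflow runner
    job or container on Arvados.
    """
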
    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 1024

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        pass

    def arvados_job_spec(self, *args, **kwargs):
        if self.name is None:
            self.name = self.tool.tool.get("label") or os.path.basename(self.tool.tool["id"])

        # Need to filter this out; it gets added by cwltool when providing
        # parameters on the command line.
        if "job_order" in self.job_order:
            del self.job_order["job_order"]

        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
        adjustDirObjs(self.job_order, trim_listing)
        return workflowmapper

    def done(self, record):
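        """Collect the final output when the runner job/container finishes.

        Translates the record's state and exit code into a cwltool process
        status and reports it via output_callback.
        """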
        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        # By convention, exit code 33 from the runner signals
                        # that the workflow used an unsupported requirement.
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]