[arvados.git] / sdk / cwl / arvados_cwl / runner.py
import os
import urlparse
from functools import partial
import logging
import json
import subprocess

from StringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or when using
    the RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.  Also remove the auto-generated '_:' location from Directory
    literals, since it is only meaningful within the current invocation.
    """

    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
    if obj.get("location", "").startswith("_:"):
        del obj["location"]

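# Usage sketch: trim_listing operates on a single Directory object, so it is
# normally applied to a whole document via cwltool's adjustDirObjs helper,
# e.g. for an already-loaded CWL input object `job_order`:
#
#     adjustDirObjs(job_order, trim_listing)
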
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Uses scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, the document is reloaded first so that
    the raw file content (prior to preprocessing) is what gets scanned.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

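# Usage sketch (hypothetical `arvrunner` and `tool` objects): references in the
# passed document are rewritten in place, and the returned ArvPathMapper can be
# queried for individual mappings:
#
#     mapper = upload_dependencies(arvrunner, "workflow dependencies",
#                                  tool.doc_loader, tool.tool,
#                                  tool.tool["id"], False)
#     mapper.mapper("file:///home/me/data.txt").resolved  # e.g. "keep:<pdh>/data.txt"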

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                tool.tool["id"], tool.metadata)

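# Usage sketch (assuming the tag_git_version signature defined below): a
# submitter packs the workflow into a single document and can annotate it with
# the git commit hash of its source checkout before uploading it.
#
#     packed = packed_workflow(arvrunner, tool)
#     tag_git_version(packed, tool)
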
def tag_git_version(packed, tool):
    """Annotate the packed workflow with the git commit hash of its source tree, if available."""
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return the updated
    input object with 'location' fields rewritten to the corresponding keep
    references.
    """

    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out; cwltool adds it to the input object when
    # parameters are provided on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    def upload_tool_deps(deptool):
        if "id" in deptool:
            upload_dependencies(arvrunner,
                                "%s dependencies" % (shortname(deptool["id"])),
                                document_loader,
                                deptool,
                                deptool["id"],
                                False,
                                include_primary=False)
            document_loader.idx[deptool["id"]] = deptool

    tool.visit(upload_tool_deps)

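# Usage sketch: a typical submission path uploads the workflow's dependencies
# and Docker images, then the input object, before creating the runner
# job/container (hypothetical `arvrunner`, `tool` and `job_order` objects):
#
#     upload_workflow_deps(arvrunner, tool)
#     job_order = upload_job_order(arvrunner, "input object", tool, job_order)
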
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img

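# Usage sketch: submitters resolve the runner image before creating the runner
# job or container, e.g.
#
#     jobs_image = arvados_jobs_image(arvrunner, "arvados/jobs:" + __version__)
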
def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
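
# Usage sketch: the packed document produced by packed_workflow() can be stored
# as a one-file collection and referenced by its portable data hash, e.g.
#
#     pdh = upload_workflow_collection(arvrunner, "my workflow", packed)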


class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 1024

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
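                    # By convention, arvados-cwl-runner exits with code 33 when
                    # the workflow uses an unsupported requirement.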
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]