# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urlparse
from functools import partial
import logging
import json
import subprocess

from StringIO import StringIO

from schema_salad.sourceline import SourceLine

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

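# Recursively walk a workflow document (nested dicts and lists) and call
# "op" on every mapping that carries a "default" field.  Used below to
# locate default File/Directory values that may need uploading or pruning.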
def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.itervalues():
                find_defaults(i, op)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

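    # "loaded" tracks which documents have already been fetched; loadref()
    # fetches and parses a referenced document exactly once and returns an
    # empty mapping for documents already seen, so scandeps does not
    # descend into them twice.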
    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

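    # capture_default normalizes File/Directory default values: "path" is
    # rewritten to "location", and defaults that do not exist on the local
    # filesystem are dropped from both the upload set and the document.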
    def capture_default(obj):
        remove = [False]
        def add_default(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Remove from sc
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), add_default)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, capture_default)

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

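    # Rewrite every local (non-literal, non-Keep) location in workflowobj
    # to the Keep reference assigned by the path mapper above.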
    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

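# A minimal usage sketch for upload_dependencies(), mirroring the call in
# upload_workflow_deps() below ("arvrunner" is an initialized runner and
# "deptool" a loaded tool document; names are illustrative):
#
#   pm = upload_dependencies(arvrunner,
#                            "%s dependencies" % shortname(deptool["id"]),
#                            document_loader, deptool, deptool["id"],
#                            False, include_primary=False)
#   # pm.items() then yields (local path, mapper entry) pairs whose
#   # .resolved values are "keep:..." references.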

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {}
    for k,v in rewrites.items():
        rewrite_to_orig[v] = k

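    # Walk the packed document and rewrite any non-Keep "location" value
    # using merged_map, keyed by the id of the enclosing tool or workflow
    # (translated back to its pre-pack id via rewrite_to_orig).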
    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id][v["location"]]
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed

def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


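# Fill in "secondaryFiles" on File values in the input object, using the
# secondaryFiles patterns declared on the corresponding workflow inputs.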
def discover_secondary_files(inputs, job_order):
    for t in inputs:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return an updated
    input object with 'location' fields pointing at the proper Keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # The "job_order" key is added by cwltool when parameters are provided
    # on the command line; filter it out here.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            pm = upload_dependencies(arvrunner,
                                "%s dependencies" % (shortname(deptool["id"])),
                                document_loader,
                                deptool,
                                deptool["id"],
                                False,
                                include_primary=False)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = toolmap

    tool.visit(upload_tool_deps)

    return merged_map

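# A sketch of how these helpers fit together (the real call sites live in
# the arvados-cwl-runner entry points; "runner" and "tool" are illustrative
# names for an initialized runner and a loaded cwltool object):
#
#   merged_map = upload_workflow_deps(runner, tool)
#   packed = packed_workflow(runner, tool, merged_map)
#   pdh = upload_workflow_collection(runner, "my workflow", packed)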
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img

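# Store the packed workflow JSON as "workflow.cwl" in a collection.  If a
# collection with the same content (portable data hash) and name prefix
# already exists in the target project it is reused; otherwise a new one is
# saved.  Returns the portable data hash either way.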
def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None, priority=None,
                 secret_store=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = get_feature(self.tool, "http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

        self.merged_map = merged_map or {}

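    # No-op hook in the base class; subclasses are expected to override it.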
    def update_pipeline_component(self, record):
        pass

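    # done() receives the final record of the submitted runner.  Exit code
    # 33 is translated to "UnsupportedRequirement", 0 to "success", and any
    # other exit code (or non-Complete state) to "permanentFail".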
    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            self.arvrunner.process_done(record["uuid"])