Merge branch 'master' into 14012-arvput-check-cache
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import os
6 import urlparse
7 from functools import partial
8 import logging
9 import json
10 import subprocess32 as subprocess
11 from collections import namedtuple
12
13 from StringIO import StringIO
14
15 from schema_salad.sourceline import SourceLine, cmap
16
17 from cwltool.command_line_tool import CommandLineTool
18 import cwltool.workflow
19 from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
20 from cwltool.load_tool import fetch_document
21 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
22 from cwltool.utils import aslist
23 from cwltool.builder import substitute
24 from cwltool.pack import pack
25
26 import arvados.collection
27 import ruamel.yaml as yaml
28
29 import arvados_cwl.arvdocker
30 from .pathmapper import ArvPathMapper, trim_listing
31 from ._version import __version__
32 from . import done
33
34 logger = logging.getLogger('arvados.cwl-runner')
35
36 def trim_anonymous_location(obj):
37     """Remove 'location' field from File and Directory literals.
38
39     To make internal handling easier, literals are assigned a random id for
40     'location'.  However, when writing the record back out, this can break
41     reproducibility.  Since it is valid for literals not have a 'location'
42     field, remove it.
43
44     """
45
46     if obj.get("location", "").startswith("_:"):
47         del obj["location"]
48
49
50 def remove_redundant_fields(obj):
51     for field in ("path", "nameext", "nameroot", "dirname"):
52         if field in obj:
53             del obj[field]
54
55
56 def find_defaults(d, op):
57     if isinstance(d, list):
58         for i in d:
59             find_defaults(i, op)
60     elif isinstance(d, dict):
61         if "default" in d:
62             op(d)
63         else:
64             for i in d.itervalues():
65                 find_defaults(i, op)
66
67 def setSecondary(t, fileobj, discovered):
68     if isinstance(fileobj, dict) and fileobj.get("class") == "File":
69         if "secondaryFiles" not in fileobj:
70             fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
71             if discovered is not None:
72                 discovered[fileobj["location"]] = fileobj["secondaryFiles"]
73     elif isinstance(fileobj, list):
74         for e in fileobj:
75             setSecondary(t, e, discovered)
76
77 def discover_secondary_files(inputs, job_order, discovered=None):
78     for t in inputs:
79         if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
80             setSecondary(t, job_order[shortname(t["id"])], discovered)
81
82
83 def upload_dependencies(arvrunner, name, document_loader,
84                         workflowobj, uri, loadref_run,
85                         include_primary=True, discovered_secondaryfiles=None):
86     """Upload the dependencies of the workflowobj document to Keep.
87
88     Returns a pathmapper object mapping local paths to keep references.  Also
89     does an in-place update of references in "workflowobj".
90
91     Use scandeps to find $import, $include, $schemas, run, File and Directory
92     fields that represent external references.
93
94     If workflowobj has an "id" field, this will reload the document to ensure
95     it is scanning the raw document prior to preprocessing.
96     """
97
98     loaded = set()
99     def loadref(b, u):
100         joined = document_loader.fetcher.urljoin(b, u)
101         defrg, _ = urlparse.urldefrag(joined)
102         if defrg not in loaded:
103             loaded.add(defrg)
104             # Use fetch_text to get raw file (before preprocessing).
105             text = document_loader.fetch_text(defrg)
106             if isinstance(text, bytes):
107                 textIO = StringIO(text.decode('utf-8'))
108             else:
109                 textIO = StringIO(text)
110             return yaml.safe_load(textIO)
111         else:
112             return {}
113
114     if loadref_run:
115         loadref_fields = set(("$import", "run"))
116     else:
117         loadref_fields = set(("$import",))
118
119     scanobj = workflowobj
120     if "id" in workflowobj:
121         # Need raw file content (before preprocessing) to ensure
122         # that external references in $include and $mixin are captured.
123         scanobj = loadref("", workflowobj["id"])
124
125     sc_result = scandeps(uri, scanobj,
126                   loadref_fields,
127                   set(("$include", "$schemas", "location")),
128                   loadref, urljoin=document_loader.fetcher.urljoin)
129
130     sc = []
131     def only_real(obj):
132         # Only interested in local files than need to be uploaded,
133         # don't include file literals, keep references, etc.
134         sp = obj.get("location", "").split(":")
135         if len(sp) > 1 and sp[0] in ("file", "http", "https"):
136             sc.append(obj)
137
138     visit_class(sc_result, ("File", "Directory"), only_real)
139
140     normalizeFilesDirs(sc)
141
142     if include_primary and "id" in workflowobj:
143         sc.append({"class": "File", "location": workflowobj["id"]})
144
145     if "$schemas" in workflowobj:
146         for s in workflowobj["$schemas"]:
147             sc.append({"class": "File", "location": s})
148
149     def visit_default(obj):
150         remove = [False]
151         def ensure_default_location(f):
152             if "location" not in f and "path" in f:
153                 f["location"] = f["path"]
154                 del f["path"]
155             if "location" in f and not arvrunner.fs_access.exists(f["location"]):
156                 # Doesn't exist, remove from list of dependencies to upload
157                 sc[:] = [x for x in sc if x["location"] != f["location"]]
158                 # Delete "default" from workflowobj
159                 remove[0] = True
160         visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
161         if remove[0]:
162             del obj["default"]
163
164     find_defaults(workflowobj, visit_default)
165
166     discovered = {}
167     def discover_default_secondary_files(obj):
168         discover_secondary_files(obj["inputs"],
169                                  {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
170                                  discovered)
171
172     visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
173
174     for d in list(discovered.keys()):
175         # Only interested in discovered secondaryFiles which are local
176         # files that need to be uploaded.
177         if d.startswith("file:"):
178             sc.extend(discovered[d])
179         else:
180             del discovered[d]
181
182     mapper = ArvPathMapper(arvrunner, sc, "",
183                            "keep:%s",
184                            "keep:%s/%s",
185                            name=name,
186                            single_collection=True)
187
188     def setloc(p):
189         if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
190             p["location"] = mapper.mapper(p["location"]).resolved
191
192     visit_class(workflowobj, ("File", "Directory"), setloc)
193     visit_class(discovered, ("File", "Directory"), setloc)
194
195     if discovered_secondaryfiles is not None:
196         for d in discovered:
197             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
198
199     if "$schemas" in workflowobj:
200         sch = []
201         for s in workflowobj["$schemas"]:
202             sch.append(mapper.mapper(s).resolved)
203         workflowobj["$schemas"] = sch
204
205     return mapper
206
207
208 def upload_docker(arvrunner, tool):
209     """Uploads Docker images used in CommandLineTool objects."""
210
211     if isinstance(tool, CommandLineTool):
212         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
213         if docker_req:
214             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
215                 # TODO: can be supported by containers API, but not jobs API.
216                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
217                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
218             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
219         else:
220             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
221     elif isinstance(tool, cwltool.workflow.Workflow):
222         for s in tool.steps:
223             upload_docker(arvrunner, s.embedded_tool)
224
225
226 def packed_workflow(arvrunner, tool, merged_map):
227     """Create a packed workflow.
228
229     A "packed" workflow is one where all the components have been combined into a single document."""
230
231     rewrites = {}
232     packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
233                   tool.tool["id"], tool.metadata, rewrite_out=rewrites)
234
235     rewrite_to_orig = {v: k for k,v in rewrites.items()}
236
237     def visit(v, cur_id):
238         if isinstance(v, dict):
239             if v.get("class") in ("CommandLineTool", "Workflow"):
240                 if "id" not in v:
241                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
242                 cur_id = rewrite_to_orig.get(v["id"], v["id"])
243             if "location" in v and not v["location"].startswith("keep:"):
244                 v["location"] = merged_map[cur_id].resolved[v["location"]]
245             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
246                 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
247             if v.get("class") == "DockerRequirement":
248                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
249             for l in v:
250                 visit(v[l], cur_id)
251         if isinstance(v, list):
252             for l in v:
253                 visit(l, cur_id)
254     visit(packed, None)
255     return packed
256
257
258 def tag_git_version(packed):
259     if tool.tool["id"].startswith("file://"):
260         path = os.path.dirname(tool.tool["id"][7:])
261         try:
262             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
263         except (OSError, subprocess.CalledProcessError):
264             pass
265         else:
266             packed["http://schema.org/version"] = githash
267
268
269 def upload_job_order(arvrunner, name, tool, job_order):
270     """Upload local files referenced in the input object and return updated input
271     object with 'location' updated to the proper keep references.
272     """
273
274     discover_secondary_files(tool.tool["inputs"], job_order)
275
276     jobmapper = upload_dependencies(arvrunner,
277                                     name,
278                                     tool.doc_loader,
279                                     job_order,
280                                     job_order.get("id", "#"),
281                                     False)
282
283     if "id" in job_order:
284         del job_order["id"]
285
286     # Need to filter this out, gets added by cwltool when providing
287     # parameters on the command line.
288     if "job_order" in job_order:
289         del job_order["job_order"]
290
291     return job_order
292
293 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
294
295 def upload_workflow_deps(arvrunner, tool):
296     # Ensure that Docker images needed by this workflow are available
297
298     upload_docker(arvrunner, tool)
299
300     document_loader = tool.doc_loader
301
302     merged_map = {}
303
304     def upload_tool_deps(deptool):
305         if "id" in deptool:
306             discovered_secondaryfiles = {}
307             pm = upload_dependencies(arvrunner,
308                                      "%s dependencies" % (shortname(deptool["id"])),
309                                      document_loader,
310                                      deptool,
311                                      deptool["id"],
312                                      False,
313                                      include_primary=False,
314                                      discovered_secondaryfiles=discovered_secondaryfiles)
315             document_loader.idx[deptool["id"]] = deptool
316             toolmap = {}
317             for k,v in pm.items():
318                 toolmap[k] = v.resolved
319             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
320
321     tool.visit(upload_tool_deps)
322
323     return merged_map
324
325 def arvados_jobs_image(arvrunner, img):
326     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
327
328     try:
329         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
330     except Exception as e:
331         raise Exception("Docker image %s is not available\n%s" % (img, e) )
332
333
334 def upload_workflow_collection(arvrunner, name, packed):
335     collection = arvados.collection.Collection(api_client=arvrunner.api,
336                                                keep_client=arvrunner.keep_client,
337                                                num_retries=arvrunner.num_retries)
338     with collection.open("workflow.cwl", "w") as f:
339         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
340
341     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
342                ["name", "like", name+"%"]]
343     if arvrunner.project_uuid:
344         filters.append(["owner_uuid", "=", arvrunner.project_uuid])
345     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
346
347     if exists["items"]:
348         logger.info("Using collection %s", exists["items"][0]["uuid"])
349     else:
350         collection.save_new(name=name,
351                             owner_uuid=arvrunner.project_uuid,
352                             ensure_unique_name=True,
353                             num_retries=arvrunner.num_retries)
354         logger.info("Uploaded to %s", collection.manifest_locator())
355
356     return collection.portable_data_hash()
357
358
359 class Runner(object):
360     """Base class for runner processes, which submit an instance of
361     arvados-cwl-runner and wait for the final result."""
362
363     def __init__(self, runner, tool, job_order, enable_reuse,
364                  output_name, output_tags, submit_runner_ram=0,
365                  name=None, on_error=None, submit_runner_image=None,
366                  intermediate_output_ttl=0, merged_map=None,
367                  priority=None, secret_store=None,
368                  collection_cache_size=256,
369                  collection_cache_is_default=True):
370         self.arvrunner = runner
371         self.tool = tool
372         self.job_order = job_order
373         self.running = False
374         if enable_reuse:
375             # If reuse is permitted by command line arguments but
376             # disabled by the workflow itself, disable it.
377             reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
378             if reuse_req:
379                 enable_reuse = reuse_req["enableReuse"]
380         self.enable_reuse = enable_reuse
381         self.uuid = None
382         self.final_output = None
383         self.output_name = output_name
384         self.output_tags = output_tags
385         self.name = name
386         self.on_error = on_error
387         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
388         self.intermediate_output_ttl = intermediate_output_ttl
389         self.priority = priority
390         self.secret_store = secret_store
391
392         self.submit_runner_cores = 1
393         self.submit_runner_ram = 1024  # defaut 1 GiB
394         self.collection_cache_size = collection_cache_size
395
396         runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
397         if runner_resource_req:
398             if runner_resource_req.get("coresMin"):
399                 self.submit_runner_cores = runner_resource_req["coresMin"]
400             if runner_resource_req.get("ramMin"):
401                 self.submit_runner_ram = runner_resource_req["ramMin"]
402             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
403                 self.collection_cache_size = runner_resource_req["keep_cache"]
404
405         if submit_runner_ram:
406             # Command line / initializer overrides default and/or spec from workflow
407             self.submit_runner_ram = submit_runner_ram
408
409         if self.submit_runner_ram <= 0:
410             raise Exception("Value of submit-runner-ram must be greater than zero")
411
412         if self.submit_runner_cores <= 0:
413             raise Exception("Value of submit-runner-cores must be greater than zero")
414
415         self.merged_map = merged_map or {}
416
417     def update_pipeline_component(self, record):
418         pass
419
420     def done(self, record):
421         """Base method for handling a completed runner."""
422
423         try:
424             if record["state"] == "Complete":
425                 if record.get("exit_code") is not None:
426                     if record["exit_code"] == 33:
427                         processStatus = "UnsupportedRequirement"
428                     elif record["exit_code"] == 0:
429                         processStatus = "success"
430                     else:
431                         processStatus = "permanentFail"
432                 else:
433                     processStatus = "success"
434             else:
435                 processStatus = "permanentFail"
436
437             outputs = {}
438
439             if processStatus == "permanentFail":
440                 logc = arvados.collection.CollectionReader(record["log"],
441                                                            api_client=self.arvrunner.api,
442                                                            keep_client=self.arvrunner.keep_client,
443                                                            num_retries=self.arvrunner.num_retries)
444                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
445
446             self.final_output = record["output"]
447             outc = arvados.collection.CollectionReader(self.final_output,
448                                                        api_client=self.arvrunner.api,
449                                                        keep_client=self.arvrunner.keep_client,
450                                                        num_retries=self.arvrunner.num_retries)
451             if "cwl.output.json" in outc:
452                 with outc.open("cwl.output.json") as f:
453                     if f.size() > 0:
454                         outputs = json.load(f)
455             def keepify(fileobj):
456                 path = fileobj["location"]
457                 if not path.startswith("keep:"):
458                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
459             adjustFileObjs(outputs, keepify)
460             adjustDirObjs(outputs, keepify)
461         except Exception as e:
462             logger.exception("[%s] While getting final output object: %s", self.name, e)
463             self.arvrunner.output_callback({}, "permanentFail")
464         else:
465             self.arvrunner.output_callback(outputs, processStatus)