# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems

import os
import sys
import urllib.parse
from functools import partial
import logging
import json
from collections import namedtuple
from io import StringIO

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]

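# Illustrative sketch (hypothetical values, not part of the module API):
#
#     f = {"class": "File", "location": "_:1f6e4c0d", "basename": "a.txt"}
#     trim_anonymous_location(f)
#     # f == {"class": "File", "basename": "a.txt"}; a real keep: or file:
#     # location would have been left untouched.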

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def setSecondary(t, fileobj, discovered):
    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
        if "secondaryFiles" not in fileobj:
            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
            if discovered is not None:
                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
    elif isinstance(fileobj, list):
        for e in fileobj:
            setSecondary(t, e, discovered)

def discover_secondary_files(inputs, job_order, discovered=None):
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)

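# Illustrative sketch (hypothetical input spec and job order):
#
#     inputs = [{"id": "#main/bam", "secondaryFiles": [".bai"]}]
#     job_order = {"bam": {"class": "File", "location": "file:///data/s1.bam"}}
#     discover_secondary_files(inputs, job_order)
#     # job_order["bam"]["secondaryFiles"] ==
#     #     [{"class": "File", "location": "file:///data/s1.bam.bai"}]
#     # A "^.bai" pattern would strip the ".bam" extension before appending.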

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    def only_real(obj):
        # Only interested in local files that need to be uploaded;
        # don't include file literals, keep references, etc.
        sp = obj.get("location", "").split(":")
        if len(sp) > 1 and sp[0] in ("file", "http", "https"):
            sc.append(obj)

    visit_class(sc_result, ("File", "Directory"), only_real)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

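# Illustrative sketch (hypothetical paths and collection hash): after
#
#     mapper = upload_dependencies(arvrunner, "wf dependencies", loader,
#                                  workflowobj, workflowobj["id"], False)
#
# local references such as "file:///home/me/input.txt" inside workflowobj are
# rewritten in place to "keep:<portable data hash>/input.txt", and
# mapper.mapper("file:///home/me/input.txt").resolved returns the same
# keep: reference.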

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

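# Illustrative sketch: upload_docker() recurses through a Workflow's steps, so
# a hypothetical tool declaring
#
#     requirements:
#       - class: DockerRequirement
#         dockerPull: ubuntu:18.04
#
# has that image made available in Arvados (pulled and uploaded if needed)
# before submission; tools with no DockerRequirement fall back to the default
# "arvados/jobs" image.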
233
234 def packed_workflow(arvrunner, tool, merged_map):
235     """Create a packed workflow.
236
237     A "packed" workflow is one where all the components have been combined into a single document."""
238
239     rewrites = {}
240     packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
241                   tool.tool["id"], tool.metadata, rewrite_out=rewrites)
242
243     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
244
245     def visit(v, cur_id):
246         if isinstance(v, dict):
247             if v.get("class") in ("CommandLineTool", "Workflow"):
248                 if "id" not in v:
249                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
250                 cur_id = rewrite_to_orig.get(v["id"], v["id"])
251             if "location" in v and not v["location"].startswith("keep:"):
252                 v["location"] = merged_map[cur_id].resolved[v["location"]]
253             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
254                 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
255             if v.get("class") == "DockerRequirement":
256                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
257             for l in v:
258                 visit(v[l], cur_id)
259         if isinstance(v, list):
260             for l in v:
261                 visit(l, cur_id)
262     visit(packed, None)
263     return packed
264
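# Illustrative sketch (hypothetical ids): given merged_map produced by
# upload_workflow_deps(), packed_workflow() rewrites every embedded process so
# that, e.g.,
#
#     {"class": "File", "location": "file:///home/me/ref.fa"}
#
# becomes {"class": "File", "location": "keep:<pdh>/ref.fa"}, attaches any
# discovered secondaryFiles, and records the image collection under
# "http://arvados.org/cwl#dockerCollectionPDH" on each DockerRequirement.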

def tag_git_version(packed, tool):
    # Annotate the packed workflow with the git commit hash of the tool's
    # source checkout, if the tool comes from a local git repository.
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

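# Illustrative sketch for upload_job_order() (hypothetical values): the input
# object is rewritten in place, e.g.
#
#     {"reads": {"class": "File", "location": "file:///data/s1.fastq"}}
#
# comes back as
#
#     {"reads": {"class": "File", "location": "keep:<pdh>/s1.fastq"}}
#
# with the "id"/"job_order" bookkeeping keys added by cwltool removed.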
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

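# Illustrative sketch (hypothetical ids): the merged_map returned above looks
# like
#
#     {"file:///home/me/wf.cwl": FileUpdates(
#         resolved={"file:///home/me/ref.fa": "keep:<pdh>/ref.fa"},
#         secondaryFiles={"keep:<pdh>/ref.fa": [...]})}
#
# and is later consumed by packed_workflow() to rewrite the packed document.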
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

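# Illustrative note: because the lookup above filters on portable_data_hash,
# re-running with an identical packed workflow reuses the existing collection
# instead of creating a duplicate.  Hypothetical usage:
#
#     pdh = upload_workflow_collection(arvrunner, "my-workflow", packed)
#     # pdh is a portable data hash such as "9f0b8a6c...+1234", suitable for
#     # use in a keep: reference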

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

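        # Illustrative sketch (hypothetical workflow snippet): the values
        # above can be supplied by the workflow itself, e.g.
        #
        #     hints:
        #       - class: "http://arvados.org/cwl#WorkflowRunnerResources"
        #         coresMin: 2
        #         ramMin: 2048
        #         keep_cache: 512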
        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)