15028: Update cwltool/schema-salad deps, fix tests
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems

import os
import sys
import re
import urllib.parse
from functools import partial
import logging
import json
from collections import namedtuple
from io import StringIO

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
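    """Strip fields that are derived or filled in at runtime (path, nameext,
    nameroot, dirname) and are redundant when writing the record back out."""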
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
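    """Recursively apply op() to every mapping in d that has a 'default' field."""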
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def setSecondary(t, fileobj, discovered):
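    """Fill in 'secondaryFiles' on a File object (or each File in a list) by
    applying the secondaryFiles patterns of input parameter t to the file's
    'location', recording the result in 'discovered' when a dict is given."""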
    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
        if "secondaryFiles" not in fileobj:
            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
            if discovered is not None:
                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
    elif isinstance(fileobj, list):
        for e in fileobj:
            setSecondary(t, e, discovered)

def discover_secondary_files(inputs, job_order, discovered=None):
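    """For each input parameter that declares secondaryFiles and has a value in
    the job order, attach the expected secondary files to that value."""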
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)

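# Keep references come in two forms: by collection UUID
# (keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/path) or by portable data hash
# (keep:<md5>+<size>/path).  The patterns below tell them apart.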
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

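    # scandeps walks the document and collects external references: documents
    # pulled in through $import (and "run" when loadref_run is set), plus
    # anything referenced by $include, $schemas or a 'location' field.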
    sc_result = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit
        # response size, so keep fetching until the API server has nothing
        # more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

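    # Defaults that refer to a file or directory that doesn't exist are
    # dropped from the upload list and removed from the workflow document
    # entirely.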
    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

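    # ArvPathMapper uploads everything collected in sc.  The two format
    # strings are the templates used to build the rewritten Keep locations (a
    # bare collection reference and a file within a collection);
    # single_collection=True appears to consolidate the uploads into a single
    # new collection.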
    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

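    # Rewrite each location in place: local and http(s) references become the
    # Keep references chosen by the mapper, and collection-UUID references are
    # pinned to the portable data hash looked up above while the original UUID
    # is kept in the collectionUUID hint field.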
    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper


def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
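            # No DockerRequirement, so make sure the default arvados/jobs
            # image is available instead.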
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

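    # Walk the packed document and, for each embedded tool or workflow,
    # rewrite file references through merged_map (built per tool by
    # upload_workflow_deps), attach any discovered secondaryFiles, and record
    # the Docker image portable data hash on each DockerRequirement.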
    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed


def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return the updated
    input object with 'location' pointing to the proper Keep references.
    """

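    # Fill in expected secondary files first so that they are uploaded
    # alongside the primary files in the job order.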
    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out; it gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

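# For each tool document, FileUpdates records the mapping from original file
# locations to their resolved Keep locations ("resolved") plus any
# secondaryFiles discovered for them ("secondaryFiles", keyed by the resolved
# location).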
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed):
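    """Store the packed workflow as workflow.cwl in a Keep collection.

    Reuses an existing collection with matching content and name when one is
    found; otherwise saves a new one.  Returns the portable data hash."""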
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = loadingContext.metadata.copy()
        loadingContext.metadata["cwlVersion"] = INTERNAL_VERSION

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

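        # A WorkflowRunnerResources hint on the workflow can raise the cores,
        # RAM and Keep cache size given to the runner process itself.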
        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
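                    # Exit code 33 is treated as an unsupported requirement;
                    # any other non-zero exit code is a permanent failure.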
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
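            # Relative paths in cwl.output.json are rewritten into keep:
            # references inside the output collection.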
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)