# sdk/cwl/arvados_cwl/runner.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems

import os
import sys
import re
import urllib.parse
from functools import partial
import logging
import json
from collections import namedtuple
from io import StringIO

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
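    """Recursively walk lists and mappings in d, applying op() to each mapping
    that carries a 'default' field."""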
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def setSecondary(t, fileobj, discovered):
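    """Fill in 'secondaryFiles' on a File object (or each File in a list) that
    doesn't already have them, expanding the secondaryFiles patterns declared
    on input parameter t, and record new entries in 'discovered' when a dict
    is supplied."""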
    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
        if "secondaryFiles" not in fileobj:
            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
            if discovered is not None:
                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
    elif isinstance(fileobj, list):
        for e in fileobj:
            setSecondary(t, e, discovered)

def discover_secondary_files(inputs, job_order, discovered=None):
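    """For each input parameter that declares secondaryFiles and has a value in
    job_order, attach the expected secondary files to that value."""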
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)

collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
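        # Fetch and parse each referenced document at most once, returning the
        # raw (unpreprocessed) content; already-loaded references yield {}.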
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded;
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit the
        # response size, so keep fetching until the API server has nothing
        # more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
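        # If a File/Directory supplied as a default value doesn't actually
        # exist, drop it from the upload list and delete the default itself
        # so the workflow can still be submitted.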
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist; remove it from the list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
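        # Rewrite each File/Directory 'location' to a keep: reference,
        # resolving collection UUIDs to portable data hashes via uuid_map.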
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper


def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed


def tag_git_version(packed, tool):
    """Record the git commit of the workflow source in the packed document, if it lives in a git checkout."""
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return the updated
    input object with 'location' fields pointing to the corresponding keep
    references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out; it gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

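# FileUpdates records, for a single tool document, how its original file
# references map to resolved keep: locations, plus any secondary files
# discovered for those resolved locations.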
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
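    """Upload the dependencies of each tool in the workflow and return a map
    from tool document id to a FileUpdates describing how its references were
    rewritten."""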
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed):
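    """Save the packed workflow in a collection named 'name' (reusing an
    existing collection with the same content when one is found) and return
    its portable data hash."""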
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
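                    # A runner exit code of 33 signals that the workflow
                    # contained an unsupported requirement.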
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)