# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems

import os
import sys
import re
import urllib.parse
from functools import partial
import logging
import json
from collections import namedtuple
from io import StringIO

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


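# Example for trim_anonymous_location above (illustrative sketch, values
# invented): a File literal that was assigned an anonymous id keeps its other
# fields but loses the synthetic 'location':
#
#     f = {"class": "File", "basename": "note.txt", "contents": "hi",
#          "location": "_:b0a9f1e0"}
#     trim_anonymous_location(f)
#     # f == {"class": "File", "basename": "note.txt", "contents": "hi"}
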
def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

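# Example for find_defaults above (illustrative sketch, names invented): the
# callback is applied to each mapping that carries a "default" key, and the
# walk does not descend into that mapping:
#
#     doc = {"inputs": [{"id": "x", "default": {"class": "File", "path": "a.txt"}}]}
#     find_defaults(doc, lambda d: print(d["id"]))   # prints "x"
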
def setSecondary(t, fileobj, discovered):
    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
        if "secondaryFiles" not in fileobj:
            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
            if discovered is not None:
                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
    elif isinstance(fileobj, list):
        for e in fileobj:
            setSecondary(t, e, discovered)

def discover_secondary_files(inputs, job_order, discovered=None):
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)

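# Example for discover_secondary_files above (illustrative sketch): with an
# input parameter declared as secondaryFiles: [".bai"], a job order entry like
#
#     {"class": "File", "location": "keep:.../reads.bam"}
#
# picks up
#
#     "secondaryFiles": [{"class": "File", "location": "keep:.../reads.bam.bai"}]
#
# (a "^" prefix in the pattern would replace the extension instead of
# appending, per the usual CWL secondaryFiles substitution rules).
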
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')

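# Illustrative examples of locations these patterns are intended to match:
#     collection_uuid_pattern: "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/dir/file.txt"
#     collection_pdh_pattern:  "keep:99999999999999999999999999999999+99/file.txt"
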
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper


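# Typical use of upload_dependencies above (illustrative sketch; "runner"
# stands for an arvados-cwl-runner executor object and "tool" for a loaded
# cwltool process, neither defined here):
#
#     mapper = upload_dependencies(runner, "workflow dependencies",
#                                  tool.doc_loader, tool.tool, tool.tool["id"],
#                                  False)
#     # File/Directory references in the document are rewritten in place to
#     # keep: locations; mapper.mapper(original_location).resolved reports the
#     # keep reference chosen for each uploaded dependency.
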
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


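# Illustrative note for upload_docker above: a CommandLineTool carrying
#
#     requirements:
#       DockerRequirement:
#         dockerPull: ubuntu:18.04
#
# has that image looked up (and pushed to Keep if necessary) via
# arvdocker.arv_docker_get_image; a tool with no DockerRequirement falls back
# to the default "arvados/jobs" image, and Workflow steps are handled by
# recursing into each embedded tool.
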
def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed


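# Typical calling order for packed_workflow above (illustrative sketch, with
# "runner" and "tool" as stand-in names): merged_map is the value produced by
# upload_workflow_deps below, so a submitter does roughly
#
#     merged_map = upload_workflow_deps(runner, tool)
#     packed = packed_workflow(runner, tool, merged_map)
#
# which rewrites file references inside the packed document to the keep:
# locations recorded while uploading dependencies.
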
def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash.decode()


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return the updated
    input object with 'location' pointing to the proper keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

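# Illustrative sketch of the effect of upload_job_order above: a job order entry
#
#     {"reads": {"class": "File", "location": "file:///tmp/reads.bam"}}
#
# comes back (and is updated in place) as
#
#     {"reads": {"class": "File", "location": "keep:<pdh>/reads.bam"}}
#
# where <pdh> is the portable data hash of the collection created by the
# upload.
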
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

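# Shape of the value returned by upload_workflow_deps (illustrative, paths and
# hashes invented):
#
#     {"file:///home/me/wf.cwl":
#          FileUpdates(resolved={"file:///home/me/script.py": "keep:<pdh>/script.py"},
#                      secondaryFiles={})}
#
# i.e. one FileUpdates per tool or workflow document, mapping its local file
# references to the keep locations chosen by upload_dependencies.
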
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

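        # Illustrative sketch of the hint consulted above, as it might appear
        # in a workflow document (assuming the conventional "arv:" namespace
        # prefix for http://arvados.org/cwl#; values are examples only):
        #
        #     hints:
        #       arv:WorkflowRunnerResources:
        #         coresMin: 2
        #         ramMin: 2048
        #         keep_cache: 512
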
        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)