1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import (
21     Any,
22     Callable,
23     Dict,
24     Iterable,
25     Iterator,
26     List,
27     Mapping,
28     MutableMapping,
29     Sequence,
30     MutableSequence,
31     Optional,
32     Set,
33     Sized,
34     Tuple,
35     Type,
36     Union,
37     cast,
38 )
39 from cwltool.utils import (
40     CWLObjectType,
41     CWLOutputAtomType,
42     CWLOutputType,
43 )
44
45 import subprocess
46
47 from schema_salad.sourceline import SourceLine, cmap
48
49 from cwltool.command_line_tool import CommandLineTool
50 import cwltool.workflow
51 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
52                              shortname, Process, fill_in_defaults)
53 from cwltool.load_tool import fetch_document, jobloaderctx
54 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
55 from cwltool.builder import substitute
56 from cwltool.pack import pack
57 from cwltool.update import INTERNAL_VERSION
58 from cwltool.builder import Builder
59 import schema_salad.validate as validate
60 import schema_salad.ref_resolver
61
62 import arvados.collection
63 import arvados.util
64 from .util import collectionUUID
65 from ruamel.yaml import YAML
66 from ruamel.yaml.comments import CommentedMap, CommentedSeq
67
68 import arvados_cwl.arvdocker
69 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern, MapperEnt
70 from ._version import __version__
71 from . import done
72 from .context import ArvRuntimeContext
73 from .perf import Perf
74
75 logger = logging.getLogger('arvados.cwl-runner')
76 metrics = logging.getLogger('arvados.cwl-runner.metrics')
77
78 def trim_anonymous_location(obj):
79     """Remove 'location' field from File and Directory literals.
80
81     To make internal handling easier, literals are assigned a random id for
82     'location'.  However, when writing the record back out, this can break
83     reproducibility.  Since it is valid for literals not to have a 'location'
84     field, remove it.
85
86     """
87
88     if obj.get("location", "").startswith("_:"):
89         del obj["location"]
90
91
92 def remove_redundant_fields(obj):
93     for field in ("path", "nameext", "nameroot", "dirname"):
94         if field in obj:
95             del obj[field]
96
97
98 def find_defaults(d, op):
99     if isinstance(d, list):
100         for i in d:
101             find_defaults(i, op)
102     elif isinstance(d, dict):
103         if "default" in d:
104             op(d)
105         else:
106             for i in viewvalues(d):
107                 find_defaults(i, op)
108
109 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
110     return Builder(
111                  job=joborder,
112                  files=[],               # type: List[Dict[Text, Text]]
113                  bindings=[],            # type: List[Dict[Text, Any]]
114                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
115                  names=None,               # type: Names
116                  requirements=requirements,        # type: List[Dict[Text, Any]]
117                  hints=hints,               # type: List[Dict[Text, Any]]
118                  resources={},           # type: Dict[str, int]
119                  mutation_manager=None,    # type: Optional[MutationManager]
120                  formatgraph=None,         # type: Optional[Graph]
121                  make_fs_access=None,      # type: Type[StdFsAccess]
122                  fs_access=None,           # type: StdFsAccess
123                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
124                  timeout=runtimeContext.eval_timeout,             # type: float
125                  debug=runtimeContext.debug,               # type: bool
126                  js_console=runtimeContext.js_console,          # type: bool
127                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
128                  loadListing="",         # type: Text
129                  outdir="",              # type: Text
130                  tmpdir="",              # type: Text
131                  stagedir="",            # type: Text
132                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
133                  container_engine="docker"
134                 )
135
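# A minimal usage sketch (mirroring the calls made further down in this
# module): the Builder returned here is only used to evaluate expressions
# such as secondaryFiles patterns, so most of its fields can stay empty.
#
#   builder = make_builder(builder_job_order,
#                          obj.get("hints", []),
#                          obj.get("requirements", []),
#                          ArvRuntimeContext(),
#                          metadata)
#   sfname = builder.do_eval(pattern, context=primary)
#
# "obj", "builder_job_order", "metadata", "pattern" and "primary" stand in
# for the variables used at the real call sites below.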
136 def search_schemadef(name, reqs):
137     for r in reqs:
138         if r["class"] == "SchemaDefRequirement":
139             for sd in r["types"]:
140                 if sd["name"] == name:
141                     return sd
142     return None
143
144 primitive_types_set = frozenset(("null", "boolean", "int", "long",
145                                  "float", "double", "string", "record",
146                                  "array", "enum"))
147
148 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
149     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
150         # union type, collect all possible secondaryFiles
151         for i in inputschema:
152             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
153         return
154
155     if inputschema == "File":
156         inputschema = {"type": "File"}
157
158     if isinstance(inputschema, basestring):
159         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
160         if sd:
161             inputschema = sd
162         else:
163             return
164
165     if "secondaryFiles" in inputschema:
166         # set secondaryFiles, may be inherited by compound types.
167         secondaryspec = inputschema["secondaryFiles"]
168
169     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
170         not isinstance(inputschema["type"], basestring)):
171         # compound type (union, array, record)
172         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
173
174     elif (inputschema["type"] == "record" and
175           isinstance(primary, Mapping)):
176         #
177         # record type, find secondary files associated with fields.
178         #
179         for f in inputschema["fields"]:
180             p = primary.get(shortname(f["name"]))
181             if p:
182                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
183
184     elif (inputschema["type"] == "array" and
185           isinstance(primary, Sequence)):
186         #
187         # array type, find secondary files of elements
188         #
189         for p in primary:
190             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
191
192     elif (inputschema["type"] == "File" and
193           isinstance(primary, Mapping) and
194           primary.get("class") == "File"):
195
196         if "secondaryFiles" in primary or not secondaryspec:
197             # Nothing to do.
198             return
199
200         #
201         # Found a file, check for secondaryFiles
202         #
203         specs = []
204         primary["secondaryFiles"] = secondaryspec
205         for i, sf in enumerate(aslist(secondaryspec)):
206             if builder.cwlVersion == "v1.0":
207                 pattern = sf
208             else:
209                 pattern = sf["pattern"]
210             if pattern is None:
211                 continue
212             if isinstance(pattern, list):
213                 specs.extend(pattern)
214             elif isinstance(pattern, dict):
215                 specs.append(pattern)
216             elif isinstance(pattern, str):
217                 if builder.cwlVersion == "v1.0":
218                     specs.append({"pattern": pattern, "required": True})
219                 else:
220                     specs.append({"pattern": pattern, "required": sf.get("required")})
221             else:
222                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
223                     "Expression must return list, object, string or null")
224
225         found = []
226         for i, sf in enumerate(specs):
227             if isinstance(sf, dict):
228                 if sf.get("class") == "File":
229                     pattern = None
230                     if sf.get("location") is None:
231                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
232                             "File object is missing 'location': %s" % sf)
233                     sfpath = sf["location"]
234                     required = True
235                 else:
236                     pattern = sf["pattern"]
237                     required = sf.get("required")
238             elif isinstance(sf, str):
239                 pattern = sf
240                 required = True
241             else:
242                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
243                     "Expression must return list, object, string or null")
244
245             if pattern is not None:
246                 if "${" in pattern or "$(" in pattern:
247                     sfname = builder.do_eval(pattern, context=primary)
248                 else:
249                     sfname = substitute(primary["basename"], pattern)
250
251                 if sfname is None:
252                     continue
253
254                 if isinstance(sfname, str):
255                     p_location = primary["location"]
256                     if "/" in p_location:
257                         sfpath = (
258                             p_location[0 : p_location.rindex("/") + 1]
259                             + sfname
260                         )
261
262             required = builder.do_eval(required, context=primary)
263
264             if isinstance(sfname, list) or isinstance(sfname, dict):
265                 each = aslist(sfname)
266                 for e in each:
267                     if required and not fsaccess.exists(e.get("location")):
268                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
269                             "Required secondary file '%s' does not exist" % e.get("location"))
270                 found.extend(each)
271
272             if isinstance(sfname, str):
273                 if fsaccess.exists(sfpath):
274                     if pattern is not None:
275                         found.append({"location": sfpath, "class": "File"})
276                     else:
277                         found.append(sf)
278                 elif required:
279                     raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
280                         "Required secondary file '%s' does not exist" % sfpath)
281
282         primary["secondaryFiles"] = cmap(found)
283         if discovered is not None:
284             discovered[primary["location"]] = primary["secondaryFiles"]
285     elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
286         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
287
288 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
289     for inputschema in inputs:
290         primary = job_order.get(shortname(inputschema["id"]))
291         if isinstance(primary, (Mapping, Sequence)):
292             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
293
294 def upload_dependencies(arvrunner, name, document_loader,
295                         workflowobj, uri, runtimeContext,
296                         include_primary=True, discovered_secondaryfiles=None,
297                         cache=None):
298     """Upload the dependencies of the workflowobj document to Keep.
299
300     Returns a pathmapper object mapping local paths to keep references.  Also
301     does an in-place update of references in "workflowobj".
302
303     Use scandeps to find $schemas, File and Directory
304     fields that represent external references.
305
306     If workflowobj has an "id" field, this will reload the document to ensure
307     it is scanning the raw document prior to preprocessing.
308     """
309
310     scanobj = workflowobj
311     metadata = scanobj
312
313     with Perf(metrics, "scandeps"):
314         sc_result = scandeps(uri, scanobj,
315                              set(),
316                              set(("location",)),
317                              None, urljoin=document_loader.fetcher.urljoin,
318                              nestdirs=False)
319         optional_deps = scandeps(uri, scanobj,
320                              set(),
321                              set(("$schemas",)),
322                              None, urljoin=document_loader.fetcher.urljoin,
323                              nestdirs=False)
324
325     if sc_result is None:
326         sc_result = []
327
328     if optional_deps is None:
329         optional_deps = []
330
331     if optional_deps:
332         sc_result.extend(optional_deps)
333
334     sc = []
335     uuids = {}
336
337     def collect_uuids(obj):
338         loc = obj.get("location", "")
339         sp = loc.split(":")
340         if sp[0] == "keep":
341             # Collect collection uuids that need to be resolved to
342             # portable data hashes
343             gp = collection_uuid_pattern.match(loc)
344             if gp:
345                 uuids[gp.groups()[0]] = obj
346             if collectionUUID in obj:
347                 uuids[obj[collectionUUID]] = obj
348
349     def collect_uploads(obj):
350         loc = obj.get("location", "")
351         sp = loc.split(":")
352         if len(sp) < 1:
353             return
354         if sp[0] in ("file", "http", "https"):
355             # Record local files that need to be uploaded,
356             # but don't include file literals, keep references, etc.
357             sc.append(obj)
358         collect_uuids(obj)
359
360     with Perf(metrics, "collect uuids"):
361         visit_class(workflowobj, ("File", "Directory"), collect_uuids)
362
363     with Perf(metrics, "collect uploads"):
364         visit_class(sc_result, ("File", "Directory"), collect_uploads)
365
366     # Resolve any collection uuids we found to portable data hashes
367     # and assign them to uuid_map
368     uuid_map = {}
369     fetch_uuids = list(uuids.keys())
370     with Perf(metrics, "fetch_uuids"):
371         while fetch_uuids:
372             # For a large number of fetch_uuids, the API server may limit
373             # the response size, so keep fetching until the API server has
374             # nothing more to give us.
375             lookups = arvrunner.api.collections().list(
376                 filters=[["uuid", "in", fetch_uuids]],
377                 count="none",
378                 select=["uuid", "portable_data_hash"]).execute(
379                     num_retries=arvrunner.num_retries)
380
381             if not lookups["items"]:
382                 break
383
384             for l in lookups["items"]:
385                 uuid_map[l["uuid"]] = l["portable_data_hash"]
386
387             fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
388
389     normalizeFilesDirs(sc)
390
391     if "id" in workflowobj:
392         defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
393         if include_primary:
394             # make sure it's included
395             sc.append({"class": "File", "location": defrg})
396         else:
397             # make sure it's excluded
398             sc = [d for d in sc if d.get("location") != defrg]
399
400     def visit_default(obj):
401         def defaults_are_optional(f):
402             if "location" not in f and "path" in f:
403                 f["location"] = f["path"]
404                 del f["path"]
405             normalizeFilesDirs(f)
406             optional_deps.append(f)
407         visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
408
409     find_defaults(workflowobj, visit_default)
410
411     discovered = {}
412     def discover_default_secondary_files(obj):
413         builder_job_order = {}
414         for t in obj["inputs"]:
415             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
416         # Need to create a builder object to evaluate expressions.
417         builder = make_builder(builder_job_order,
418                                obj.get("hints", []),
419                                obj.get("requirements", []),
420                                ArvRuntimeContext(),
421                                metadata)
422         discover_secondary_files(arvrunner.fs_access,
423                                  builder,
424                                  obj["inputs"],
425                                  builder_job_order,
426                                  discovered)
427
428     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
429     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
430
431     for d in list(discovered):
432         # Only interested in discovered secondaryFiles which are local
433         # files that need to be uploaded.
434         if d.startswith("file:"):
435             sc.extend(discovered[d])
436         else:
437             del discovered[d]
438
439     with Perf(metrics, "mapper"):
440         mapper = ArvPathMapper(arvrunner, sc, "",
441                                "keep:%s",
442                                "keep:%s/%s",
443                                name=name,
444                                single_collection=True,
445                                optional_deps=optional_deps)
446
447     for k, v in uuid_map.items():
448         mapper._pathmap["keep:"+k] = MapperEnt(v, "", "", False)
449
450     keeprefs = set()
451     def addkeepref(k):
452         if k.startswith("keep:"):
453             keeprefs.add(collection_pdh_pattern.match(k).group(1))
454
455
456     def collectloc(p):
457         loc = p.get("location")
458         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
459             addkeepref(p["location"])
460             return
461
462         if not loc:
463             return
464
465         if collectionUUID in p:
466             uuid = p[collectionUUID]
467             if uuid not in uuid_map:
468                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
469                     "Collection uuid %s not found" % uuid)
470             gp = collection_pdh_pattern.match(loc)
471             if gp and uuid_map[uuid] != gp.groups()[0]:
472                 # This file entry has both collectionUUID and a PDH
473                 # location. If the PDH doesn't match the one returned by
474                 # the API server, raise an error.
475                 raise SourceLine(p, "location", validate.ValidationException).makeError(
476                     "Expected collection uuid %s to be %s but API server reported %s" % (
477                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
478
479         gp = collection_uuid_pattern.match(loc)
480         if not gp:
481             # Not a uuid pattern (must be a pdh pattern)
482             addkeepref(p["location"])
483             return
484
485         uuid = gp.groups()[0]
486         if uuid not in uuid_map:
487             raise SourceLine(p, "location", validate.ValidationException).makeError(
488                 "Collection uuid %s not found" % uuid)
489
490     with Perf(metrics, "collectloc"):
491         visit_class(workflowobj, ("File", "Directory"), collectloc)
492         visit_class(discovered, ("File", "Directory"), collectloc)
493
494     if discovered_secondaryfiles is not None:
495         for d in discovered:
496             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
497
498     if runtimeContext.copy_deps:
499         # Find referenced collections and copy them into the
500         # destination project, for easy sharing.
501         already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
502                                      filters=[["portable_data_hash", "in", list(keeprefs)],
503                                               ["owner_uuid", "=", runtimeContext.project_uuid]],
504                                      select=["uuid", "portable_data_hash", "created_at"]))
505
506         keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
507         for kr in keeprefs:
508             col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
509                                                   order="created_at desc",
510                                                    select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
511                                                    limit=1).execute()
512             if len(col["items"]) == 0:
513                 logger.warning("Cannot find collection with portable data hash %s", kr)
514                 continue
515             col = col["items"][0]
516             col["name"] = arvados.util.trim_name(col["name"])
517             try:
518                 arvrunner.api.collections().create(body={"collection": {
519                     "owner_uuid": runtimeContext.project_uuid,
520                     "name": col["name"],
521                     "description": col["description"],
522                     "properties": col["properties"],
523                     "portable_data_hash": col["portable_data_hash"],
524                     "manifest_text": col["manifest_text"],
525                     "storage_classes_desired": col["storage_classes_desired"],
526                     "trash_at": col["trash_at"]
527                 }}, ensure_unique_name=True).execute()
528             except Exception as e:
529                 logger.warning("Unable to copy collection to destination: %s", e)
530
531     if "$schemas" in workflowobj:
532         sch = CommentedSeq()
533         for s in workflowobj["$schemas"]:
534             if s in mapper:
535                 sch.append(mapper.mapper(s).resolved)
536         workflowobj["$schemas"] = sch
537
538     return mapper
539
540
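# Sketch of a typical call (patterned on the call sites in upload_job_order
# and upload_workflow_deps below; "arvrunner", "tool" and "runtimeContext"
# are the caller's objects):
#
#   mapper = upload_dependencies(arvrunner,
#                                "%s dependencies" % shortname(tool.tool["id"]),
#                                tool.doc_loader,
#                                tool.tool,
#                                tool.tool["id"],
#                                runtimeContext,
#                                include_primary=False)
#   for ref, entry in mapper.items():
#       # entry.resolved is the keep: reference the local path was mapped to
#       print(ref, entry.resolved)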
541 def upload_docker(arvrunner, tool, runtimeContext):
542     """Uploads Docker images used in CommandLineTool objects."""
543
544     if isinstance(tool, CommandLineTool):
545         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
546         if docker_req:
547             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
548                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
549                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
550
551             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
552         else:
553             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
554                                                        True, runtimeContext)
555     elif isinstance(tool, cwltool.workflow.Workflow):
556         for s in tool.steps:
557             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
558
559
560 def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
561     """Create a packed workflow.
562
563     A "packed" workflow is one where all the components have been combined into a single document."""
564
565     rewrites = {}
566     packed = pack(arvrunner.loadingContext, tool.tool["id"],
567                   rewrite_out=rewrites,
568                   loader=tool.doc_loader)
569
570     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
571
572     def visit(v, cur_id):
573         if isinstance(v, dict):
574             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
575                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
576                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or upgrade to cwlVersion: v1.1")
577                 if "id" in v:
578                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
579             if "path" in v and "location" not in v:
580                 v["location"] = v["path"]
581                 del v["path"]
582             if "location" in v and cur_id in merged_map:
583                 if v["location"] in merged_map[cur_id].resolved:
584                     v["location"] = merged_map[cur_id].resolved[v["location"]]
585                 if v["location"] in merged_map[cur_id].secondaryFiles:
586                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
587             if v.get("class") == "DockerRequirement":
588                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
589                                                                                                              runtimeContext)
590             for l in v:
591                 visit(v[l], cur_id)
592         if isinstance(v, list):
593             for l in v:
594                 visit(l, cur_id)
595     visit(packed, None)
596
597     if git_info:
598         for g in git_info:
599             packed[g] = git_info[g]
600
601     return packed
602
603
604 def tag_git_version(packed, tool):
605     if tool.tool["id"].startswith("file://"):
606         path = os.path.dirname(tool.tool["id"][7:])
607         try:
608             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
609         except (OSError, subprocess.CalledProcessError):
610             pass
611         else:
612             packed["http://schema.org/version"] = githash
613
614 def setloc(mapper, p):
615     loc = p.get("location")
616     if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
617         p["location"] = mapper.mapper(p["location"]).resolved
618         return
619
620     if not loc:
621         return
622
623     if collectionUUID in p:
624         uuid = p[collectionUUID]
625         keepuuid = "keep:"+uuid
626         if keepuuid not in mapper:
627             raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
628                 "Collection uuid %s not found" % uuid)
629         gp = collection_pdh_pattern.match(loc)
630         if gp and mapper.mapper(keepuuid).resolved != gp.groups()[0]:
631             # This file entry has both collectionUUID and a PDH
632             # location. If the PDH doesn't match the one returned by
633             # the API server, raise an error.
634             raise SourceLine(p, "location", validate.ValidationException).makeError(
635                 "Expected collection uuid %s to be %s but API server reported %s" % (
636                     uuid, gp.groups()[0], mapper.mapper(keepuuid).resolved))
637
638     gp = collection_uuid_pattern.match(loc)
639     if not gp:
640         # Not a uuid pattern (must be a pdh pattern)
641         return
642
643     uuid = gp.groups()[0]
644     keepuuid = "keep:"+uuid
645     if keepuuid not in mapper:
646         raise SourceLine(p, "location", validate.ValidationException).makeError(
647             "Collection uuid %s not found" % uuid)
648     p["location"] = "keep:%s%s" % (mapper.mapper(keepuuid).resolved, gp.groups()[1] if gp.groups()[1] else "")
649     p[collectionUUID] = uuid
650
651 def update_from_mapper(workflowobj, mapper):
652     with Perf(metrics, "setloc"):
653         visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
654
655 def apply_merged_map(merged_map, workflowobj):
656     def visit(v, cur_id):
657         if isinstance(v, dict):
658             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
659                 if "id" in v:
660                     cur_id = v["id"]
661             if "path" in v and "location" not in v:
662                 v["location"] = v["path"]
663                 del v["path"]
664             if "location" in v and cur_id in merged_map:
665                 if v["location"] in merged_map[cur_id].resolved:
666                     v["location"] = merged_map[cur_id].resolved[v["location"]]
667                 if v["location"] in merged_map[cur_id].secondaryFiles:
668                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
669             #if v.get("class") == "DockerRequirement":
670             #    v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
671             #                                                                                                 runtimeContext)
672             for l in v:
673                 visit(v[l], cur_id)
674         if isinstance(v, list):
675             for l in v:
676                 visit(l, cur_id)
677     visit(workflowobj, None)
678
679 def update_from_merged_map(tool, merged_map):
680     tool.visit(partial(apply_merged_map, merged_map))
681
682 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
683     """Upload local files referenced in the input object and return updated input
684     object with 'location' updated to the proper keep references.
685     """
686
687     # Make a copy of the job order and set defaults.
688     builder_job_order = copy.copy(job_order)
689
690     # fill_in_defaults throws an error if any required parameters
691     # are missing.  We don't want it to do that here, so make
692     # them all optional.
693     inputs_copy = copy.deepcopy(tool.tool["inputs"])
694     for i in inputs_copy:
695         if "null" not in i["type"]:
696             i["type"] = ["null"] + aslist(i["type"])
697
698     fill_in_defaults(inputs_copy,
699                      builder_job_order,
700                      arvrunner.fs_access)
701     # Need to create a builder object to evaluate expressions.
702     builder = make_builder(builder_job_order,
703                            tool.hints,
704                            tool.requirements,
705                            ArvRuntimeContext(),
706                            tool.metadata)
707     # Now update job_order with secondaryFiles
708     discover_secondary_files(arvrunner.fs_access,
709                              builder,
710                              tool.tool["inputs"],
711                              job_order)
712
713     _jobloaderctx = jobloaderctx.copy()
714     jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
715
716     jobmapper = upload_dependencies(arvrunner,
717                                     name,
718                                     jobloader,
719                                     job_order,
720                                     job_order.get("id", "#"),
721                                     runtimeContext)
722
723     if "id" in job_order:
724         del job_order["id"]
725
726     # Need to filter this out; it gets added by cwltool when
727     # providing parameters on the command line.
728     if "job_order" in job_order:
729         del job_order["job_order"]
730
731     update_from_mapper(job_order, jobmapper)
732
733     return job_order, jobmapper
734
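# Hedged example of the calling sequence (caller-side names assumed):
#
#   job_order, jobmapper = upload_job_order(arvrunner, "input object", tool,
#                                           job_order, runtimeContext)
#
# On return, local file references in job_order have been rewritten to
# keep: locations and any discovered secondaryFiles have been filled in.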
735 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
736
737 def upload_workflow_deps(arvrunner, tool, runtimeContext):
738     # Ensure that Docker images needed by this workflow are available
739
740     with Perf(metrics, "upload_docker"):
741         upload_docker(arvrunner, tool, runtimeContext)
742
743     document_loader = tool.doc_loader
744
745     merged_map = {}
746     tool_dep_cache = {}
747
748     todo = []
749
750     # Standard traversal is top down, but we want to go bottom up,
751     # so use the visitor to accumulate a list of nodes to visit, then
752     # visit them in reverse order.
753     def upload_tool_deps(deptool):
754         if "id" in deptool:
755             todo.append(deptool)
756
757     tool.visit(upload_tool_deps)
758
759     for deptool in reversed(todo):
760         discovered_secondaryfiles = {}
761         with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
762             pm = upload_dependencies(arvrunner,
763                                      "%s dependencies" % (shortname(deptool["id"])),
764                                      document_loader,
765                                      deptool,
766                                      deptool["id"],
767                                      runtimeContext,
768                                      include_primary=False,
769                                      discovered_secondaryfiles=discovered_secondaryfiles,
770                                      cache=tool_dep_cache)
771
772         document_loader.idx[deptool["id"]] = deptool
773         toolmap = {}
774         for k,v in pm.items():
775             toolmap[k] = v.resolved
776
777         merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
778
779     return merged_map
780
781 def arvados_jobs_image(arvrunner, img, runtimeContext):
782     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
783
784     try:
785         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
786                                                           True, runtimeContext)
787     except Exception as e:
788         raise Exception("Docker image %s is not available\n%s" % (img, e) )
789
790
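# Illustrative use (the image name mirrors the default used by the Runner
# class below; the return value is the image reference reported by
# arv_docker_get_image, normally a collection portable data hash):
#
#   jobs_image_pdh = arvados_jobs_image(arvrunner, "arvados/jobs:"+__version__,
#                                       runtimeContext)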
791 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
792     collection = arvados.collection.Collection(api_client=arvrunner.api,
793                                                keep_client=arvrunner.keep_client,
794                                                num_retries=arvrunner.num_retries)
795     with collection.open("workflow.cwl", "w") as f:
796         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
797
798     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
799                ["name", "like", name+"%"]]
800     if runtimeContext.project_uuid:
801         filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
802     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
803
804     if exists["items"]:
805         logger.info("Using collection %s", exists["items"][0]["uuid"])
806     else:
807         collection.save_new(name=name,
808                             owner_uuid=runtimeContext.project_uuid,
809                             ensure_unique_name=True,
810                             num_retries=arvrunner.num_retries)
811         logger.info("Uploaded to %s", collection.manifest_locator())
812
813     return collection.portable_data_hash()
814
815
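# Example (assumed caller variables; "name" is whatever label the workflow
# collection should be stored under):
#
#   pdh = upload_workflow_collection(arvrunner, name, packed, runtimeContext)
#
# The returned portable data hash identifies a collection containing
# "workflow.cwl"; an existing collection with the same content and a
# matching name prefix is reused instead of saving a new one.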
816 class Runner(Process):
817     """Base class for runner processes, which submit an instance of
818     arvados-cwl-runner and wait for the final result."""
819
820     def __init__(self, runner,
821                  tool, loadingContext, enable_reuse,
822                  output_name, output_tags, submit_runner_ram=0,
823                  name=None, on_error=None, submit_runner_image=None,
824                  intermediate_output_ttl=0, merged_map=None,
825                  priority=None, secret_store=None,
826                  collection_cache_size=256,
827                  collection_cache_is_default=True,
828                  git_info=None,
829                  reuse_runner=False):
830
831         self.loadingContext = loadingContext.copy()
832
833         super(Runner, self).__init__(tool.tool, loadingContext)
834
835         self.arvrunner = runner
836         self.embedded_tool = tool
837         self.job_order = None
838         self.running = False
839         if enable_reuse:
840             # If reuse is permitted by command line arguments but
841             # disabled by the workflow itself, disable it.
842             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
843             if reuse_req:
844                 enable_reuse = reuse_req["enableReuse"]
845             reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
846             if reuse_req:
847                 enable_reuse = reuse_req["enableReuse"]
848         self.enable_reuse = enable_reuse
849         self.uuid = None
850         self.final_output = None
851         self.output_name = output_name
852         self.output_tags = output_tags
853         self.name = name
854         self.on_error = on_error
855         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
856         self.intermediate_output_ttl = intermediate_output_ttl
857         self.priority = priority
858         self.secret_store = secret_store
859         self.enable_dev = self.loadingContext.enable_dev
860         self.git_info = git_info
861         self.fast_parser = self.loadingContext.fast_parser
862         self.reuse_runner = reuse_runner
863
864         self.submit_runner_cores = 1
865         self.submit_runner_ram = 1024  # default 1 GiB
866         self.collection_cache_size = collection_cache_size
867
868         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
869         if runner_resource_req:
870             if runner_resource_req.get("coresMin"):
871                 self.submit_runner_cores = runner_resource_req["coresMin"]
872             if runner_resource_req.get("ramMin"):
873                 self.submit_runner_ram = runner_resource_req["ramMin"]
874             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
875                 self.collection_cache_size = runner_resource_req["keep_cache"]
876
877         if submit_runner_ram:
878             # Command line / initializer overrides default and/or spec from workflow
879             self.submit_runner_ram = submit_runner_ram
880
881         if self.submit_runner_ram <= 0:
882             raise Exception("Value of submit-runner-ram must be greater than zero")
883
884         if self.submit_runner_cores <= 0:
885             raise Exception("Value of submit-runner-cores must be greater than zero")
886
887         self.merged_map = merged_map or {}
888
889     def job(self,
890             job_order,         # type: Mapping[Text, Text]
891             output_callbacks,  # type: Callable[[Any, Any], Any]
892             runtimeContext     # type: RuntimeContext
893            ):  # type: (...) -> Generator[Any, None, None]
894         self.job_order = job_order
895         self._init_job(job_order, runtimeContext)
896         yield self
897
898     def update_pipeline_component(self, record):
899         pass
900
901     def done(self, record):
902         """Base method for handling a completed runner."""
903
904         try:
905             if record["state"] == "Complete":
906                 if record.get("exit_code") is not None:
907                     if record["exit_code"] == 33:
908                         processStatus = "UnsupportedRequirement"
909                     elif record["exit_code"] == 0:
910                         processStatus = "success"
911                     else:
912                         processStatus = "permanentFail"
913                 else:
914                     processStatus = "success"
915             else:
916                 processStatus = "permanentFail"
917
918             outputs = {}
919
920             if processStatus == "permanentFail":
921                 logc = arvados.collection.CollectionReader(record["log"],
922                                                            api_client=self.arvrunner.api,
923                                                            keep_client=self.arvrunner.keep_client,
924                                                            num_retries=self.arvrunner.num_retries)
925                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40,
926                              include_crunchrun=(record.get("exit_code") is None or record.get("exit_code") > 127))
927
928             self.final_output = record["output"]
929             outc = arvados.collection.CollectionReader(self.final_output,
930                                                        api_client=self.arvrunner.api,
931                                                        keep_client=self.arvrunner.keep_client,
932                                                        num_retries=self.arvrunner.num_retries)
933             if "cwl.output.json" in outc:
934                 with outc.open("cwl.output.json", "rb") as f:
935                     if f.size() > 0:
936                         outputs = json.loads(f.read().decode())
937             def keepify(fileobj):
938                 path = fileobj["location"]
939                 if not path.startswith("keep:"):
940                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
941             adjustFileObjs(outputs, keepify)
942             adjustDirObjs(outputs, keepify)
943         except Exception:
944             logger.exception("[%s] While getting final output object", self.name)
945             self.arvrunner.output_callback({}, "permanentFail")
946         else:
947             self.arvrunner.output_callback(outputs, processStatus)
948
949
950 def print_keep_deps_visitor(api, runtimeContext, references, doc_loader, tool):
951     def collect_locators(obj):
952         loc = obj.get("location", "")
953
954         g = arvados.util.keepuri_pattern.match(loc)
955         if g:
956             references.add(g[1])
957
958         if obj.get("class") == "http://arvados.org/cwl#WorkflowRunnerResources" and "acrContainerImage" in obj:
959             references.add(obj["acrContainerImage"])
960
961         if obj.get("class") == "DockerRequirement":
962             references.add(arvados_cwl.arvdocker.arv_docker_get_image(api, obj, False, runtimeContext))
963
964     sc_result = scandeps(tool["id"], tool,
965                          set(),
966                          set(("location", "id")),
967                          None, urljoin=doc_loader.fetcher.urljoin,
968                          nestdirs=False)
969
970     visit_class(sc_result, ("File", "Directory"), collect_locators)
971     visit_class(tool, ("DockerRequirement", "http://arvados.org/cwl#WorkflowRunnerResources"), collect_locators)
972
973
974 def print_keep_deps(arvRunner, runtimeContext, merged_map, tool):
975     references = set()
976
977     tool.visit(partial(print_keep_deps_visitor, arvRunner.api, runtimeContext, references, tool.doc_loader))
978
979     for mm in merged_map:
980         for k, v in merged_map[mm].resolved.items():
981             g = arvados.util.keepuri_pattern.match(v)
982             if g:
983                 references.add(g[1])
984
985     json.dump(sorted(references), arvRunner.stdout)
986     print(file=arvRunner.stdout)
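# Example of what print_keep_deps writes to stdout (a JSON array of keep
# references followed by a newline; the hashes below are placeholders):
#
#   ["169f39d466a5438ac4a90e779bf750c7+53", "99999999999999999999999999999998+99"]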