1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import (
21     Any,
22     Callable,
23     Dict,
24     Iterable,
25     Iterator,
26     List,
27     Mapping,
28     MutableMapping,
29     Sequence,
30     MutableSequence,
31     Optional,
32     Set,
33     Sized,
34     Tuple,
35     Type,
36     Union,
37     cast,
38 )
39 from cwltool.utils import (
40     CWLObjectType,
41     CWLOutputAtomType,
42     CWLOutputType,
43 )
44
45 if os.name == "posix" and sys.version_info[0] < 3:
46     import subprocess32 as subprocess
47 else:
48     import subprocess
49
50 from schema_salad.sourceline import SourceLine, cmap
51
52 from cwltool.command_line_tool import CommandLineTool
53 import cwltool.workflow
54 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
55                              shortname, Process, fill_in_defaults)
56 from cwltool.load_tool import fetch_document, jobloaderctx
57 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
58 from cwltool.builder import substitute
59 from cwltool.pack import pack
60 from cwltool.update import INTERNAL_VERSION
61 from cwltool.builder import Builder
62 import schema_salad.validate as validate
63 import schema_salad.ref_resolver
64
65 import arvados.collection
66 import arvados.util
67 from .util import collectionUUID
68 from ruamel.yaml import YAML
69 from ruamel.yaml.comments import CommentedMap, CommentedSeq
70
71 import arvados_cwl.arvdocker
72 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern, MapperEnt
73 from ._version import __version__
74 from . import done
75 from .context import ArvRuntimeContext
76 from .perf import Perf
77
78 logger = logging.getLogger('arvados.cwl-runner')
79 metrics = logging.getLogger('arvados.cwl-runner.metrics')
80
81 def trim_anonymous_location(obj):
82     """Remove 'location' field from File and Directory literals.
83
84     To make internal handling easier, literals are assigned a random id for
85     'location'.  However, when writing the record back out, this can break
86 reproducibility.  Since it is valid for literals not to have a 'location'
87     field, remove it.
88
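Illustrative example (added annotation, not from the original source): a
literal such as {"class": "File", "location": "_:0df309...", "basename":
"x.txt", "contents": "x"} is written back out without the synthetic
"_:..." location.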
89     """
90
91     if obj.get("location", "").startswith("_:"):
92         del obj["location"]
93
94
95 def remove_redundant_fields(obj):
96     for field in ("path", "nameext", "nameroot", "dirname"):
97         if field in obj:
98             del obj[field]
99
100
101 def find_defaults(d, op):
102     if isinstance(d, list):
103         for i in d:
104             find_defaults(i, op)
105     elif isinstance(d, dict):
106         if "default" in d:
107             op(d)
108         else:
109             for i in viewvalues(d):
110                 find_defaults(i, op)
111
112 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
113     return Builder(
114                  job=joborder,
115                  files=[],               # type: List[Dict[Text, Text]]
116                  bindings=[],            # type: List[Dict[Text, Any]]
117                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
118                  names=None,               # type: Names
119                  requirements=requirements,        # type: List[Dict[Text, Any]]
120                  hints=hints,               # type: List[Dict[Text, Any]]
121                  resources={},           # type: Dict[str, int]
122                  mutation_manager=None,    # type: Optional[MutationManager]
123                  formatgraph=None,         # type: Optional[Graph]
124                  make_fs_access=None,      # type: Type[StdFsAccess]
125                  fs_access=None,           # type: StdFsAccess
126                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
127                  timeout=runtimeContext.eval_timeout,             # type: float
128                  debug=runtimeContext.debug,               # type: bool
129                  js_console=runtimeContext.js_console,          # type: bool
130                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
131                  loadListing="",         # type: Text
132                  outdir="",              # type: Text
133                  tmpdir="",              # type: Text
134                  stagedir="",            # type: Text
135                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
136                  container_engine="docker"
137                 )
138
139 def search_schemadef(name, reqs):
140     for r in reqs:
141         if r["class"] == "SchemaDefRequirement":
142             for sd in r["types"]:
143                 if sd["name"] == name:
144                     return sd
145     return None
146
147 primitive_types_set = frozenset(("null", "boolean", "int", "long",
148                                  "float", "double", "string", "record",
149                                  "array", "enum"))
150
151 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
152     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
153         # union type, collect all possible secondaryFiles
154         for i in inputschema:
155             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
156         return
157
158     if inputschema == "File":
159         inputschema = {"type": "File"}
160
161     if isinstance(inputschema, basestring):
162         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
163         if sd:
164             inputschema = sd
165         else:
166             return
167
168     if "secondaryFiles" in inputschema:
169         # set secondaryFiles, may be inherited by compound types.
170         secondaryspec = inputschema["secondaryFiles"]
171
172     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
173         not isinstance(inputschema["type"], basestring)):
174         # compound type (union, array, record)
175         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
176
177     elif (inputschema["type"] == "record" and
178           isinstance(primary, Mapping)):
179         #
180         # record type, find secondary files associated with fields.
181         #
182         for f in inputschema["fields"]:
183             p = primary.get(shortname(f["name"]))
184             if p:
185                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
186
187     elif (inputschema["type"] == "array" and
188           isinstance(primary, Sequence)):
189         #
190         # array type, find secondary files of elements
191         #
192         for p in primary:
193             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
194
195     elif (inputschema["type"] == "File" and
196           isinstance(primary, Mapping) and
197           primary.get("class") == "File"):
198
199         if "secondaryFiles" in primary or not secondaryspec:
200             # Nothing to do.
201             return
202
203         #
204         # Found a file, check for secondaryFiles
205         #
206         specs = []
207         primary["secondaryFiles"] = secondaryspec
208         for i, sf in enumerate(aslist(secondaryspec)):
209             if builder.cwlVersion == "v1.0":
210                 pattern = sf
211             else:
212                 pattern = sf["pattern"]
213             if pattern is None:
214                 continue
215             if isinstance(pattern, list):
216                 specs.extend(pattern)
217             elif isinstance(pattern, dict):
218                 specs.append(pattern)
219             elif isinstance(pattern, str):
220                 if builder.cwlVersion == "v1.0":
221                     specs.append({"pattern": pattern, "required": True})
222                 else:
223                     specs.append({"pattern": pattern, "required": sf.get("required")})
224             else:
225                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
226                     "Expression must return list, object, string or null")
227
228         found = []
229         for i, sf in enumerate(specs):
230             if isinstance(sf, dict):
231                 if sf.get("class") == "File":
232                     pattern = None
233                     if sf.get("location") is None:
234                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
235                             "File object is missing 'location': %s" % sf)
236                     sfpath = sf["location"]
237                     required = True
238                 else:
239                     pattern = sf["pattern"]
240                     required = sf.get("required")
241             elif isinstance(sf, str):
242                 pattern = sf
243                 required = True
244             else:
245                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
246                     "Expression must return list, object, string or null")
247
248             if pattern is not None:
249                 if "${" in pattern or "$(" in pattern:
250                     sfname = builder.do_eval(pattern, context=primary)
251                 else:
252                     sfname = substitute(primary["basename"], pattern)
253
254                 if sfname is None:
255                     continue
256
257                 if isinstance(sfname, str):
258                     p_location = primary["location"]
259                     if "/" in p_location:
260                         sfpath = (
261                             p_location[0 : p_location.rindex("/") + 1]
262                             + sfname
263                         )
264
265             required = builder.do_eval(required, context=primary)
266
267             if isinstance(sfname, list) or isinstance(sfname, dict):
268                 each = aslist(sfname)
269                 for e in each:
270                     if required and not fsaccess.exists(e.get("location")):
271                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
272                             "Required secondary file '%s' does not exist" % e.get("location"))
273                 found.extend(each)
274
275             if isinstance(sfname, str):
276                 if fsaccess.exists(sfpath):
277                     if pattern is not None:
278                         found.append({"location": sfpath, "class": "File"})
279                     else:
280                         found.append(sf)
281                 elif required:
282                     raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
283                         "Required secondary file '%s' does not exist" % sfpath)
284
285         primary["secondaryFiles"] = cmap(found)
286         if discovered is not None:
287             discovered[primary["location"]] = primary["secondaryFiles"]
288     elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
289         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
290
291 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
292     for inputschema in inputs:
293         primary = job_order.get(shortname(inputschema["id"]))
294         if isinstance(primary, (Mapping, Sequence)):
295             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
296
297 def upload_dependencies(arvrunner, name, document_loader,
298                         workflowobj, uri, runtimeContext,
299                         include_primary=True, discovered_secondaryfiles=None,
300                         cache=None):
301     """Upload the dependencies of the workflowobj document to Keep.
302
303     Returns a pathmapper object mapping local paths to keep references.  Also
304     does an in-place update of references in "workflowobj".
305
306     Use scandeps to find $schemas, File and Directory
307     fields that represent external references.
308
309     If workflowobj has an "id" field, this will reload the document to ensure
310     it is scanning the raw document prior to preprocessing.
311     """
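    # Illustrative (added annotation, not from the original source): after the
    # upload, the returned pathmapper resolves local references, e.g.
    #   mapper.mapper("file:///home/me/input.fastq").resolved
    #   -> "keep:<portable data hash>/input.fastq"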
312
313     scanobj = workflowobj
314     metadata = scanobj
315
316     with Perf(metrics, "scandeps"):
317         sc_result = scandeps(uri, scanobj,
318                              set(),
319                              set(("location",)),
320                              None, urljoin=document_loader.fetcher.urljoin,
321                              nestdirs=False)
322         optional_deps = scandeps(uri, scanobj,
323                              set(),
324                              set(("$schemas",)),
325                              None, urljoin=document_loader.fetcher.urljoin,
326                              nestdirs=False)
327
328     if sc_result is None:
329         sc_result = []
330
331     if optional_deps is None:
332         optional_deps = []
333
334     if optional_deps:
335         sc_result.extend(optional_deps)
336
337     #print("BOING", uri, sc_result)
338
339     sc = []
340     uuids = {}
341
342     def collect_uuids(obj):
343         loc = obj.get("location", "")
344         sp = loc.split(":")
345         if sp[0] == "keep":
346             # Collect collection uuids that need to be resolved to
347             # portable data hashes
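            # Illustrative location forms (added annotation, not from the
            # original source):
            #   "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/file.txt"
            #       -> names a collection by uuid, needs resolution to a PDH
            #   "keep:99999999999999999999999999999999+99/file.txt"
            #       -> already a portable data hash reference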
348             gp = collection_uuid_pattern.match(loc)
349             if gp:
350                 uuids[gp.groups()[0]] = obj
351             if collectionUUID in obj:
352                 uuids[obj[collectionUUID]] = obj
353
354     def collect_uploads(obj):
355         loc = obj.get("location", "")
356         sp = loc.split(":")
357         if len(sp) < 1:
358             return
359         if sp[0] in ("file", "http", "https"):
360             # Record local files that need to be uploaded,
361             # don't include file literals, keep references, etc.
362             sc.append(obj)
363         collect_uuids(obj)
364
365     with Perf(metrics, "collect uuids"):
366         visit_class(workflowobj, ("File", "Directory"), collect_uuids)
367
368     with Perf(metrics, "collect uploads"):
369         visit_class(sc_result, ("File", "Directory"), collect_uploads)
370
371     # Resolve any collection uuids we found to portable data hashes
372     # and assign them to uuid_map
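    # Illustrative result (added annotation, not from the original source):
    #   uuid_map == {"zzzzz-4zz18-zzzzzzzzzzzzzzz": "99999999999999999999999999999999+99"}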
373     uuid_map = {}
374     fetch_uuids = list(uuids.keys())
375     with Perf(metrics, "fetch_uuids"):
376         while fetch_uuids:
377             # For a large number of fetch_uuids, the API server may limit
378             # the response size, so keep fetching until the API server has
379             # nothing more to give us.
380             lookups = arvrunner.api.collections().list(
381                 filters=[["uuid", "in", fetch_uuids]],
382                 count="none",
383                 select=["uuid", "portable_data_hash"]).execute(
384                     num_retries=arvrunner.num_retries)
385
386             if not lookups["items"]:
387                 break
388
389             for l in lookups["items"]:
390                 uuid_map[l["uuid"]] = l["portable_data_hash"]
391
392             fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
393
394     normalizeFilesDirs(sc)
395
396     if "id" in workflowobj:
397         defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
398         if include_primary:
399             # make sure it's included
400             sc.append({"class": "File", "location": defrg})
401         else:
402             # make sure it's excluded
403             sc = [d for d in sc if d.get("location") != defrg]
404
405     def visit_default(obj):
406         def defaults_are_optional(f):
407             if "location" not in f and "path" in f:
408                 f["location"] = f["path"]
409                 del f["path"]
410             normalizeFilesDirs(f)
411             optional_deps.append(f)
412         visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
413
414     find_defaults(workflowobj, visit_default)
415
416     discovered = {}
417     def discover_default_secondary_files(obj):
418         builder_job_order = {}
419         for t in obj["inputs"]:
420             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
421         # Need to create a builder object to evaluate expressions.
422         builder = make_builder(builder_job_order,
423                                obj.get("hints", []),
424                                obj.get("requirements", []),
425                                ArvRuntimeContext(),
426                                metadata)
427         discover_secondary_files(arvrunner.fs_access,
428                                  builder,
429                                  obj["inputs"],
430                                  builder_job_order,
431                                  discovered)
432
433     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
434     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
435
436     for d in list(discovered):
437         # Only interested in discovered secondaryFiles which are local
438         # files that need to be uploaded.
439         if d.startswith("file:"):
440             sc.extend(discovered[d])
441         else:
442             del discovered[d]
443
444     with Perf(metrics, "mapper"):
445         mapper = ArvPathMapper(arvrunner, sc, "",
446                                "keep:%s",
447                                "keep:%s/%s",
448                                name=name,
449                                single_collection=True,
450                                optional_deps=optional_deps)
451
452     for k, v in uuid_map.items():
453         mapper._pathmap["keep:"+k] = MapperEnt(v, "", "", False)
454
455     keeprefs = set()
456     def addkeepref(k):
457         if k.startswith("keep:"):
458             keeprefs.add(collection_pdh_pattern.match(k).group(1))
459
460
461     def collectloc(p):
462         loc = p.get("location")
463         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
464             addkeepref(p["location"])
465             return
466
467         if not loc:
468             return
469
470         if collectionUUID in p:
471             uuid = p[collectionUUID]
472             if uuid not in uuid_map:
473                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
474                     "Collection uuid %s not found" % uuid)
475             gp = collection_pdh_pattern.match(loc)
476             if gp and uuid_map[uuid] != gp.groups()[0]:
477                 # This file entry has both collectionUUID and a PDH
478                 # location. If the PDH doesn't match the one returned by
479                 # the API server, raise an error.
480                 raise SourceLine(p, "location", validate.ValidationException).makeError(
481                     "Expected collection uuid %s to be %s but API server reported %s" % (
482                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
483
484         gp = collection_uuid_pattern.match(loc)
485         if not gp:
486             # Not a uuid pattern (must be a pdh pattern)
487             addkeepref(p["location"])
488             return
489
490         uuid = gp.groups()[0]
491         if uuid not in uuid_map:
492             raise SourceLine(p, "location", validate.ValidationException).makeError(
493                 "Collection uuid %s not found" % uuid)
494
495     with Perf(metrics, "collectloc"):
496         visit_class(workflowobj, ("File", "Directory"), collectloc)
497         visit_class(discovered, ("File", "Directory"), collectloc)
498
499     if discovered_secondaryfiles is not None:
500         for d in discovered:
501             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
502
503     if runtimeContext.copy_deps:
504         # Find referenced collections and copy them into the
505         # destination project, for easy sharing.
506         already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
507                                      filters=[["portable_data_hash", "in", list(keeprefs)],
508                                               ["owner_uuid", "=", runtimeContext.project_uuid]],
509                                      select=["uuid", "portable_data_hash", "created_at"]))
510
511         keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
512         for kr in keeprefs:
513             col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
514                                                   order="created_at desc",
515                                                    select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
516                                                    limit=1).execute()
517             if len(col["items"]) == 0:
518                 logger.warning("Cannot find collection with portable data hash %s", kr)
519                 continue
520             col = col["items"][0]
521             col["name"] = arvados.util.trim_name(col["name"])
522             try:
523                 arvrunner.api.collections().create(body={"collection": {
524                     "owner_uuid": runtimeContext.project_uuid,
525                     "name": col["name"],
526                     "description": col["description"],
527                     "properties": col["properties"],
528                     "portable_data_hash": col["portable_data_hash"],
529                     "manifest_text": col["manifest_text"],
530                     "storage_classes_desired": col["storage_classes_desired"],
531                     "trash_at": col["trash_at"]
532                 }}, ensure_unique_name=True).execute()
533             except Exception as e:
534                 logger.warning("Unable to copy collection to destination: %s", e)
535
536     if "$schemas" in workflowobj:
537         sch = CommentedSeq()
538         for s in workflowobj["$schemas"]:
539             if s in mapper:
540                 sch.append(mapper.mapper(s).resolved)
541         workflowobj["$schemas"] = sch
542
543     return mapper
544
545
546 def upload_docker(arvrunner, tool, runtimeContext):
547     """Uploads Docker images used in CommandLineTool objects."""
548
549     if isinstance(tool, CommandLineTool):
550         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
551         if docker_req:
552             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
553                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
554                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
555
556             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
557         else:
558             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
559                                                        True, runtimeContext)
560     elif isinstance(tool, cwltool.workflow.Workflow):
561         for s in tool.steps:
562             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
563
564
565 def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
566     """Create a packed workflow.
567
568     A "packed" workflow is one where all the components have been combined into a single document."""
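    # Illustrative (added annotation, not from the original source): a workflow
    # whose steps reference separate files, e.g. "run: tools/sort.cwl", comes
    # back from pack() with those tools embedded under a single "$graph" list.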
569
570     rewrites = {}
571     packed = pack(arvrunner.loadingContext, tool.tool["id"],
572                   rewrite_out=rewrites,
573                   loader=tool.doc_loader)
574
575     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
576
577     def visit(v, cur_id):
578         if isinstance(v, dict):
579             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
580                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
581                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or update to cwlVersion: v1.1")
582                 if "id" in v:
583                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
584             if "path" in v and "location" not in v:
585                 v["location"] = v["path"]
586                 del v["path"]
587             if "location" in v and cur_id in merged_map:
588                 if v["location"] in merged_map[cur_id].resolved:
589                     v["location"] = merged_map[cur_id].resolved[v["location"]]
590                 if v["location"] in merged_map[cur_id].secondaryFiles:
591                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
592             if v.get("class") == "DockerRequirement":
593                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
594                                                                                                              runtimeContext)
595             for l in v:
596                 visit(v[l], cur_id)
597         if isinstance(v, list):
598             for l in v:
599                 visit(l, cur_id)
600     visit(packed, None)
601
602     if git_info:
603         for g in git_info:
604             packed[g] = git_info[g]
605
606     return packed
607
608
609 def tag_git_version(packed, tool):
610     if tool.tool["id"].startswith("file://"):
611         path = os.path.dirname(tool.tool["id"][7:])
612         try:
613             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
614         except (OSError, subprocess.CalledProcessError):
615             pass
616         else:
617             packed["http://schema.org/version"] = githash
618
619 def setloc(mapper, p):
620     loc = p.get("location")
621     if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
622         p["location"] = mapper.mapper(p["location"]).resolved
623         return
624
625     if not loc:
626         return
627
628     if collectionUUID in p:
629         uuid = p[collectionUUID]
630         keepuuid = "keep:"+uuid
631         if keepuuid not in mapper:
632             raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
633                 "Collection uuid %s not found" % uuid)
634         gp = collection_pdh_pattern.match(loc)
635         if gp and mapper.mapper(keepuuid).resolved != gp.groups()[0]:
636             # This file entry has both collectionUUID and a PDH
637             # location. If the PDH doesn't match the one returned by
638             # the API server, raise an error.
639             raise SourceLine(p, "location", validate.ValidationException).makeError(
640                 "Expected collection uuid %s to be %s but API server reported %s" % (
641                     uuid, gp.groups()[0], mapper.mapper(keepuuid).resolved))
642
643     gp = collection_uuid_pattern.match(loc)
644     if not gp:
645         # Not a uuid pattern (must be a pdh pattern)
646         return
647
648     uuid = gp.groups()[0]
649     keepuuid = "keep:"+uuid
650     if keepuuid not in mapper:
651         raise SourceLine(p, "location", validate.ValidationException).makeError(
652             "Collection uuid %s not found" % uuid)
653     p["location"] = "keep:%s%s" % (mapper.mapper(keepuuid).resolved, gp.groups()[1] if gp.groups()[1] else "")
654     p[collectionUUID] = uuid
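    # Illustrative rewrite (added annotation, not from the original source):
    #   "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/x.txt" becomes
    #   "keep:99999999999999999999999999999999+99/x.txt", with the original
    #   collection uuid kept in the collectionUUID extension field.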
655
656 def update_from_mapper(workflowobj, mapper):
657     with Perf(metrics, "setloc"):
658         visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
659
660 def apply_merged_map(merged_map, workflowobj):
661     def visit(v, cur_id):
662         if isinstance(v, dict):
663             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
664                 if "id" in v:
665                     cur_id = v["id"]
666             if "path" in v and "location" not in v:
667                 v["location"] = v["path"]
668                 del v["path"]
669             if "location" in v and cur_id in merged_map:
670                 if v["location"] in merged_map[cur_id].resolved:
671                     v["location"] = merged_map[cur_id].resolved[v["location"]]
672                 if v["location"] in merged_map[cur_id].secondaryFiles:
673                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
674             #if v.get("class") == "DockerRequirement":
675             #    v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
676             #                                                                                                 runtimeContext)
677             for l in v:
678                 visit(v[l], cur_id)
679         if isinstance(v, list):
680             for l in v:
681                 visit(l, cur_id)
682     visit(workflowobj, None)
683
684 def update_from_merged_map(tool, merged_map):
685     tool.visit(partial(apply_merged_map, merged_map))
686
687 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
688     """Upload local files referenced in the input object and return an updated
689     input object with 'location' rewritten to the proper keep references.
690     """
691
692     # Make a copy of the job order and set defaults.
693     builder_job_order = copy.copy(job_order)
694
695     # fill_in_defaults throws an error if there are any
696     # missing required parameters, we don't want it to do that
697     # so make them all optional.
698     inputs_copy = copy.deepcopy(tool.tool["inputs"])
699     for i in inputs_copy:
700         if "null" not in i["type"]:
701             i["type"] = ["null"] + aslist(i["type"])
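    # Illustrative (added annotation, not from the original source): a declared
    # type of "File" becomes ["null", "File"], so fill_in_defaults() below will
    # not raise on parameters the user did not supply.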
702
703     fill_in_defaults(inputs_copy,
704                      builder_job_order,
705                      arvrunner.fs_access)
706     # Need to create a builder object to evaluate expressions.
707     builder = make_builder(builder_job_order,
708                            tool.hints,
709                            tool.requirements,
710                            ArvRuntimeContext(),
711                            tool.metadata)
712     # Now update job_order with secondaryFiles
713     discover_secondary_files(arvrunner.fs_access,
714                              builder,
715                              tool.tool["inputs"],
716                              job_order)
717
718     _jobloaderctx = jobloaderctx.copy()
719     jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
720
721     jobmapper = upload_dependencies(arvrunner,
722                                     name,
723                                     jobloader,
724                                     job_order,
725                                     job_order.get("id", "#"),
726                                     runtimeContext)
727
728     if "id" in job_order:
729         del job_order["id"]
730
731     # Need to filter this out, gets added by cwltool when providing
732     # parameters on the command line.
733     if "job_order" in job_order:
734         del job_order["job_order"]
735
736     update_from_mapper(job_order, jobmapper)
737
738     #print(json.dumps(job_order, indent=2))
739
740     return job_order, jobmapper
741
742 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
743
744 def upload_workflow_deps(arvrunner, tool, runtimeContext):
745     # Ensure that Docker images needed by this workflow are available
746
747     with Perf(metrics, "upload_docker"):
748         upload_docker(arvrunner, tool, runtimeContext)
749
750     document_loader = tool.doc_loader
751
752     merged_map = {}
753     tool_dep_cache = {}
754
755     todo = []
756
757     # Standard traversal is top down, we want to go bottom up, so use
758     # the visitor to accumulate a list of nodes to visit, then
759     # visit them in reverse order.
760     def upload_tool_deps(deptool):
761         if "id" in deptool:
762             todo.append(deptool)
763
764     tool.visit(upload_tool_deps)
765
766     for deptool in reversed(todo):
767         discovered_secondaryfiles = {}
768         with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
769             pm = upload_dependencies(arvrunner,
770                                      "%s dependencies" % (shortname(deptool["id"])),
771                                      document_loader,
772                                      deptool,
773                                      deptool["id"],
774                                      runtimeContext,
775                                      include_primary=False,
776                                      discovered_secondaryfiles=discovered_secondaryfiles,
777                                      cache=tool_dep_cache)
778
779         document_loader.idx[deptool["id"]] = deptool
780         toolmap = {}
781         for k,v in pm.items():
782             toolmap[k] = v.resolved
783
784         #print("visited", deptool["id"], toolmap, discovered_secondaryfiles)
785
786         merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
787
788     return merged_map
789
790 def arvados_jobs_image(arvrunner, img, runtimeContext):
791     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
792
793     try:
794         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
795                                                           True, runtimeContext)
796     except Exception as e:
797         raise Exception("Docker image %s is not available\n%s" % (img, e))
798
799
800 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
801     collection = arvados.collection.Collection(api_client=arvrunner.api,
802                                                keep_client=arvrunner.keep_client,
803                                                num_retries=arvrunner.num_retries)
804     with collection.open("workflow.cwl", "w") as f:
805         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
806
807     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
808                ["name", "like", name+"%"]]
809     if runtimeContext.project_uuid:
810         filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
811     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
812
813     if exists["items"]:
814         logger.info("Using collection %s", exists["items"][0]["uuid"])
815     else:
816         collection.save_new(name=name,
817                             owner_uuid=runtimeContext.project_uuid,
818                             ensure_unique_name=True,
819                             num_retries=arvrunner.num_retries)
820         logger.info("Uploaded to %s", collection.manifest_locator())
821
822     return collection.portable_data_hash()
823
824
825 class Runner(Process):
826     """Base class for runner processes, which submit an instance of
827     arvados-cwl-runner and wait for the final result."""
828
829     def __init__(self, runner,
830                  tool, loadingContext, enable_reuse,
831                  output_name, output_tags, submit_runner_ram=0,
832                  name=None, on_error=None, submit_runner_image=None,
833                  intermediate_output_ttl=0, merged_map=None,
834                  priority=None, secret_store=None,
835                  collection_cache_size=256,
836                  collection_cache_is_default=True,
837                  git_info=None):
838
839         self.loadingContext = loadingContext.copy()
840
841         super(Runner, self).__init__(tool.tool, loadingContext)
842
843         self.arvrunner = runner
844         self.embedded_tool = tool
845         self.job_order = None
846         self.running = False
847         if enable_reuse:
848             # If reuse is permitted by command line arguments but
849             # disabled by the workflow itself, disable it.
850             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
851             if reuse_req:
852                 enable_reuse = reuse_req["enableReuse"]
853             reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
854             if reuse_req:
855                 enable_reuse = reuse_req["enableReuse"]
856         self.enable_reuse = enable_reuse
857         self.uuid = None
858         self.final_output = None
859         self.output_name = output_name
860         self.output_tags = output_tags
861         self.name = name
862         self.on_error = on_error
863         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
864         self.intermediate_output_ttl = intermediate_output_ttl
865         self.priority = priority
866         self.secret_store = secret_store
867         self.enable_dev = self.loadingContext.enable_dev
868         self.git_info = git_info
869         self.fast_parser = self.loadingContext.fast_parser
870
871         self.submit_runner_cores = 1
872         self.submit_runner_ram = 1024  # default 1 GiB
873         self.collection_cache_size = collection_cache_size
874
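        # Illustrative hint (added annotation, not from the original source) as
        # it might appear in a workflow document:
        #   hints:
        #     arv:WorkflowRunnerResources:
        #       coresMin: 2
        #       ramMin: 2048
        #       keep_cache: 512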
875         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
876         if runner_resource_req:
877             if runner_resource_req.get("coresMin"):
878                 self.submit_runner_cores = runner_resource_req["coresMin"]
879             if runner_resource_req.get("ramMin"):
880                 self.submit_runner_ram = runner_resource_req["ramMin"]
881             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
882                 self.collection_cache_size = runner_resource_req["keep_cache"]
883
884         if submit_runner_ram:
885             # Command line / initializer overrides default and/or spec from workflow
886             self.submit_runner_ram = submit_runner_ram
887
888         if self.submit_runner_ram <= 0:
889             raise Exception("Value of submit-runner-ram must be greater than zero")
890
891         if self.submit_runner_cores <= 0:
892             raise Exception("Value of submit-runner-cores must be greater than zero")
893
894         self.merged_map = merged_map or {}
895
896     def job(self,
897             job_order,         # type: Mapping[Text, Text]
898             output_callbacks,  # type: Callable[[Any, Any], Any]
899             runtimeContext     # type: RuntimeContext
900            ):  # type: (...) -> Generator[Any, None, None]
901         self.job_order = job_order
902         self._init_job(job_order, runtimeContext)
903         yield self
904
905     def update_pipeline_component(self, record):
906         pass
907
908     def done(self, record):
909         """Base method for handling a completed runner."""
910
911         try:
912             if record["state"] == "Complete":
913                 if record.get("exit_code") is not None:
914                     if record["exit_code"] == 33:
915                         processStatus = "UnsupportedRequirement"
916                     elif record["exit_code"] == 0:
917                         processStatus = "success"
918                     else:
919                         processStatus = "permanentFail"
920                 else:
921                     processStatus = "success"
922             else:
923                 processStatus = "permanentFail"
924
925             outputs = {}
926
927             if processStatus == "permanentFail":
928                 logc = arvados.collection.CollectionReader(record["log"],
929                                                            api_client=self.arvrunner.api,
930                                                            keep_client=self.arvrunner.keep_client,
931                                                            num_retries=self.arvrunner.num_retries)
932                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
933
934             self.final_output = record["output"]
935             outc = arvados.collection.CollectionReader(self.final_output,
936                                                        api_client=self.arvrunner.api,
937                                                        keep_client=self.arvrunner.keep_client,
938                                                        num_retries=self.arvrunner.num_retries)
939             if "cwl.output.json" in outc:
940                 with outc.open("cwl.output.json", "rb") as f:
941                     if f.size() > 0:
942                         outputs = json.loads(f.read().decode())
943             def keepify(fileobj):
944                 path = fileobj["location"]
945                 if not path.startswith("keep:"):
946                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
947             adjustFileObjs(outputs, keepify)
948             adjustDirObjs(outputs, keepify)
949         except Exception:
950             logger.exception("[%s] While getting final output object", self.name)
951             self.arvrunner.output_callback({}, "permanentFail")
952         else:
953             self.arvrunner.output_callback(outputs, processStatus)