20462: Refactor "common prefix" function & fix fencepost errors
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import  viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import (
21     Any,
22     Callable,
23     Dict,
24     Iterable,
25     Iterator,
26     List,
27     Mapping,
28     MutableMapping,
29     Sequence,
30     MutableSequence,
31     Optional,
32     Set,
33     Sized,
34     Tuple,
35     Type,
36     Union,
37     cast,
38 )
39 from cwltool.utils import (
40     CWLObjectType,
41     CWLOutputAtomType,
42     CWLOutputType,
43 )
44
45 if os.name == "posix" and sys.version_info[0] < 3:
46     import subprocess32 as subprocess
47 else:
48     import subprocess
49
50 from schema_salad.sourceline import SourceLine, cmap
51
52 from cwltool.command_line_tool import CommandLineTool
53 import cwltool.workflow
54 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
55                              shortname, Process, fill_in_defaults)
56 from cwltool.load_tool import fetch_document, jobloaderctx
57 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
58 from cwltool.builder import substitute
59 from cwltool.pack import pack
60 from cwltool.update import INTERNAL_VERSION
61 from cwltool.builder import Builder
62 import schema_salad.validate as validate
63 import schema_salad.ref_resolver
64
65 import arvados.collection
66 import arvados.util
67 from .util import collectionUUID
68 from ruamel.yaml import YAML
69 from ruamel.yaml.comments import CommentedMap, CommentedSeq
70
71 import arvados_cwl.arvdocker
72 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern, MapperEnt
73 from ._version import __version__
74 from . import done
75 from . context import ArvRuntimeContext
76 from .perf import Perf
77
# Module-wide logger for arvados-cwl-runner messages (warnings about
# collections that cannot be found or copied, etc.).
logger = logging.getLogger('arvados.cwl-runner')
# Separate logger that is handed to the Perf(...) timing blocks in this module.
metrics = logging.getLogger('arvados.cwl-runner.metrics')
80
def trim_anonymous_location(obj):
    """Drop the synthetic 'location' of a File or Directory literal.

    Literals are given a random '_:' identifier as their 'location' so
    that they are easier to handle internally.  Writing that identifier
    back out would hurt reproducibility, and a literal is valid without
    a 'location', so the field is deleted in place.

    Objects whose 'location' is absent or does not start with '_:' are
    left untouched.
    """
    location = obj.get("location", "")
    if not location.startswith("_:"):
        return
    del obj["location"]
93
94
def remove_redundant_fields(obj):
    """Delete the 'path', 'nameext', 'nameroot' and 'dirname' keys from obj.

    The mapping is modified in place; keys that are not present are
    simply skipped.
    """
    present = [key for key in ("path", "nameext", "nameroot", "dirname") if key in obj]
    for key in present:
        del obj[key]
99
100
def find_defaults(d, op):
    """Call op on every dict under d that has a "default" key.

    Lists are searched element by element.  A dict that contains
    "default" is passed to op and is NOT searched any further; a dict
    without it has each of its values searched.  Anything else is
    ignored.
    """
    if isinstance(d, list):
        for element in d:
            find_defaults(element, op)
        return

    if not isinstance(d, dict):
        return

    if "default" in d:
        op(d)
        return

    for value in viewvalues(d):
        find_defaults(value, op)
111
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    """Create a cwltool Builder around joborder.

    The builder gets the supplied job order, hints and requirements,
    takes its evaluation settings (job script provider, timeout, debug,
    js_console, force_docker_pull) from runtimeContext, and is otherwise
    filled with empty placeholders: no files, bindings, schema
    definitions, resources or filesystem access, and empty
    outdir/tmpdir/stagedir.

    The CWL version is read from metadata, preferring the
    "http://commonwl.org/cwltool#original_cwlVersion" entry and falling
    back to "cwlVersion".
    """
    builder_args = {
        "job": joborder,
        "files": [],
        "bindings": [],
        "schemaDefs": {},
        "names": None,
        "requirements": requirements,
        "hints": hints,
        "resources": {},
        "mutation_manager": None,
        "formatgraph": None,
        "make_fs_access": None,
        "fs_access": None,
        "job_script_provider": runtimeContext.job_script_provider,
        "timeout": runtimeContext.eval_timeout,
        "debug": runtimeContext.debug,
        "js_console": runtimeContext.js_console,
        "force_docker_pull": runtimeContext.force_docker_pull,
        "loadListing": "",
        "outdir": "",
        "tmpdir": "",
        "stagedir": "",
        "cwlVersion": (metadata.get("http://commonwl.org/cwltool#original_cwlVersion")
                       or metadata.get("cwlVersion")),
        "container_engine": "docker",
    }
    return Builder(**builder_args)
138
def search_schemadef(name, reqs):
    """Return the first SchemaDefRequirement type called name, or None.

    reqs is any iterable of requirement/hint dicts; only entries whose
    "class" is "SchemaDefRequirement" are inspected, in iteration order.
    """
    matches = (
        typedef
        for req in reqs
        if req["class"] == "SchemaDefRequirement"
        for typedef in req["types"]
        if typedef["name"] == name
    )
    return next(matches, None)
146
# Built-in schema type names.  set_secondary() only recurses into a
# "type" string as a named type when it is not one of these (and is not
# "File" or "Directory").
primitive_types_set = frozenset([
    "null",
    "boolean",
    "int",
    "long",
    "float",
    "double",
    "string",
    "record",
    "array",
    "enum",
])
150
def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    """Discover secondaryFiles for the File values found in primary.

    Walks inputschema and the matching value primary together.  Unions,
    arrays, records and named (SchemaDefRequirement) types are handled
    by recursion; when a File value with no "secondaryFiles" of its own
    is reached and a secondaryFiles spec is in effect, the spec is
    evaluated and primary["secondaryFiles"] is replaced by the list of
    secondary files that were found.

    Parameters:
      fsaccess      -- object whose exists(location) is used to test for
                       each candidate secondary file.
      builder       -- Builder providing cwlVersion, hints, requirements
                       and do_eval() for expressions.
      inputschema   -- the schema (type name, list of types, or mapping
                       with a "type" key) describing primary.
      secondaryspec -- the secondaryFiles spec inherited from an
                       enclosing schema, or None; replaced by
                       inputschema["secondaryFiles"] when that is present.
      primary       -- the input value being examined; File objects are
                       updated in place.
      discovered    -- optional dict; when not None it receives
                       primary["location"] -> discovered secondaryFiles.

    Raises validate.ValidationException (through SourceLine) when a spec
    has an unsupported type, a File literal in the spec lacks
    'location', or a required secondary file does not exist.
    """
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if inputschema == "File":
        inputschema = {"type": "File"}

    if isinstance(inputschema, basestring):
        # A named type: look it up, most recently declared first.
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          isinstance(primary, Mapping) and
          primary.get("class") == "File"):

        if "secondaryFiles" in primary or not secondaryspec:
            # Nothing to do.
            return

        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        # Expose the raw spec on the File while it is being evaluated;
        # it is replaced by the discovered list below.
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = sf
            else:
                pattern = sf["pattern"]
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            # Reset the per-spec results so that nothing is read before
            # assignment and nothing leaks in from a previous spec.
            sfname = None
            sfpath = None

            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    # A File literal: its own location is the candidate.
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                if "${" in pattern or "$(" in pattern:
                    sfname = builder.do_eval(pattern, context=primary)
                else:
                    sfname = substitute(primary["basename"], pattern)

                if sfname is None:
                    continue

                if isinstance(sfname, str):
                    # The secondary file is a sibling of the primary:
                    # keep everything up to and including the last "/".
                    # rfind() gives -1 when there is no "/", which makes
                    # the prefix empty instead of leaving sfpath unset.
                    p_location = primary["location"]
                    sfpath = p_location[0 : p_location.rfind("/") + 1] + sfname

            required = builder.do_eval(required, context=primary)

            if isinstance(sfname, (list, dict)):
                # The expression returned File object(s) directly.
                each = aslist(sfname)
                for e in each:
                    if required and not fsaccess.exists(e.get("location")):
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "Required secondary file '%s' does not exist" % e.get("location"))
                found.extend(each)

            if sfpath is not None:
                # Single candidate path, from a string pattern result or
                # from a File literal in the spec.
                if fsaccess.exists(sfpath):
                    if pattern is not None:
                        found.append({"location": sfpath, "class": "File"})
                    else:
                        found.append(sf)
                elif required:
                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                        "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
        # A type name that is not built in: resolve it as a named type.
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
290
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    """Run set_secondary() for every input that has a container value.

    For each schema in inputs, the value is looked up in job_order under
    the short name of the schema's "id".  Values that are neither a
    Mapping nor a Sequence (including missing ones) are skipped.
    """
    for schema in inputs:
        value = job_order.get(shortname(schema["id"]))
        if not isinstance(value, (Mapping, Sequence)):
            continue
        set_secondary(fsaccess, builder, schema, None, value, discovered)
296
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, runtimeContext,
                        include_primary=True, discovered_secondaryfiles=None,
                        cache=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns the ArvPathMapper built for the collected dependencies,
    which maps local paths to keep references.

    Use scandeps to find $schemas, File and Directory
    fields that represent external references.

    In-place changes made to workflowobj here: File/Directory defaults
    get 'path' renamed to 'location' and are normalized, and "$schemas"
    is replaced by the mapped locations of the schemas the mapper
    knows.  Other 'location' fields are not rewritten by this function;
    see update_from_mapper().

    Parameters:
      arvrunner -- provides the API client (api), num_retries and
          fs_access.
      name -- passed to ArvPathMapper as the name for the upload.
      document_loader -- loader whose fetcher.urljoin and resolve_all
          are used.
      workflowobj -- the document (or job order) to scan.
      uri -- base URI of workflowobj.
      runtimeContext -- copy_deps and project_uuid are read from it.
      include_primary -- only relevant when workflowobj has an "id":
          True adds that document itself to the upload list, False
          removes it.
      discovered_secondaryfiles -- optional dict; receives the
          secondaryFiles discovered for local ("file:") default inputs,
          keyed by the mapped location of the primary file.
      cache -- not used by this function.

    Raises validate.ValidationException (through SourceLine) when a
    referenced collection uuid was not returned by the API server, or
    when a File has both a collection uuid and a portable data hash
    that do not agree.
    """

    scanobj = workflowobj
    metadata = scanobj

    # Two scans: 'location' references, then '$schemas' references.
    # The latter are treated as optional dependencies.
    with Perf(metrics, "scandeps"):
        sc_result = scandeps(uri, scanobj,
                             set(),
                             set(("location",)),
                             None, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)
        optional_deps = scandeps(uri, scanobj,
                             set(),
                             set(("$schemas",)),
                             None, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)

    if sc_result is None:
        sc_result = []

    if optional_deps is None:
        optional_deps = []

    if optional_deps:
        sc_result.extend(optional_deps)

    # sc: File/Directory objects that need uploading.
    # uuids: collection uuid -> an object that referenced it.
    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    with Perf(metrics, "collect uuids"):
        visit_class(workflowobj, ("File", "Directory"), collect_uuids)

    with Perf(metrics, "collect uploads"):
        visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    with Perf(metrics, "fetch_uuids"):
        while fetch_uuids:
            # For a large number of fetch_uuids, API server may limit
            # response size, so keep fetching until the API server has
            # nothing more to give us.
            lookups = arvrunner.api.collections().list(
                filters=[["uuid", "in", fetch_uuids]],
                count="none",
                select=["uuid", "portable_data_hash"]).execute(
                    num_retries=arvrunner.num_retries)

            if not lookups["items"]:
                break

            for l in lookups["items"]:
                uuid_map[l["uuid"]] = l["portable_data_hash"]

            # Only ask again for the uuids that are still unresolved.
            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        # The document's own location, without any fragment.
        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
        if include_primary:
            # make sure it's included
            sc.append({"class": "File", "location": defrg})
        else:
            # make sure it's excluded
            sc = [d for d in sc if d.get("location") != defrg]

    def visit_default(obj):
        # File/Directory values under a "default" are registered as
        # optional dependencies.
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    # discovered: primary file location -> secondaryFiles found for it.
    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    # Discovery runs on a resolved deep copy, so workflowobj itself is
    # not modified by it.
    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    # Iterate over a copy of the keys because entries are deleted below.
    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    with Perf(metrics, "mapper"):
        mapper = ArvPathMapper(arvrunner, sc, "",
                               "keep:%s",
                               "keep:%s/%s",
                               name=name,
                               single_collection=True,
                               optional_deps=optional_deps)

    # Make the resolved collection uuids available through the mapper
    # too, under "keep:<uuid>".
    for k, v in uuid_map.items():
        mapper._pathmap["keep:"+k] = MapperEnt(v, "", "", False)

    # Portable data hashes referenced by the document; used for
    # copy_deps below.
    keeprefs = set()
    def addkeepref(k):
        if k.startswith("keep:"):
            keeprefs.add(collection_pdh_pattern.match(k).group(1))


    def collectloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            # NOTE(review): addkeepref() ignores anything that does not
            # start with "keep:", so this call adds nothing.
            addkeepref(p["location"])
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            # Not a uuid pattern (must be a pdh pattern)
            addkeepref(p["location"])
            return

        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)

    with Perf(metrics, "collectloc"):
        visit_class(workflowobj, ("File", "Directory"), collectloc)
        visit_class(discovered, ("File", "Directory"), collectloc)

    if discovered_secondaryfiles is not None:
        # Report the discovered secondaryFiles keyed by the mapped
        # location of the primary file.
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if runtimeContext.copy_deps:
        # Find referenced collections and copy them into the
        # destination project, for easy sharing.
        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
                                     filters=[["portable_data_hash", "in", list(keeprefs)],
                                              ["owner_uuid", "=", runtimeContext.project_uuid]],
                                     select=["uuid", "portable_data_hash", "created_at"]))

        # Skip the ones the destination project already holds.
        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
        for kr in keeprefs:
            # Use the most recently created collection with this PDH as
            # the template for the copy.
            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
                                                  order="created_at desc",
                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
                                                   limit=1).execute()
            if len(col["items"]) == 0:
                logger.warning("Cannot find collection with portable data hash %s", kr)
                continue
            col = col["items"][0]
            col["name"] = arvados.util.trim_name(col["name"])
            try:
                arvrunner.api.collections().create(body={"collection": {
                    "owner_uuid": runtimeContext.project_uuid,
                    "name": col["name"],
                    "description": col["description"],
                    "properties": col["properties"],
                    "portable_data_hash": col["portable_data_hash"],
                    "manifest_text": col["manifest_text"],
                    "storage_classes_desired": col["storage_classes_desired"],
                    "trash_at": col["trash_at"]
                }}, ensure_unique_name=True).execute()
            except Exception as e:
                # Copying is best effort: log and carry on.
                logger.warning("Unable to copy collection to destination: %s", e)

    if "$schemas" in workflowobj:
        # Keep only the schemas the mapper knows about, rewritten to
        # their mapped locations.
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
542
543
def upload_docker(arvrunner, tool, runtimeContext):
    """Make the Docker images used by tool available through Arvados.

    For a CommandLineTool, the image named by its DockerRequirement is
    fetched with arv_docker_get_image(); a tool without one gets the
    default "arvados/jobs" image for this package version.  For a
    Workflow, every step's embedded tool is processed recursively.  Any
    other kind of tool is ignored.

    Raises UnsupportedRequirement (through SourceLine) when the
    DockerRequirement sets 'dockerOutputDirectory' and the runner's
    work_api is not "containers".
    """
    if not isinstance(tool, CommandLineTool):
        if isinstance(tool, cwltool.workflow.Workflow):
            for step in tool.steps:
                upload_docker(arvrunner, step.embedded_tool, runtimeContext)
        return

    docker_req, _ = tool.get_requirement("DockerRequirement")
    if not docker_req:
        # No DockerRequirement: fall back to the default jobs image.
        default_req = {"dockerPull": "arvados/jobs:"+__version__}
        arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, default_req,
                                                   True, runtimeContext)
        return

    if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
        raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
            "Option 'dockerOutputDirectory' of DockerRequirement not supported.")

    arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
561
562
def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been
    combined into a single document.

    After packing, the document is walked and, for every dict in it:
      * 'path' is renamed to 'location' when 'location' is absent;
      * 'location' and 'secondaryFiles' are rewritten from the
        merged_map entry of the enclosing process (looked up by the
        process's pre-packing id);
      * a DockerRequirement gets a
        "http://arvados.org/cwl#dockerCollectionPDH" field holding the
        result of arv_docker_get_image().

    Every key of git_info (when it is not empty) is then copied onto
    the packed document, which is returned.

    Raises an Exception (through SourceLine) for a v1.0 document
    containing an embedded process object with no 'id'.
    """
    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    # pack() reports original id -> packed id; we need the reverse.
    original_ids = {packed_id: orig_id for orig_id, packed_id in viewitems(rewrites)}
    process_classes = ("CommandLineTool", "Workflow", "ExpressionTool")

    def visit(node, cur_id):
        if isinstance(node, list):
            for item in node:
                visit(item, cur_id)
            return
        if not isinstance(node, dict):
            return

        if node.get("class") in process_classes:
            if tool.metadata["cwlVersion"] == "v1.0" and "id" not in node:
                raise SourceLine(node, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
            if "id" in node:
                cur_id = original_ids.get(node["id"], node["id"])

        if "path" in node and "location" not in node:
            node["location"] = node["path"]
            del node["path"]

        if "location" in node and cur_id in merged_map:
            updates = merged_map[cur_id]
            if node["location"] in updates.resolved:
                node["location"] = updates.resolved[node["location"]]
            if node["location"] in updates.secondaryFiles:
                node["secondaryFiles"] = updates.secondaryFiles[node["location"]]

        if node.get("class") == "DockerRequirement":
            node["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(
                arvrunner.api, node, True, runtimeContext)

        for key in node:
            visit(node[key], cur_id)

    visit(packed, None)

    for key in (git_info or ()):
        packed[key] = git_info[key]

    return packed
605
606
def tag_git_version(packed, tool=None):
    """Record the git commit of the workflow's source directory in packed.

    When the id of tool (tool.tool["id"]) is a "file://" URI, runs
    "git log" in the directory containing that file and stores the hash
    of the most recent first-parent commit in packed under
    "http://schema.org/version".

    Parameters:
      packed -- the packed workflow document, updated in place.
      tool   -- the loaded tool whose .tool["id"] locates the source.
                Optional for backward compatibility with the original
                one-argument signature, which referenced an undefined
                name and therefore could never succeed; without a tool
                there is nothing to inspect and packed is left alone.

    This is best effort: if git cannot be run or reports an error,
    packed is left unchanged.
    """
    if tool is None:
        return

    tool_id = tool.tool["id"]
    if not tool_id.startswith("file://"):
        return

    path = os.path.dirname(tool_id[7:])
    try:
        githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
    except (OSError, subprocess.CalledProcessError):
        # Not a git checkout, or git is unavailable.
        return

    # check_output() returns bytes; store text so the document stays
    # serializable.
    if isinstance(githash, bytes):
        githash = githash.decode("utf-8")
    packed["http://schema.org/version"] = githash
616
def setloc(mapper, p):
    """Rewrite p["location"] using mapper, in place.

    * No (or empty) location: nothing is done.
    * Location not starting with "_:" or "keep:": replaced by the
      mapper's resolved value for it.
    * Otherwise, if p carries a collection uuid field, that uuid must be
      known to the mapper (as "keep:<uuid>"), and when the location
      matches the portable-data-hash pattern the hash must agree with
      what the mapper holds for that uuid.
    * If the location matches the collection-uuid pattern, it is
      rewritten to "keep:<resolved><rest>" and the uuid is stored on p
      under the collection uuid field.

    Raises validate.ValidationException (through SourceLine) for an
    unknown collection uuid or a uuid/hash mismatch.
    """
    location = p.get("location")
    if not location:
        return

    if not location.startswith(("_:", "keep:")):
        p["location"] = mapper.mapper(p["location"]).resolved
        return

    if collectionUUID in p:
        declared_uuid = p[collectionUUID]
        declared_key = "keep:"+declared_uuid
        if declared_key not in mapper:
            raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                "Collection uuid %s not found" % declared_uuid)
        pdh_match = collection_pdh_pattern.match(location)
        if pdh_match and mapper.mapper(declared_key).resolved != pdh_match.groups()[0]:
            # Both a collection uuid and a PDH location are present and
            # they disagree with what was recorded for the uuid.
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Expected collection uuid %s to be %s but API server reported %s" % (
                    declared_uuid, pdh_match.groups()[0], mapper.mapper(declared_key).resolved))

    uuid_match = collection_uuid_pattern.match(location)
    if not uuid_match:
        # Not a uuid pattern (must be a pdh pattern)
        return

    matched_uuid = uuid_match.groups()[0]
    remainder = uuid_match.groups()[1]
    matched_key = "keep:"+matched_uuid
    if matched_key not in mapper:
        raise SourceLine(p, "location", validate.ValidationException).makeError(
            "Collection uuid %s not found" % matched_uuid)
    p["location"] = "keep:%s%s" % (mapper.mapper(matched_key).resolved, remainder or "")
    p[collectionUUID] = matched_uuid
653
def update_from_mapper(workflowobj, mapper):
    """Apply setloc() with mapper to every File and Directory in workflowobj."""
    relocate = partial(setloc, mapper)
    with Perf(metrics, "setloc"):
        visit_class(workflowobj, ("File", "Directory"), relocate)
657
def apply_merged_map(merged_map, workflowobj):
    """Rewrite locations in workflowobj from merged_map, in place.

    merged_map is keyed by process id; each value has a 'resolved'
    mapping (old location -> new location) and a 'secondaryFiles'
    mapping (location -> secondaryFiles).  The document is walked
    recursively; the entry that applies to a dict is the one for the
    nearest enclosing CommandLineTool/Workflow/ExpressionTool that has
    an "id".  Along the way 'path' is renamed to 'location' wherever
    'location' is missing.
    """
    process_classes = ("CommandLineTool", "Workflow", "ExpressionTool")

    def visit(node, cur_id):
        if isinstance(node, list):
            for item in node:
                visit(item, cur_id)
            return
        if not isinstance(node, dict):
            return

        if node.get("class") in process_classes and "id" in node:
            cur_id = node["id"]

        if "path" in node and "location" not in node:
            node["location"] = node["path"]
            del node["path"]

        if "location" in node and cur_id in merged_map:
            updates = merged_map[cur_id]
            if node["location"] in updates.resolved:
                node["location"] = updates.resolved[node["location"]]
            if node["location"] in updates.secondaryFiles:
                node["secondaryFiles"] = updates.secondaryFiles[node["location"]]

        for key in node:
            visit(node[key], cur_id)

    visit(workflowobj, None)
681
def update_from_merged_map(tool, merged_map):
    """Run apply_merged_map() with merged_map on every object tool.visit() yields."""
    visitor = partial(apply_merged_map, merged_map)
    tool.visit(visitor)
684
def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
    """Upload local files referenced in the input object.

    Secondary files are discovered for the inputs, the dependencies of
    job_order are uploaded, and job_order is updated in place so that
    each 'location' holds the proper keep reference.  The "id" and
    "job_order" keys are removed from it.

    Returns a (job_order, mapper) tuple, where mapper is the path mapper
    produced by upload_dependencies().
    """
    # Defaults are filled into a shallow copy, which is only used to
    # evaluate expressions.
    defaults_filled = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any missing required
    # parameters; we don't want that, so make every input optional in a
    # private copy of the input schema.
    optional_inputs = copy.deepcopy(tool.tool["inputs"])
    for param in optional_inputs:
        if "null" in param["type"]:
            continue
        param["type"] = ["null"] + aslist(param["type"])

    fill_in_defaults(optional_inputs, defaults_filled, arvrunner.fs_access)

    # A builder object is needed to evaluate expressions.
    expr_builder = make_builder(defaults_filled,
                                tool.hints,
                                tool.requirements,
                                ArvRuntimeContext(),
                                tool.metadata)

    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             expr_builder,
                             tool.tool["inputs"],
                             job_order)

    loader_ctx = jobloaderctx.copy()
    jobloader = schema_salad.ref_resolver.Loader(
        loader_ctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    jobloader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    runtimeContext)

    # "job_order" gets added by cwltool when parameters are provided on
    # the command line; neither it nor "id" belongs in the result.
    for transient_key in ("id", "job_order"):
        if transient_key in job_order:
            del job_order[transient_key]

    update_from_mapper(job_order, jobmapper)

    return job_order, jobmapper
737
# Pair of values recorded per tool: ``resolved`` and ``secondaryFiles``.
FileUpdates = namedtuple("FileUpdates", ("resolved", "secondaryFiles"))
739
def upload_workflow_deps(arvrunner, tool, runtimeContext):
    """Upload Docker images and dependencies for every node of ``tool`` that has an "id".

    Returns a dict keyed by each visited node's "id".  Every value is a
    ``FileUpdates`` whose ``resolved`` field maps each key of the mapping
    returned by ``upload_dependencies`` to that entry's ``.resolved``
    attribute, and whose ``secondaryFiles`` field is the dict of
    discovered secondary files for that node.
    """
    # Ensure that Docker images needed by this workflow are available
    with Perf(metrics, "upload_docker"):
        upload_docker(arvrunner, tool, runtimeContext)

    loader = tool.doc_loader

    updates_by_tool = {}
    dep_cache = {}

    # Standard traversal is top down, we want to go bottom up, so use
    # the visitor to accumulate a list of nodes to visit, then
    # visit them in reverse order.
    pending = []

    def collect(node):
        if "id" in node:
            pending.append(node)

    tool.visit(collect)

    for deptool in reversed(pending):
        found_secondary = {}
        short_id = shortname(deptool["id"])
        with Perf(metrics, "upload_dependencies %s" % short_id):
            pathmapper = upload_dependencies(arvrunner,
                                             "%s dependencies" % (short_id),
                                             loader,
                                             deptool,
                                             deptool["id"],
                                             runtimeContext,
                                             include_primary=False,
                                             discovered_secondaryfiles=found_secondary,
                                             cache=dep_cache)

        loader.idx[deptool["id"]] = deptool
        resolved = {src: entry.resolved for src, entry in pathmapper.items()}

        updates_by_tool[deptool["id"]] = FileUpdates(resolved, found_secondary)

    return updates_by_tool
783
def arvados_jobs_image(arvrunner, img, runtimeContext):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it.

    Returns whatever ``arv_docker_get_image`` returns for ``img``.  Any
    exception raised while doing so is re-raised as a plain ``Exception``
    whose message names the image and includes the original error text.
    """

    try:
        docker_req = {"dockerPull": img}
        image = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req,
                                                           True, runtimeContext)
    except Exception as e:
        message = "Docker image %s is not available\n%s" % (img, e)
        raise Exception(message)
    return image
792
793
def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
    """Store ``packed`` as "workflow.cwl" in a collection and return its portable data hash.

    A collection list query is made for the same portable data hash and a
    name starting with ``name`` (also filtered by owner when
    ``runtimeContext.project_uuid`` is set).  If it returns any item, that
    is logged and nothing is saved; otherwise the new collection is saved
    under ``name`` with ``ensure_unique_name=True``.
    """
    retries = arvrunner.num_retries
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=retries)
    serialized = json.dumps(packed, indent=2, sort_keys=True, separators=(',',': '))
    with collection.open("workflow.cwl", "w") as out:
        out.write(serialized)

    filters = [
        ["portable_data_hash", "=", collection.portable_data_hash()],
        ["name", "like", name+"%"],
    ]
    if runtimeContext.project_uuid:
        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
    found = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if not found["items"]:
        # Nothing matching is stored yet, so save this one.
        collection.save_new(name=name,
                            owner_uuid=runtimeContext.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())
    else:
        logger.info("Using collection %s", found["items"][0]["uuid"])

    return collection.portable_data_hash()
817
818
819 class Runner(Process):
820     """Base class for runner processes, which submit an instance of
821     arvados-cwl-runner and wait for the final result."""
822
823     def __init__(self, runner,
824                  tool, loadingContext, enable_reuse,
825                  output_name, output_tags, submit_runner_ram=0,
826                  name=None, on_error=None, submit_runner_image=None,
827                  intermediate_output_ttl=0, merged_map=None,
828                  priority=None, secret_store=None,
829                  collection_cache_size=256,
830                  collection_cache_is_default=True,
831                  git_info=None):
832
833         self.loadingContext = loadingContext.copy()
834
835         super(Runner, self).__init__(tool.tool, loadingContext)
836
837         self.arvrunner = runner
838         self.embedded_tool = tool
839         self.job_order = None
840         self.running = False
841         if enable_reuse:
842             # If reuse is permitted by command line arguments but
843             # disabled by the workflow itself, disable it.
844             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
845             if reuse_req:
846                 enable_reuse = reuse_req["enableReuse"]
847             reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
848             if reuse_req:
849                 enable_reuse = reuse_req["enableReuse"]
850         self.enable_reuse = enable_reuse
851         self.uuid = None
852         self.final_output = None
853         self.output_name = output_name
854         self.output_tags = output_tags
855         self.name = name
856         self.on_error = on_error
857         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
858         self.intermediate_output_ttl = intermediate_output_ttl
859         self.priority = priority
860         self.secret_store = secret_store
861         self.enable_dev = self.loadingContext.enable_dev
862         self.git_info = git_info
863         self.fast_parser = self.loadingContext.fast_parser
864
865         self.submit_runner_cores = 1
866         self.submit_runner_ram = 1024  # defaut 1 GiB
867         self.collection_cache_size = collection_cache_size
868
869         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
870         if runner_resource_req:
871             if runner_resource_req.get("coresMin"):
872                 self.submit_runner_cores = runner_resource_req["coresMin"]
873             if runner_resource_req.get("ramMin"):
874                 self.submit_runner_ram = runner_resource_req["ramMin"]
875             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
876                 self.collection_cache_size = runner_resource_req["keep_cache"]
877
878         if submit_runner_ram:
879             # Command line / initializer overrides default and/or spec from workflow
880             self.submit_runner_ram = submit_runner_ram
881
882         if self.submit_runner_ram <= 0:
883             raise Exception("Value of submit-runner-ram must be greater than zero")
884
885         if self.submit_runner_cores <= 0:
886             raise Exception("Value of submit-runner-cores must be greater than zero")
887
888         self.merged_map = merged_map or {}
889
890     def job(self,
891             job_order,         # type: Mapping[Text, Text]
892             output_callbacks,  # type: Callable[[Any, Any], Any]
893             runtimeContext     # type: RuntimeContext
894            ):  # type: (...) -> Generator[Any, None, None]
895         self.job_order = job_order
896         self._init_job(job_order, runtimeContext)
897         yield self
898
899     def update_pipeline_component(self, record):
900         pass
901
902     def done(self, record):
903         """Base method for handling a completed runner."""
904
905         try:
906             if record["state"] == "Complete":
907                 if record.get("exit_code") is not None:
908                     if record["exit_code"] == 33:
909                         processStatus = "UnsupportedRequirement"
910                     elif record["exit_code"] == 0:
911                         processStatus = "success"
912                     else:
913                         processStatus = "permanentFail"
914                 else:
915                     processStatus = "success"
916             else:
917                 processStatus = "permanentFail"
918
919             outputs = {}
920
921             if processStatus == "permanentFail":
922                 logc = arvados.collection.CollectionReader(record["log"],
923                                                            api_client=self.arvrunner.api,
924                                                            keep_client=self.arvrunner.keep_client,
925                                                            num_retries=self.arvrunner.num_retries)
926                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
927
928             self.final_output = record["output"]
929             outc = arvados.collection.CollectionReader(self.final_output,
930                                                        api_client=self.arvrunner.api,
931                                                        keep_client=self.arvrunner.keep_client,
932                                                        num_retries=self.arvrunner.num_retries)
933             if "cwl.output.json" in outc:
934                 with outc.open("cwl.output.json", "rb") as f:
935                     if f.size() > 0:
936                         outputs = json.loads(f.read().decode())
937             def keepify(fileobj):
938                 path = fileobj["location"]
939                 if not path.startswith("keep:"):
940                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
941             adjustFileObjs(outputs, keepify)
942             adjustDirObjs(outputs, keepify)
943         except Exception:
944             logger.exception("[%s] While getting final output object", self.name)
945             self.arvrunner.output_callback({}, "permanentFail")
946         else:
947             self.arvrunner.output_callback(outputs, processStatus)