1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import (
21     Any,
22     Callable,
23     Dict,
24     Iterable,
25     Iterator,
26     List,
27     Mapping,
28     MutableMapping,
29     Sequence,
30     MutableSequence,
31     Optional,
32     Set,
33     Sized,
34     Tuple,
35     Type,
36     Union,
37     cast,
38 )
39 from cwltool.utils import (
40     CWLObjectType,
41     CWLOutputAtomType,
42     CWLOutputType,
43 )
44
45 if os.name == "posix" and sys.version_info[0] < 3:
46     import subprocess32 as subprocess
47 else:
48     import subprocess
49
50 from schema_salad.sourceline import SourceLine, cmap
51
52 from cwltool.command_line_tool import CommandLineTool
53 import cwltool.workflow
54 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
55                              shortname, Process, fill_in_defaults)
56 from cwltool.load_tool import fetch_document, jobloaderctx
57 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
58 from cwltool.builder import substitute
59 from cwltool.pack import pack
60 from cwltool.update import INTERNAL_VERSION
61 from cwltool.builder import Builder
62 import schema_salad.validate as validate
63 import schema_salad.ref_resolver
64
65 import arvados.collection
66 import arvados.util
67 from .util import collectionUUID
68 from ruamel.yaml import YAML
69 from ruamel.yaml.comments import CommentedMap, CommentedSeq
70
71 import arvados_cwl.arvdocker
72 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern, MapperEnt
73 from ._version import __version__
74 from . import done
75 from .context import ArvRuntimeContext
76 from .perf import Perf
77
78 logger = logging.getLogger('arvados.cwl-runner')
79 metrics = logging.getLogger('arvados.cwl-runner.metrics')
80
81 def trim_anonymous_location(obj):
82     """Remove 'location' field from File and Directory literals.
83
84     To make internal handling easier, literals are assigned a random id for
85     'location'.  However, when writing the record back out, this can break
86     reproducibility.  Since it is valid for literals not to have a 'location'
87     field, remove it.
88
89     """
90
91     if obj.get("location", "").startswith("_:"):
92         del obj["location"]
93
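# Illustrative sketch (not part of the upstream module): a File literal that was
# assigned an anonymous "_:" id keeps its contents but loses the synthetic
# 'location', so the record can be written back out reproducibly.
def _example_trim_anonymous_location():
    literal = {"class": "File", "basename": "greeting.txt", "contents": "hello",
               "location": "_:de305d54-75b4-431b-adb2-eb6b9e546013"}
    trim_anonymous_location(literal)
    assert "location" not in literal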
94
95 def remove_redundant_fields(obj):
96     for field in ("path", "nameext", "nameroot", "dirname"):
97         if field in obj:
98             del obj[field]
99
100
101 def find_defaults(d, op):
102     if isinstance(d, list):
103         for i in d:
104             find_defaults(i, op)
105     elif isinstance(d, dict):
106         if "default" in d:
107             op(d)
108         else:
109             for i in viewvalues(d):
110                 find_defaults(i, op)
111
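# Minimal sketch with hypothetical data: find_defaults() walks nested lists and
# mappings and calls the supplied callback on every mapping that carries a
# "default" key; upload_dependencies() uses this to treat input defaults as
# optional dependencies.
def _example_find_defaults():
    doc = {"inputs": [{"id": "ref", "default": {"class": "File", "path": "ref.fa"}},
                      {"id": "threads"}]}
    seen = []
    find_defaults(doc, seen.append)
    assert len(seen) == 1 and "default" in seen[0]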
112 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
113     return Builder(
114                  job=joborder,
115                  files=[],               # type: List[Dict[Text, Text]]
116                  bindings=[],            # type: List[Dict[Text, Any]]
117                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
118                  names=None,               # type: Names
119                  requirements=requirements,        # type: List[Dict[Text, Any]]
120                  hints=hints,               # type: List[Dict[Text, Any]]
121                  resources={},           # type: Dict[str, int]
122                  mutation_manager=None,    # type: Optional[MutationManager]
123                  formatgraph=None,         # type: Optional[Graph]
124                  make_fs_access=None,      # type: Type[StdFsAccess]
125                  fs_access=None,           # type: StdFsAccess
126                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
127                  timeout=runtimeContext.eval_timeout,             # type: float
128                  debug=runtimeContext.debug,               # type: bool
129                  js_console=runtimeContext.js_console,          # type: bool
130                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
131                  loadListing="",         # type: Text
132                  outdir="",              # type: Text
133                  tmpdir="",              # type: Text
134                  stagedir="",            # type: Text
135                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
136                  container_engine="docker"
137                 )
138
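# Hedged sketch: make_builder() produces a throwaway cwltool Builder purely for
# expression evaluation, as upload_job_order() does before discovering
# secondaryFiles.  The metadata only needs to carry a cwlVersion here.
def _example_make_builder():
    b = make_builder({"threads": 4}, [], [], ArvRuntimeContext(), {"cwlVersion": "v1.2"})
    # Simple parameter references can now be evaluated against the job order.
    print(b.do_eval("$(inputs.threads)"))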
139 def search_schemadef(name, reqs):
140     for r in reqs:
141         if r["class"] == "SchemaDefRequirement":
142             for sd in r["types"]:
143                 if sd["name"] == name:
144                     return sd
145     return None
146
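# Hedged sketch: search_schemadef() lets set_secondary() resolve a string type
# reference (e.g. a named record from a SchemaDefRequirement) back to its
# schema so secondaryFiles declared on that type can be honored.
def _example_search_schemadef():
    reqs = [{"class": "SchemaDefRequirement",
             "types": [{"name": "#sample_record", "type": "record", "fields": []}]}]
    sd = search_schemadef("#sample_record", reqs)
    assert sd is not None and sd["type"] == "record"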
147 primitive_types_set = frozenset(("null", "boolean", "int", "long",
148                                  "float", "double", "string", "record",
149                                  "array", "enum"))
150
151 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
152     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
153         # union type, collect all possible secondaryFiles
154         for i in inputschema:
155             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
156         return
157
158     if inputschema == "File":
159         inputschema = {"type": "File"}
160
161     if isinstance(inputschema, basestring):
162         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
163         if sd:
164             inputschema = sd
165         else:
166             return
167
168     if "secondaryFiles" in inputschema:
169         # set secondaryFiles, may be inherited by compound types.
170         secondaryspec = inputschema["secondaryFiles"]
171
172     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
173         not isinstance(inputschema["type"], basestring)):
174         # compound type (union, array, record)
175         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
176
177     elif (inputschema["type"] == "record" and
178           isinstance(primary, Mapping)):
179         #
180         # record type, find secondary files associated with fields.
181         #
182         for f in inputschema["fields"]:
183             p = primary.get(shortname(f["name"]))
184             if p:
185                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
186
187     elif (inputschema["type"] == "array" and
188           isinstance(primary, Sequence)):
189         #
190         # array type, find secondary files of elements
191         #
192         for p in primary:
193             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
194
195     elif (inputschema["type"] == "File" and
196           isinstance(primary, Mapping) and
197           primary.get("class") == "File"):
198
199         if "secondaryFiles" in primary or not secondaryspec:
200             # Nothing to do.
201             return
202
203         #
204         # Found a file, check for secondaryFiles
205         #
206         specs = []
207         primary["secondaryFiles"] = secondaryspec
208         for i, sf in enumerate(aslist(secondaryspec)):
209             if builder.cwlVersion == "v1.0":
210                 pattern = sf
211             else:
212                 pattern = sf["pattern"]
213             if pattern is None:
214                 continue
215             if isinstance(pattern, list):
216                 specs.extend(pattern)
217             elif isinstance(pattern, dict):
218                 specs.append(pattern)
219             elif isinstance(pattern, str):
220                 if builder.cwlVersion == "v1.0":
221                     specs.append({"pattern": pattern, "required": True})
222                 else:
223                     specs.append({"pattern": pattern, "required": sf.get("required")})
224             else:
225                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
226                     "Expression must return list, object, string or null")
227
228         found = []
229         for i, sf in enumerate(specs):
230             if isinstance(sf, dict):
231                 if sf.get("class") == "File":
232                     pattern = None
233                     if sf.get("location") is None:
234                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
235                             "File object is missing 'location': %s" % sf)
236                     sfpath = sf["location"]
237                     required = True
238                 else:
239                     pattern = sf["pattern"]
240                     required = sf.get("required")
241             elif isinstance(sf, str):
242                 pattern = sf
243                 required = True
244             else:
245                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
246                     "Expression must return list, object, string or null")
247
248             if pattern is not None:
249                 if "${" in pattern or "$(" in pattern:
250                     sfname = builder.do_eval(pattern, context=primary)
251                 else:
252                     sfname = substitute(primary["basename"], pattern)
253
254                 if sfname is None:
255                     continue
256
257                 if isinstance(sfname, str):
258                     p_location = primary["location"]
259                     if "/" in p_location:
260                         sfpath = (
261                             p_location[0 : p_location.rindex("/") + 1]
262                             + sfname
263                         )
264
265             required = builder.do_eval(required, context=primary)
266
267             if isinstance(sfname, list) or isinstance(sfname, dict):
268                 each = aslist(sfname)
269                 for e in each:
270                     if required and not fsaccess.exists(e.get("location")):
271                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
272                             "Required secondary file '%s' does not exist" % e.get("location"))
273                 found.extend(each)
274
275             if isinstance(sfname, str):
276                 if fsaccess.exists(sfpath):
277                     if pattern is not None:
278                         found.append({"location": sfpath, "class": "File"})
279                     else:
280                         found.append(sf)
281                 elif required:
282                     raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
283                         "Required secondary file '%s' does not exist" % sfpath)
284
285         primary["secondaryFiles"] = cmap(found)
286         if discovered is not None:
287             discovered[primary["location"]] = primary["secondaryFiles"]
288     elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
289         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
290
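# Hedged sketch of the pattern handling above: for non-expression patterns the
# secondary file name is derived from the primary file's basename with
# cwltool's substitute(), where a leading "^" strips one extension first.
def _example_secondaryfile_patterns():
    assert substitute("reads.bam", ".bai") == "reads.bam.bai"
    assert substitute("ref.fa.gz", "^.fai") == "ref.fa.fai"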
291 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
292     for inputschema in inputs:
293         primary = job_order.get(shortname(inputschema["id"]))
294         if isinstance(primary, (Mapping, Sequence)):
295             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
296
297 def upload_dependencies(arvrunner, name, document_loader,
298                         workflowobj, uri, runtimeContext,
299                         include_primary=True, discovered_secondaryfiles=None,
300                         cache=None):
301     """Upload the dependencies of the workflowobj document to Keep.
302
303     Returns a pathmapper object mapping local paths to keep references.  Also
304     does an in-place update of references in "workflowobj".
305
306     Use scandeps to find $schemas, File and Directory
307     fields that represent external references.
308
309     If workflowobj has an "id" field, this will reload the document to ensure
310     it is scanning the raw document prior to preprocessing.
311     """
312
313     scanobj = workflowobj
314     metadata = scanobj
315
316     with Perf(metrics, "scandeps"):
317         sc_result = scandeps(uri, scanobj,
318                              set(),
319                              set(("location",)),
320                              None, urljoin=document_loader.fetcher.urljoin,
321                              nestdirs=False)
322         optional_deps = scandeps(uri, scanobj,
323                              set(),
324                              set(("$schemas",)),
325                              None, urljoin=document_loader.fetcher.urljoin,
326                              nestdirs=False)
327
328     if sc_result is None:
329         sc_result = []
330
331     if optional_deps is None:
332         optional_deps = []
333
334     if optional_deps:
335         sc_result.extend(optional_deps)
336
337     sc = []
338     uuids = {}
339
340     def collect_uuids(obj):
341         loc = obj.get("location", "")
342         sp = loc.split(":")
343         if sp[0] == "keep":
344             # Collect collection uuids that need to be resolved to
345             # portable data hashes
346             gp = collection_uuid_pattern.match(loc)
347             if gp:
348                 uuids[gp.groups()[0]] = obj
349             if collectionUUID in obj:
350                 uuids[obj[collectionUUID]] = obj
351
352     def collect_uploads(obj):
353         loc = obj.get("location", "")
354         sp = loc.split(":")
355         if len(sp) < 1:
356             return
357         if sp[0] in ("file", "http", "https"):
358             # Record local files that need to be uploaded;
359             # don't include file literals, keep references, etc.
360             sc.append(obj)
361         collect_uuids(obj)
362
363     with Perf(metrics, "collect uuids"):
364         visit_class(workflowobj, ("File", "Directory"), collect_uuids)
365
366     with Perf(metrics, "collect uploads"):
367         visit_class(sc_result, ("File", "Directory"), collect_uploads)
368
369     # Resolve any collection uuids we found to portable data hashes
370     # and assign them to uuid_map
371     uuid_map = {}
372     fetch_uuids = list(uuids.keys())
373     with Perf(metrics, "fetch_uuids"):
374         while fetch_uuids:
375             # For a large number of fetch_uuids, the API server may limit
376             # the response size, so keep fetching until the API server has
377             # nothing more to give us.
378             lookups = arvrunner.api.collections().list(
379                 filters=[["uuid", "in", fetch_uuids]],
380                 count="none",
381                 select=["uuid", "portable_data_hash"]).execute(
382                     num_retries=arvrunner.num_retries)
383
384             if not lookups["items"]:
385                 break
386
387             for l in lookups["items"]:
388                 uuid_map[l["uuid"]] = l["portable_data_hash"]
389
390             fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
391
392     normalizeFilesDirs(sc)
393
394     if "id" in workflowobj:
395         defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
396         if include_primary:
397             # make sure it's included
398             sc.append({"class": "File", "location": defrg})
399         else:
400             # make sure it's excluded
401             sc = [d for d in sc if d.get("location") != defrg]
402
403     def visit_default(obj):
404         def defaults_are_optional(f):
405             if "location" not in f and "path" in f:
406                 f["location"] = f["path"]
407                 del f["path"]
408             normalizeFilesDirs(f)
409             optional_deps.append(f)
410         visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
411
412     find_defaults(workflowobj, visit_default)
413
414     discovered = {}
415     def discover_default_secondary_files(obj):
416         builder_job_order = {}
417         for t in obj["inputs"]:
418             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
419         # Need to create a builder object to evaluate expressions.
420         builder = make_builder(builder_job_order,
421                                obj.get("hints", []),
422                                obj.get("requirements", []),
423                                ArvRuntimeContext(),
424                                metadata)
425         discover_secondary_files(arvrunner.fs_access,
426                                  builder,
427                                  obj["inputs"],
428                                  builder_job_order,
429                                  discovered)
430
431     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
432     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
433
434     for d in list(discovered):
435         # Only interested in discovered secondaryFiles which are local
436         # files that need to be uploaded.
437         if d.startswith("file:"):
438             sc.extend(discovered[d])
439         else:
440             del discovered[d]
441
442     with Perf(metrics, "mapper"):
443         mapper = ArvPathMapper(arvrunner, sc, "",
444                                "keep:%s",
445                                "keep:%s/%s",
446                                name=name,
447                                single_collection=True,
448                                optional_deps=optional_deps)
449
450     for k, v in uuid_map.items():
451         mapper._pathmap["keep:"+k] = MapperEnt(v, "", "", False)
452
453     keeprefs = set()
454     def addkeepref(k):
455         if k.startswith("keep:"):
456             keeprefs.add(collection_pdh_pattern.match(k).group(1))
457
458
459     def collectloc(p):
460         loc = p.get("location")
461         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
462             addkeepref(p["location"])
463             return
464
465         if not loc:
466             return
467
468         if collectionUUID in p:
469             uuid = p[collectionUUID]
470             if uuid not in uuid_map:
471                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
472                     "Collection uuid %s not found" % uuid)
473             gp = collection_pdh_pattern.match(loc)
474             if gp and uuid_map[uuid] != gp.groups()[0]:
475                 # This file entry has both collectionUUID and a PDH
476                 # location. If the PDH doesn't match the one returned by
477                 # the API server, raise an error.
478                 raise SourceLine(p, "location", validate.ValidationException).makeError(
479                     "Expected collection uuid %s to be %s but API server reported %s" % (
480                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
481
482         gp = collection_uuid_pattern.match(loc)
483         if not gp:
484             # Not a uuid pattern (must be a pdh pattern)
485             addkeepref(p["location"])
486             return
487
488         uuid = gp.groups()[0]
489         if uuid not in uuid_map:
490             raise SourceLine(p, "location", validate.ValidationException).makeError(
491                 "Collection uuid %s not found" % uuid)
492
493     with Perf(metrics, "collectloc"):
494         visit_class(workflowobj, ("File", "Directory"), collectloc)
495         visit_class(discovered, ("File", "Directory"), collectloc)
496
497     if discovered_secondaryfiles is not None:
498         for d in discovered:
499             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
500
501     if runtimeContext.copy_deps:
502         # Find referenced collections and copy them into the
503         # destination project, for easy sharing.
504         already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
505                                      filters=[["portable_data_hash", "in", list(keeprefs)],
506                                               ["owner_uuid", "=", runtimeContext.project_uuid]],
507                                      select=["uuid", "portable_data_hash", "created_at"]))
508
509         keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
510         for kr in keeprefs:
511             col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
512                                                   order="created_at desc",
513                                                    select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
514                                                    limit=1).execute()
515             if len(col["items"]) == 0:
516                 logger.warning("Cannot find collection with portable data hash %s", kr)
517                 continue
518             col = col["items"][0]
519             col["name"] = arvados.util.trim_name(col["name"])
520             try:
521                 arvrunner.api.collections().create(body={"collection": {
522                     "owner_uuid": runtimeContext.project_uuid,
523                     "name": col["name"],
524                     "description": col["description"],
525                     "properties": col["properties"],
526                     "portable_data_hash": col["portable_data_hash"],
527                     "manifest_text": col["manifest_text"],
528                     "storage_classes_desired": col["storage_classes_desired"],
529                     "trash_at": col["trash_at"]
530                 }}, ensure_unique_name=True).execute()
531             except Exception as e:
532                 logger.warning("Unable to copy collection to destination: %s", e)
533
534     if "$schemas" in workflowobj:
535         sch = CommentedSeq()
536         for s in workflowobj["$schemas"]:
537             if s in mapper:
538                 sch.append(mapper.mapper(s).resolved)
539         workflowobj["$schemas"] = sch
540
541     return mapper
542
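# Hedged usage sketch (assumes a configured arvados-cwl-runner executor with a
# working API client; not runnable without an Arvados cluster): upload the
# local files a loaded tool refers to and get back the path mapper, as
# upload_workflow_deps() does for each step.
def _example_upload_dependencies(arvrunner, tool, runtimeContext):
    mapper = upload_dependencies(arvrunner,
                                 "example dependencies",
                                 tool.doc_loader,
                                 tool.tool,
                                 tool.tool["id"],
                                 runtimeContext,
                                 include_primary=False)
    # Each entry maps a local reference to its keep:<pdh>/<path> target.
    for _, entry in mapper.items():
        logger.debug("resolved to %s", entry.resolved)
    return mapper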
543
544 def upload_docker(arvrunner, tool, runtimeContext):
545     """Uploads Docker images used in CommandLineTool objects."""
546
547     if isinstance(tool, CommandLineTool):
548         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
549         if docker_req:
550             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
551                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
552                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
553
554             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
555         else:
556             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
557                                                        True, runtimeContext)
558     elif isinstance(tool, cwltool.workflow.Workflow):
559         for s in tool.steps:
560             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
561
562
563 def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
564     """Create a packed workflow.
565
566     A "packed" workflow is one where all the components have been combined into a single document."""
567
568     rewrites = {}
569     packed = pack(arvrunner.loadingContext, tool.tool["id"],
570                   rewrite_out=rewrites,
571                   loader=tool.doc_loader)
572
573     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
574
575     def visit(v, cur_id):
576         if isinstance(v, dict):
577             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
578                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
579                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or update to cwlVersion: v1.1")
580                 if "id" in v:
581                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
582             if "path" in v and "location" not in v:
583                 v["location"] = v["path"]
584                 del v["path"]
585             if "location" in v and cur_id in merged_map:
586                 if v["location"] in merged_map[cur_id].resolved:
587                     v["location"] = merged_map[cur_id].resolved[v["location"]]
588                 if v["location"] in merged_map[cur_id].secondaryFiles:
589                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
590             if v.get("class") == "DockerRequirement":
591                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
592                                                                                                              runtimeContext)
593             for l in v:
594                 visit(v[l], cur_id)
595         if isinstance(v, list):
596             for l in v:
597                 visit(l, cur_id)
598     visit(packed, None)
599
600     if git_info:
601         for g in git_info:
602             packed[g] = git_info[g]
603
604     return packed
605
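# Hedged usage sketch (requires a loaded tool and a live runner): pack the
# workflow into a single document, feeding in the reference rewrites collected
# by upload_workflow_deps() below.
def _example_packed_workflow(arvrunner, tool, runtimeContext):
    merged_map = upload_workflow_deps(arvrunner, tool, runtimeContext)
    packed = packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info={})
    return json.dumps(packed, indent=2, sort_keys=True)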
606
607 def tag_git_version(packed, tool):
608     if tool.tool["id"].startswith("file://"):
609         path = os.path.dirname(tool.tool["id"][7:])
610         try:
611             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
612         except (OSError, subprocess.CalledProcessError):
613             pass
614         else:
615             packed["http://schema.org/version"] = githash
616
617 def setloc(mapper, p):
618     loc = p.get("location")
619     if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
620         p["location"] = mapper.mapper(p["location"]).resolved
621         return
622
623     if not loc:
624         return
625
626     if collectionUUID in p:
627         uuid = p[collectionUUID]
628         keepuuid = "keep:"+uuid
629         if keepuuid not in mapper:
630             raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
631                 "Collection uuid %s not found" % uuid)
632         gp = collection_pdh_pattern.match(loc)
633         if gp and mapper.mapper(keepuuid).resolved != gp.groups()[0]:
634             # This file entry has both collectionUUID and a PDH
635             # location. If the PDH doesn't match the one returned by
636             # the API server, raise an error.
637             raise SourceLine(p, "location", validate.ValidationException).makeError(
638                 "Expected collection uuid %s to be %s but API server reported %s" % (
639                     uuid, gp.groups()[0], mapper.mapper(keepuuid).resolved))
640
641     gp = collection_uuid_pattern.match(loc)
642     if not gp:
643         # Not a uuid pattern (must be a pdh pattern)
644         return
645
646     uuid = gp.groups()[0]
647     keepuuid = "keep:"+uuid
648     if keepuuid not in mapper:
649         raise SourceLine(p, "location", validate.ValidationException).makeError(
650             "Collection uuid %s not found" % uuid)
651     p["location"] = "keep:%s%s" % (mapper.mapper(keepuuid).resolved, gp.groups()[1] if gp.groups()[1] else "")
652     p[collectionUUID] = uuid
653
654 def update_from_mapper(workflowobj, mapper):
655     with Perf(metrics, "setloc"):
656         visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
657
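# Hedged sketch with a stand-in mapper (real callers pass the ArvPathMapper
# returned by upload_dependencies()): update_from_mapper() rewrites each File
# and Directory 'location' in place to the mapper's resolved keep: reference.
def _example_update_from_mapper():
    class _FakeMapper:
        def __init__(self, table):
            self._table = table
        def mapper(self, loc):
            return MapperEnt(self._table[loc], "", "File", True)
    obj = {"class": "File", "location": "file:///tmp/reads.bam"}
    fake = _FakeMapper({"file:///tmp/reads.bam":
                        "keep:99999999999999999999999999999999+99/reads.bam"})
    update_from_mapper(obj, fake)
    assert obj["location"].startswith("keep:")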
658 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
659     """Upload local files referenced in the input object and return updated input
660     object with 'location' updated to the proper keep references.
661     """
662
663     # Make a copy of the job order and set defaults.
664     builder_job_order = copy.copy(job_order)
665
666     # fill_in_defaults throws an error if there are any
667     # missing required parameters, we don't want it to do that
668     # so make them all optional.
669     inputs_copy = copy.deepcopy(tool.tool["inputs"])
670     for i in inputs_copy:
671         if "null" not in i["type"]:
672             i["type"] = ["null"] + aslist(i["type"])
673
674     fill_in_defaults(inputs_copy,
675                      builder_job_order,
676                      arvrunner.fs_access)
677     # Need to create a builder object to evaluate expressions.
678     builder = make_builder(builder_job_order,
679                            tool.hints,
680                            tool.requirements,
681                            ArvRuntimeContext(),
682                            tool.metadata)
683     # Now update job_order with secondaryFiles
684     discover_secondary_files(arvrunner.fs_access,
685                              builder,
686                              tool.tool["inputs"],
687                              job_order)
688
689     _jobloaderctx = jobloaderctx.copy()
690     jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
691
692     jobmapper = upload_dependencies(arvrunner,
693                                     name,
694                                     jobloader,
695                                     job_order,
696                                     job_order.get("id", "#"),
697                                     runtimeContext)
698
699     if "id" in job_order:
700         del job_order["id"]
701
702     # Need to filter this out; it gets added by cwltool when providing
703     # parameters on the command line.
704     if "job_order" in job_order:
705         del job_order["job_order"]
706
707     update_from_mapper(job_order, jobmapper)
708
709     #print(json.dumps(job_order, indent=2))
710
711     return job_order
712
713 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
714
715 def upload_workflow_deps(arvrunner, tool, runtimeContext):
716     # Ensure that Docker images needed by this workflow are available
717
718     with Perf(metrics, "upload_docker"):
719         upload_docker(arvrunner, tool, runtimeContext)
720
721     document_loader = tool.doc_loader
722
723     merged_map = {}
724     tool_dep_cache = {}
725
726     todo = []
727
728     # Standard traversal is top down, but we want to go bottom up, so use
729     # the visitor to accumulate a list of nodes to visit, then
730     # visit them in reverse order.
731     def upload_tool_deps(deptool):
732         if "id" in deptool:
733             todo.append(deptool)
734
735     tool.visit(upload_tool_deps)
736
737     for deptool in reversed(todo):
738         discovered_secondaryfiles = {}
739         with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
740             pm = upload_dependencies(arvrunner,
741                                      "%s dependencies" % (shortname(deptool["id"])),
742                                      document_loader,
743                                      deptool,
744                                      deptool["id"],
745                                      runtimeContext,
746                                      include_primary=False,
747                                      discovered_secondaryfiles=discovered_secondaryfiles,
748                                      cache=tool_dep_cache)
749
750         document_loader.idx[deptool["id"]] = deptool
751         toolmap = {}
752         for k,v in pm.items():
753             toolmap[k] = v.resolved
754         merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
755
756     return merged_map
757
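# Shape sketch (hypothetical values): merged_map is keyed by tool/step id and
# records, per document, how local references were rewritten and which
# secondaryFiles were discovered, for later use by packed_workflow().
def _example_merged_map_shape():
    merged_map = {
        "file:///home/user/wf/main.cwl": FileUpdates(
            resolved={"file:///home/user/wf/blast.db":
                      "keep:99999999999999999999999999999999+99/blast.db"},
            secondaryFiles={}),
    }
    for tool_id, updates in merged_map.items():
        for local, keepref in updates.resolved.items():
            logger.debug("%s: %s -> %s", tool_id, local, keepref)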
758 def arvados_jobs_image(arvrunner, img, runtimeContext):
759     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
760
761     try:
762         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
763                                                           True, runtimeContext)
764     except Exception as e:
765         raise Exception("Docker image %s is not available\n%s" % (img, e) )
766
767
768 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
769     collection = arvados.collection.Collection(api_client=arvrunner.api,
770                                                keep_client=arvrunner.keep_client,
771                                                num_retries=arvrunner.num_retries)
772     with collection.open("workflow.cwl", "w") as f:
773         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
774
775     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
776                ["name", "like", name+"%"]]
777     if runtimeContext.project_uuid:
778         filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
779     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
780
781     if exists["items"]:
782         logger.info("Using collection %s", exists["items"][0]["uuid"])
783     else:
784         collection.save_new(name=name,
785                             owner_uuid=runtimeContext.project_uuid,
786                             ensure_unique_name=True,
787                             num_retries=arvrunner.num_retries)
788         logger.info("Uploaded to %s", collection.manifest_locator())
789
790     return collection.portable_data_hash()
791
792
793 class Runner(Process):
794     """Base class for runner processes, which submit an instance of
795     arvados-cwl-runner and wait for the final result."""
796
797     def __init__(self, runner, updated_tool,
798                  tool, loadingContext, enable_reuse,
799                  output_name, output_tags, submit_runner_ram=0,
800                  name=None, on_error=None, submit_runner_image=None,
801                  intermediate_output_ttl=0, merged_map=None,
802                  priority=None, secret_store=None,
803                  collection_cache_size=256,
804                  collection_cache_is_default=True,
805                  git_info=None):
806
807         self.loadingContext = loadingContext.copy()
808         self.loadingContext.metadata = updated_tool.metadata.copy()
809
810         super(Runner, self).__init__(updated_tool.tool, loadingContext)
811
812         self.arvrunner = runner
813         self.embedded_tool = tool
814         self.job_order = None
815         self.running = False
816         if enable_reuse:
817             # If reuse is permitted by command line arguments but
818             # disabled by the workflow itself, disable it.
819             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
820             if reuse_req:
821                 enable_reuse = reuse_req["enableReuse"]
822             reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
823             if reuse_req:
824                 enable_reuse = reuse_req["enableReuse"]
825         self.enable_reuse = enable_reuse
826         self.uuid = None
827         self.final_output = None
828         self.output_name = output_name
829         self.output_tags = output_tags
830         self.name = name
831         self.on_error = on_error
832         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
833         self.intermediate_output_ttl = intermediate_output_ttl
834         self.priority = priority
835         self.secret_store = secret_store
836         self.enable_dev = self.loadingContext.enable_dev
837         self.git_info = git_info
838         self.fast_parser = self.loadingContext.fast_parser
839
840         self.submit_runner_cores = 1
841         self.submit_runner_ram = 1024  # default 1 GiB
842         self.collection_cache_size = collection_cache_size
843
844         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
845         if runner_resource_req:
846             if runner_resource_req.get("coresMin"):
847                 self.submit_runner_cores = runner_resource_req["coresMin"]
848             if runner_resource_req.get("ramMin"):
849                 self.submit_runner_ram = runner_resource_req["ramMin"]
850             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
851                 self.collection_cache_size = runner_resource_req["keep_cache"]
852
853         if submit_runner_ram:
854             # Command line / initializer overrides default and/or spec from workflow
855             self.submit_runner_ram = submit_runner_ram
856
857         if self.submit_runner_ram <= 0:
858             raise Exception("Value of submit-runner-ram must be greater than zero")
859
860         if self.submit_runner_cores <= 0:
861             raise Exception("Value of submit-runner-cores must be greater than zero")
862
863         self.merged_map = merged_map or {}
864
865     def job(self,
866             job_order,         # type: Mapping[Text, Text]
867             output_callbacks,  # type: Callable[[Any, Any], Any]
868             runtimeContext     # type: RuntimeContext
869            ):  # type: (...) -> Generator[Any, None, None]
870         self.job_order = job_order
871         self._init_job(job_order, runtimeContext)
872         yield self
873
874     def update_pipeline_component(self, record):
875         pass
876
877     def done(self, record):
878         """Base method for handling a completed runner."""
879
880         try:
881             if record["state"] == "Complete":
882                 if record.get("exit_code") is not None:
883                     if record["exit_code"] == 33:
884                         processStatus = "UnsupportedRequirement"
885                     elif record["exit_code"] == 0:
886                         processStatus = "success"
887                     else:
888                         processStatus = "permanentFail"
889                 else:
890                     processStatus = "success"
891             else:
892                 processStatus = "permanentFail"
893
894             outputs = {}
895
896             if processStatus == "permanentFail":
897                 logc = arvados.collection.CollectionReader(record["log"],
898                                                            api_client=self.arvrunner.api,
899                                                            keep_client=self.arvrunner.keep_client,
900                                                            num_retries=self.arvrunner.num_retries)
901                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
902
903             self.final_output = record["output"]
904             outc = arvados.collection.CollectionReader(self.final_output,
905                                                        api_client=self.arvrunner.api,
906                                                        keep_client=self.arvrunner.keep_client,
907                                                        num_retries=self.arvrunner.num_retries)
908             if "cwl.output.json" in outc:
909                 with outc.open("cwl.output.json", "rb") as f:
910                     if f.size() > 0:
911                         outputs = json.loads(f.read().decode())
912             def keepify(fileobj):
913                 path = fileobj["location"]
914                 if not path.startswith("keep:"):
915                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
916             adjustFileObjs(outputs, keepify)
917             adjustDirObjs(outputs, keepify)
918         except Exception:
919             logger.exception("[%s] While getting final output object", self.name)
920             self.arvrunner.output_callback({}, "permanentFail")
921         else:
922             self.arvrunner.output_callback(outputs, processStatus)
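

# Exit-code convention sketch (mirrors the mapping in Runner.done() above): the
# submitted runner container signals UnsupportedRequirement with exit code 33,
# success with 0, and anything else is treated as a permanent failure.
def _example_runner_exit_code(exit_code):
    if exit_code == 33:
        return "UnsupportedRequirement"
    return "success" if exit_code == 0 else "permanentFail"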