arvados.git / sdk/cwl/arvados_cwl/runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import (
21     Any,
22     Callable,
23     Dict,
24     Iterable,
25     Iterator,
26     List,
27     Mapping,
28     MutableMapping,
29     Sequence,
30     MutableSequence,
31     Optional,
32     Set,
33     Sized,
34     Tuple,
35     Type,
36     Union,
37     cast,
38 )
39 from cwltool.utils import (
40     CWLObjectType,
41     CWLOutputAtomType,
42     CWLOutputType,
43 )
44
45 if os.name == "posix" and sys.version_info[0] < 3:
46     import subprocess32 as subprocess
47 else:
48     import subprocess
49
50 from schema_salad.sourceline import SourceLine, cmap
51
52 from cwltool.command_line_tool import CommandLineTool
53 import cwltool.workflow
54 from cwltool.process import (UnsupportedRequirement, normalizeFilesDirs,
55                              shortname, Process, fill_in_defaults)
56 from cwltool.load_tool import fetch_document
57 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
58 from cwltool.builder import substitute
59 from cwltool.pack import pack
60 from cwltool.update import INTERNAL_VERSION
61 from cwltool.builder import Builder
62 import schema_salad.validate as validate
63
64 import arvados.collection
65 import arvados.util
66 from .util import collectionUUID
67 from ruamel.yaml import YAML
68 from ruamel.yaml.comments import CommentedMap, CommentedSeq
69
70 import arvados_cwl.arvdocker
71 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
72 from ._version import __version__
73 from . import done
74 from .context import ArvRuntimeContext
75 from .perf import Perf
76
77 logger = logging.getLogger('arvados.cwl-runner')
78 metrics = logging.getLogger('arvados.cwl-runner.metrics')
79
80 def trim_anonymous_location(obj):
81     """Remove 'location' field from File and Directory literals.
82
83     To make internal handling easier, literals are assigned a random id for
84     'location'.  However, when writing the record back out, this can break
85     reproducibility.  Since it is valid for literals not to have a 'location'
86     field, remove it.
87
88     """
89
90     if obj.get("location", "").startswith("_:"):
91         del obj["location"]
92
93
94 def remove_redundant_fields(obj):
95     for field in ("path", "nameext", "nameroot", "dirname"):
96         if field in obj:
97             del obj[field]
98
99
100 def find_defaults(d, op):
101     if isinstance(d, list):
102         for i in d:
103             find_defaults(i, op)
104     elif isinstance(d, dict):
105         if "default" in d:
106             op(d)
107         else:
108             for i in viewvalues(d):
109                 find_defaults(i, op)
110
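# --- Editor's example (not part of the original runner.py; the input document
# is hypothetical): find_defaults() above walks a nested dict/list structure
# and applies `op` to every mapping that carries a "default" key, which is how
# upload_dependencies() later collects default File/Directory values.
def _example_find_defaults():
    doc = {"inputs": [
        {"id": "#main/sample", "type": "File",
         "default": {"class": "File", "location": "file:///tmp/sample.txt"}},
        {"id": "#main/threads", "type": "int"},
    ]}
    found = []
    find_defaults(doc, found.append)
    return found  # the single input mapping that has a "default" field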
111 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
112     return Builder(
113                  job=joborder,
114                  files=[],               # type: List[Dict[Text, Text]]
115                  bindings=[],            # type: List[Dict[Text, Any]]
116                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
117                  names=None,               # type: Names
118                  requirements=requirements,        # type: List[Dict[Text, Any]]
119                  hints=hints,               # type: List[Dict[Text, Any]]
120                  resources={},           # type: Dict[str, int]
121                  mutation_manager=None,    # type: Optional[MutationManager]
122                  formatgraph=None,         # type: Optional[Graph]
123                  make_fs_access=None,      # type: Type[StdFsAccess]
124                  fs_access=None,           # type: StdFsAccess
125                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
126                  timeout=runtimeContext.eval_timeout,             # type: float
127                  debug=runtimeContext.debug,               # type: bool
128                  js_console=runtimeContext.js_console,          # type: bool
129                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
130                  loadListing="",         # type: Text
131                  outdir="",              # type: Text
132                  tmpdir="",              # type: Text
133                  stagedir="",            # type: Text
134                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
135                  container_engine="docker"
136                 )
137
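# --- Editor's sketch (not part of the original runner.py; values are
# hypothetical): the Builder returned by make_builder() is used in this module
# mainly to evaluate CWL expressions such as secondaryFiles patterns, e.g.
#
#     builder = make_builder({"sample": {"class": "File", "basename": "a.bam"}},
#                            hints=[], requirements=[],
#                            runtimeContext=ArvRuntimeContext(),
#                            metadata={"cwlVersion": "v1.2"})
#     builder.do_eval("$(inputs.sample.basename).bai")   # -> "a.bam.bai"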
138 def search_schemadef(name, reqs):
139     for r in reqs:
140         if r["class"] == "SchemaDefRequirement":
141             for sd in r["types"]:
142                 if sd["name"] == name:
143                     return sd
144     return None
145
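# --- Editor's example (not part of the original runner.py; the schema is
# hypothetical): search_schemadef() returns the named type from a
# SchemaDefRequirement that defines it, or None if no requirement does.
def _example_search_schemadef():
    reqs = [{"class": "SchemaDefRequirement",
             "types": [{"name": "#sample_record", "type": "record", "fields": []}]}]
    return search_schemadef("#sample_record", reqs)  # -> the record schema dict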
146 primitive_types_set = frozenset(("null", "boolean", "int", "long",
147                                  "float", "double", "string", "record",
148                                  "array", "enum"))
149
150 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
151     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
152         # union type, collect all possible secondaryFiles
153         for i in inputschema:
154             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
155         return
156
157     if inputschema == "File":
158         inputschema = {"type": "File"}
159
160     if isinstance(inputschema, basestring):
161         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
162         if sd:
163             inputschema = sd
164         else:
165             return
166
167     if "secondaryFiles" in inputschema:
168         # set secondaryFiles, may be inherited by compound types.
169         secondaryspec = inputschema["secondaryFiles"]
170
171     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
172         not isinstance(inputschema["type"], basestring)):
173         # compound type (union, array, record)
174         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
175
176     elif (inputschema["type"] == "record" and
177           isinstance(primary, Mapping)):
178         #
179         # record type, find secondary files associated with fields.
180         #
181         for f in inputschema["fields"]:
182             p = primary.get(shortname(f["name"]))
183             if p:
184                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
185
186     elif (inputschema["type"] == "array" and
187           isinstance(primary, Sequence)):
188         #
189         # array type, find secondary files of elements
190         #
191         for p in primary:
192             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
193
194     elif (inputschema["type"] == "File" and
195           isinstance(primary, Mapping) and
196           primary.get("class") == "File"):
197
198         if "secondaryFiles" in primary or not secondaryspec:
199             # Nothing to do.
200             return
201
202         #
203         # Found a file, check for secondaryFiles
204         #
205         specs = []
206         primary["secondaryFiles"] = secondaryspec
207         for i, sf in enumerate(aslist(secondaryspec)):
208             if builder.cwlVersion == "v1.0":
209                 pattern = sf
210             else:
211                 pattern = sf["pattern"]
212             if pattern is None:
213                 continue
214             if isinstance(pattern, list):
215                 specs.extend(pattern)
216             elif isinstance(pattern, dict):
217                 specs.append(pattern)
218             elif isinstance(pattern, str):
219                 if builder.cwlVersion == "v1.0":
220                     specs.append({"pattern": pattern, "required": True})
221                 else:
222                     specs.append({"pattern": pattern, "required": sf.get("required")})
223             else:
224                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
225                     "Expression must return list, object, string or null")
226
227         found = []
228         for i, sf in enumerate(specs):
229             if isinstance(sf, dict):
230                 if sf.get("class") == "File":
231                     pattern = None
232                     if sf.get("location") is None:
233                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
234                             "File object is missing 'location': %s" % sf)
235                     sfpath = sf["location"]
236                     required = True
237                 else:
238                     pattern = sf["pattern"]
239                     required = sf.get("required")
240             elif isinstance(sf, str):
241                 pattern = sf
242                 required = True
243             else:
244                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
245                     "Expression must return list, object, string or null")
246
247             if pattern is not None:
248                 if "${" in pattern or "$(" in pattern:
249                     sfname = builder.do_eval(pattern, context=primary)
250                 else:
251                     sfname = substitute(primary["basename"], pattern)
252
253                 if sfname is None:
254                     continue
255
256                 p_location = primary["location"]
257                 if "/" in p_location:
258                     sfpath = (
259                         p_location[0 : p_location.rindex("/") + 1]
260                         + sfname
261                     )
262
263             required = builder.do_eval(required, context=primary)
264
265             if fsaccess.exists(sfpath):
266                 if pattern is not None:
267                     found.append({"location": sfpath, "class": "File"})
268                 else:
269                     found.append(sf)
270             elif required:
271                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
272                     "Required secondary file '%s' does not exist" % sfpath)
273
274         primary["secondaryFiles"] = cmap(found)
275         if discovered is not None:
276             discovered[primary["location"]] = primary["secondaryFiles"]
277     elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
278         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
279
280 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
281     for inputschema in inputs:
282         primary = job_order.get(shortname(inputschema["id"]))
283         if isinstance(primary, (Mapping, Sequence)):
284             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
285
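# --- Editor's example (not part of the original runner.py): when a
# secondaryFiles pattern is a plain string, set_secondary() derives the
# secondary file name from the primary basename with cwltool's substitute();
# a leading "^" strips one extension first.
def _example_secondary_patterns():
    assert substitute("reads.bam", ".bai") == "reads.bam.bai"
    assert substitute("reads.bam", "^.bai") == "reads.bai"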
286 def upload_dependencies(arvrunner, name, document_loader,
287                         workflowobj, uri, loadref_run, runtimeContext,
288                         include_primary=True, discovered_secondaryfiles=None,
289                         cache=None):
290     """Upload the dependencies of the workflowobj document to Keep.
291
292     Returns a pathmapper object mapping local paths to keep references.  Also
293     does an in-place update of references in "workflowobj".
294
295     Use scandeps to find $import, $include, $schemas, run, File and Directory
296     fields that represent external references.
297
298     If workflowobj has an "id" field, this will reload the document to ensure
299     it is scanning the raw document prior to preprocessing.
300     """
301
302     loaded = set()
303     def loadref(b, u):
304         joined = document_loader.fetcher.urljoin(b, u)
305         defrg, _ = urllib.parse.urldefrag(joined)
306         if defrg not in loaded:
307             loaded.add(defrg)
308             if cache is not None and defrg in cache:
309                 return cache[defrg]
310             # Use fetch_text to get raw file (before preprocessing).
311             text = document_loader.fetch_text(defrg)
312             if isinstance(text, bytes):
313                 textIO = StringIO(text.decode('utf-8'))
314             else:
315                 textIO = StringIO(text)
316             yamlloader = YAML(typ='safe', pure=True)
317             result = yamlloader.load(textIO)
318             if cache is not None:
319                 cache[defrg] = result
320             return result
321         else:
322             return {}
323
324     if loadref_run:
325         loadref_fields = set(("$import", "run"))
326     else:
327         loadref_fields = set(("$import",))
328
329     scanobj = workflowobj
330     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
331         # Need raw file content (before preprocessing) to ensure
332         # that external references in $include and $mixin are captured.
333         scanobj = loadref("", workflowobj["id"])
334
335     metadata = scanobj
336
337     with Perf(metrics, "scandeps include, location"):
338         sc_result = scandeps(uri, scanobj,
339                              loadref_fields,
340                              set(("$include", "location")),
341                              loadref, urljoin=document_loader.fetcher.urljoin,
342                              nestdirs=False)
343
344     with Perf(metrics, "scandeps $schemas"):
345         optional_deps = scandeps(uri, scanobj,
346                                       loadref_fields,
347                                       set(("$schemas",)),
348                                       loadref, urljoin=document_loader.fetcher.urljoin,
349                                       nestdirs=False)
350
351     if sc_result is None:
352         sc_result = []
353
354     if optional_deps is None:
355         optional_deps = []
356
357     if optional_deps:
358         sc_result.extend(optional_deps)
359
360     sc = []
361     uuids = {}
362
363     def collect_uuids(obj):
364         loc = obj.get("location", "")
365         sp = loc.split(":")
366         if sp[0] == "keep":
367             # Collect collection uuids that need to be resolved to
368             # portable data hashes
369             gp = collection_uuid_pattern.match(loc)
370             if gp:
371                 uuids[gp.groups()[0]] = obj
372             if collectionUUID in obj:
373                 uuids[obj[collectionUUID]] = obj
374
375     def collect_uploads(obj):
376         loc = obj.get("location", "")
377         sp = loc.split(":")
378         if len(sp) < 1:
379             return
380         if sp[0] in ("file", "http", "https"):
381             # Record local files that need to be uploaded,
382             # don't include file literals, keep references, etc.
383             sc.append(obj)
384         collect_uuids(obj)
385
386     with Perf(metrics, "collect uuids"):
387         visit_class(workflowobj, ("File", "Directory"), collect_uuids)
388
389     with Perf(metrics, "collect uploads"):
390         visit_class(sc_result, ("File", "Directory"), collect_uploads)
391
392     # Resolve any collection uuids we found to portable data hashes
393     # and assign them to uuid_map
394     uuid_map = {}
395     fetch_uuids = list(uuids.keys())
396     with Perf(metrics, "fetch_uuids"):
397         while fetch_uuids:
398             # For a large number of fetch_uuids, the API server may limit
399             # the response size, so keep fetching until the API server has
400             # nothing more to give us.
401             lookups = arvrunner.api.collections().list(
402                 filters=[["uuid", "in", fetch_uuids]],
403                 count="none",
404                 select=["uuid", "portable_data_hash"]).execute(
405                     num_retries=arvrunner.num_retries)
406
407             if not lookups["items"]:
408                 break
409
410             for l in lookups["items"]:
411                 uuid_map[l["uuid"]] = l["portable_data_hash"]
412
413             fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
414
415     normalizeFilesDirs(sc)
416
417     if include_primary and "id" in workflowobj:
418         sc.append({"class": "File", "location": workflowobj["id"]})
419
420     def visit_default(obj):
421         def defaults_are_optional(f):
422             if "location" not in f and "path" in f:
423                 f["location"] = f["path"]
424                 del f["path"]
425             normalizeFilesDirs(f)
426             optional_deps.append(f)
427         visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
428
429     find_defaults(workflowobj, visit_default)
430
431     discovered = {}
432     def discover_default_secondary_files(obj):
433         builder_job_order = {}
434         for t in obj["inputs"]:
435             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
436         # Need to create a builder object to evaluate expressions.
437         builder = make_builder(builder_job_order,
438                                obj.get("hints", []),
439                                obj.get("requirements", []),
440                                ArvRuntimeContext(),
441                                metadata)
442         discover_secondary_files(arvrunner.fs_access,
443                                  builder,
444                                  obj["inputs"],
445                                  builder_job_order,
446                                  discovered)
447
448     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
449     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
450
451     for d in list(discovered):
452         # Only interested in discovered secondaryFiles which are local
453         # files that need to be uploaded.
454         if d.startswith("file:"):
455             sc.extend(discovered[d])
456         else:
457             del discovered[d]
458
459     with Perf(metrics, "mapper"):
460         mapper = ArvPathMapper(arvrunner, sc, "",
461                                "keep:%s",
462                                "keep:%s/%s",
463                                name=name,
464                                single_collection=True,
465                                optional_deps=optional_deps)
466
467     keeprefs = set()
468     def addkeepref(k):
469         if k.startswith("keep:"):
470             keeprefs.add(collection_pdh_pattern.match(k).group(1))
471
472     def setloc(p):
473         loc = p.get("location")
474         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
475             p["location"] = mapper.mapper(p["location"]).resolved
476             addkeepref(p["location"])
477             return
478
479         if not loc:
480             return
481
482         if collectionUUID in p:
483             uuid = p[collectionUUID]
484             if uuid not in uuid_map:
485                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
486                     "Collection uuid %s not found" % uuid)
487             gp = collection_pdh_pattern.match(loc)
488             if gp and uuid_map[uuid] != gp.groups()[0]:
489                 # This file entry has both collectionUUID and a PDH
490                 # location. If the PDH doesn't match the one returned
491                 # by the API server, raise an error.
492                 raise SourceLine(p, "location", validate.ValidationException).makeError(
493                     "Expected collection uuid %s to be %s but API server reported %s" % (
494                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
495
496         gp = collection_uuid_pattern.match(loc)
497         if not gp:
498             # Not a uuid pattern (must be a pdh pattern)
499             addkeepref(p["location"])
500             return
501
502         uuid = gp.groups()[0]
503         if uuid not in uuid_map:
504             raise SourceLine(p, "location", validate.ValidationException).makeError(
505                 "Collection uuid %s not found" % uuid)
506         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
507         p[collectionUUID] = uuid
508
509     with Perf(metrics, "setloc"):
510         visit_class(workflowobj, ("File", "Directory"), setloc)
511         visit_class(discovered, ("File", "Directory"), setloc)
512
513     if discovered_secondaryfiles is not None:
514         for d in discovered:
515             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
516
517     if runtimeContext.copy_deps:
518         # Find referenced collections and copy them into the
519         # destination project, for easy sharing.
520         already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
521                                      filters=[["portable_data_hash", "in", list(keeprefs)],
522                                               ["owner_uuid", "=", runtimeContext.project_uuid]],
523                                      select=["uuid", "portable_data_hash", "created_at"]))
524
525         keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
526         for kr in keeprefs:
527             col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
528                                                   order="created_at desc",
529                                                    select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
530                                                    limit=1).execute()
531             if len(col["items"]) == 0:
532                 logger.warning("Cannot find collection with portable data hash %s", kr)
533                 continue
534             col = col["items"][0]
535             try:
536                 arvrunner.api.collections().create(body={"collection": {
537                     "owner_uuid": runtimeContext.project_uuid,
538                     "name": col["name"],
539                     "description": col["description"],
540                     "properties": col["properties"],
541                     "portable_data_hash": col["portable_data_hash"],
542                     "manifest_text": col["manifest_text"],
543                     "storage_classes_desired": col["storage_classes_desired"],
544                     "trash_at": col["trash_at"]
545                 }}, ensure_unique_name=True).execute()
546             except Exception as e:
547                 logger.warning("Unable to copy collection to destination: %s", e)
548
549     if "$schemas" in workflowobj:
550         sch = CommentedSeq()
551         for s in workflowobj["$schemas"]:
552             if s in mapper:
553                 sch.append(mapper.mapper(s).resolved)
554         workflowobj["$schemas"] = sch
555
556     return mapper
557
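# --- Editor's sketch (not part of the original runner.py; names are
# hypothetical): a typical upload_dependencies() call and how the returned
# ArvPathMapper is consumed -- see upload_workflow_deps() below for the real
# call site.
#
#     pm = upload_dependencies(arvrunner, "example dependencies",
#                              tool.doc_loader, tool.tool, tool.tool["id"],
#                              False, runtimeContext)
#     for src, entry in pm.items():
#         print(src, "->", entry.resolved)   # e.g. a file: URI -> keep:<pdh>/<path>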
558
559 def upload_docker(arvrunner, tool, runtimeContext):
560     """Uploads Docker images used in CommandLineTool objects."""
561
562     if isinstance(tool, CommandLineTool):
563         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
564         if docker_req:
565             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
566                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
567                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
568
569             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
570                                                        runtimeContext.project_uuid,
571                                                        runtimeContext.force_docker_pull,
572                                                        runtimeContext.tmp_outdir_prefix,
573                                                        runtimeContext.match_local_docker,
574                                                        runtimeContext.copy_deps)
575         else:
576             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
577                                                        True,
578                                                        runtimeContext.project_uuid,
579                                                        runtimeContext.force_docker_pull,
580                                                        runtimeContext.tmp_outdir_prefix,
581                                                        runtimeContext.match_local_docker,
582                                                        runtimeContext.copy_deps)
583     elif isinstance(tool, cwltool.workflow.Workflow):
584         for s in tool.steps:
585             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
586
587
588 def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
589     """Create a packed workflow.
590
591     A "packed" workflow is one where all the components have been combined into a single document."""
592
593     rewrites = {}
594     packed = pack(arvrunner.loadingContext, tool.tool["id"],
595                   rewrite_out=rewrites,
596                   loader=tool.doc_loader)
597
598     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
599
600     def visit(v, cur_id):
601         if isinstance(v, dict):
602             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
603                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
604                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or update to cwlVersion: v1.1")
605                 if "id" in v:
606                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
607             if "path" in v and "location" not in v:
608                 v["location"] = v["path"]
609                 del v["path"]
610             if "location" in v and cur_id in merged_map:
611                 if v["location"] in merged_map[cur_id].resolved:
612                     v["location"] = merged_map[cur_id].resolved[v["location"]]
613                 if v["location"] in merged_map[cur_id].secondaryFiles:
614                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
615             if v.get("class") == "DockerRequirement":
616                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
617                                                                                                              runtimeContext.project_uuid,
618                                                                                                              runtimeContext.force_docker_pull,
619                                                                                                              runtimeContext.tmp_outdir_prefix,
620                                                                                                              runtimeContext.match_local_docker,
621                                                                                                              runtimeContext.copy_deps)
622             for l in v:
623                 visit(v[l], cur_id)
624         if isinstance(v, list):
625             for l in v:
626                 visit(l, cur_id)
627     visit(packed, None)
628     return packed
629
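# --- Editor's note (illustrative; field values are hypothetical): the "packed"
# document returned above gathers every process into a single document under a
# "$graph" list, roughly
#
#     {"cwlVersion": "v1.2",
#      "$graph": [<Workflow with id "#main">, <each embedded tool document>]}
#
# which is what gets stored and submitted as the runner's workflow.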
630
631 def tag_git_version(packed):
632     if tool.tool["id"].startswith("file://"):  # NOTE: 'tool' is not defined in this function's scope (only 'packed' is passed in)
633         path = os.path.dirname(tool.tool["id"][7:])
634         try:
635             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
636         except (OSError, subprocess.CalledProcessError):
637             pass
638         else:
639             packed["http://schema.org/version"] = githash
640
641
642 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
643     """Upload local files referenced in the input object and return updated input
644     object with 'location' updated to the proper keep references.
645     """
646
647     # Make a copy of the job order and set defaults.
648     builder_job_order = copy.copy(job_order)
649
650     # fill_in_defaults throws an error if there are any
651     # missing required parameters; we don't want that here,
652     # so make them all optional.
653     inputs_copy = copy.deepcopy(tool.tool["inputs"])
654     for i in inputs_copy:
655         if "null" not in i["type"]:
656             i["type"] = ["null"] + aslist(i["type"])
657
658     fill_in_defaults(inputs_copy,
659                      builder_job_order,
660                      arvrunner.fs_access)
661     # Need to create a builder object to evaluate expressions.
662     builder = make_builder(builder_job_order,
663                            tool.hints,
664                            tool.requirements,
665                            ArvRuntimeContext(),
666                            tool.metadata)
667     # Now update job_order with secondaryFiles
668     discover_secondary_files(arvrunner.fs_access,
669                              builder,
670                              tool.tool["inputs"],
671                              job_order)
672
673     jobmapper = upload_dependencies(arvrunner,
674                                     name,
675                                     tool.doc_loader,
676                                     job_order,
677                                     job_order.get("id", "#"),
678                                     False,
679                                     runtimeContext)
680
681     if "id" in job_order:
682         del job_order["id"]
683
684     # Need to filter this out; it gets added by cwltool when providing
685     # parameters on the command line.
686     if "job_order" in job_order:
687         del job_order["job_order"]
688
689     return job_order
690
691 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
692
693 def upload_workflow_deps(arvrunner, tool, runtimeContext):
694     # Ensure that Docker images needed by this workflow are available
695
696     with Perf(metrics, "upload_docker"):
697         upload_docker(arvrunner, tool, runtimeContext)
698
699     document_loader = tool.doc_loader
700
701     merged_map = {}
702     tool_dep_cache = {}
703     def upload_tool_deps(deptool):
704         if "id" in deptool:
705             discovered_secondaryfiles = {}
706             with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
707                 pm = upload_dependencies(arvrunner,
708                                          "%s dependencies" % (shortname(deptool["id"])),
709                                          document_loader,
710                                          deptool,
711                                          deptool["id"],
712                                          False,
713                                          runtimeContext,
714                                          include_primary=False,
715                                          discovered_secondaryfiles=discovered_secondaryfiles,
716                                          cache=tool_dep_cache)
717             document_loader.idx[deptool["id"]] = deptool
718             toolmap = {}
719             for k,v in pm.items():
720                 toolmap[k] = v.resolved
721             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
722
723     tool.visit(upload_tool_deps)
724
725     return merged_map
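# --- Editor's note (illustrative; paths and hashes are hypothetical): the
# merged_map returned above maps each tool document id to a FileUpdates tuple,
# roughly
#
#     {"file:///tmp/wf.cwl": FileUpdates(
#          resolved={"file:///tmp/data.txt": "keep:<pdh>/data.txt"},
#          secondaryFiles={})}
#
# packed_workflow() uses these entries to rewrite "location" fields and attach
# discovered secondaryFiles in the packed document.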
726
727 def arvados_jobs_image(arvrunner, img, runtimeContext):
728     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
729
730     try:
731         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
732                                                           True,
733                                                           runtimeContext.project_uuid,
734                                                           runtimeContext.force_docker_pull,
735                                                           runtimeContext.tmp_outdir_prefix,
736                                                           runtimeContext.match_local_docker,
737                                                           runtimeContext.copy_deps)
738     except Exception as e:
739         raise Exception("Docker image %s is not available\n%s" % (img, e) )
740
741
742 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
743     collection = arvados.collection.Collection(api_client=arvrunner.api,
744                                                keep_client=arvrunner.keep_client,
745                                                num_retries=arvrunner.num_retries)
746     with collection.open("workflow.cwl", "w") as f:
747         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
748
749     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
750                ["name", "like", name+"%"]]
751     if runtimeContext.project_uuid:
752         filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
753     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
754
755     if exists["items"]:
756         logger.info("Using collection %s", exists["items"][0]["uuid"])
757     else:
758         collection.save_new(name=name,
759                             owner_uuid=runtimeContext.project_uuid,
760                             ensure_unique_name=True,
761                             num_retries=arvrunner.num_retries)
762         logger.info("Uploaded to %s", collection.manifest_locator())
763
764     return collection.portable_data_hash()
765
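# --- Editor's sketch (not part of the original runner.py): the portable data
# hash returned by upload_workflow_collection() can be combined with the file
# name written above ("workflow.cwl") to address the packed workflow in Keep.
def _example_workflow_keep_ref(pdh):
    # e.g. pdh = "99999999999999999999999999999999+118" (hypothetical)
    return "keep:%s/workflow.cwl" % pdh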
766
767 class Runner(Process):
768     """Base class for runner processes, which submit an instance of
769     arvados-cwl-runner and wait for the final result."""
770
771     def __init__(self, runner, updated_tool,
772                  tool, loadingContext, enable_reuse,
773                  output_name, output_tags, submit_runner_ram=0,
774                  name=None, on_error=None, submit_runner_image=None,
775                  intermediate_output_ttl=0, merged_map=None,
776                  priority=None, secret_store=None,
777                  collection_cache_size=256,
778                  collection_cache_is_default=True):
779
780         loadingContext = loadingContext.copy()
781         loadingContext.metadata = updated_tool.metadata.copy()
782
783         super(Runner, self).__init__(updated_tool.tool, loadingContext)
784
785         self.arvrunner = runner
786         self.embedded_tool = tool
787         self.job_order = None
788         self.running = False
789         if enable_reuse:
790             # If reuse is permitted by command line arguments but
791             # disabled by the workflow itself, disable it.
792             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
793             if reuse_req:
794                 enable_reuse = reuse_req["enableReuse"]
795         self.enable_reuse = enable_reuse
796         self.uuid = None
797         self.final_output = None
798         self.output_name = output_name
799         self.output_tags = output_tags
800         self.name = name
801         self.on_error = on_error
802         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
803         self.intermediate_output_ttl = intermediate_output_ttl
804         self.priority = priority
805         self.secret_store = secret_store
806         self.enable_dev = loadingContext.enable_dev
807
808         self.submit_runner_cores = 1
809         self.submit_runner_ram = 1024  # default 1 GiB
810         self.collection_cache_size = collection_cache_size
811
812         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
813         if runner_resource_req:
814             if runner_resource_req.get("coresMin"):
815                 self.submit_runner_cores = runner_resource_req["coresMin"]
816             if runner_resource_req.get("ramMin"):
817                 self.submit_runner_ram = runner_resource_req["ramMin"]
818             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
819                 self.collection_cache_size = runner_resource_req["keep_cache"]
820
821         if submit_runner_ram:
822             # Command line / initializer overrides default and/or spec from workflow
823             self.submit_runner_ram = submit_runner_ram
824
825         if self.submit_runner_ram <= 0:
826             raise Exception("Value of submit-runner-ram must be greater than zero")
827
828         if self.submit_runner_cores <= 0:
829             raise Exception("Value of submit-runner-cores must be greater than zero")
830
831         self.merged_map = merged_map or {}
832
833     def job(self,
834             job_order,         # type: Mapping[Text, Text]
835             output_callbacks,  # type: Callable[[Any, Any], Any]
836             runtimeContext     # type: RuntimeContext
837            ):  # type: (...) -> Generator[Any, None, None]
838         self.job_order = job_order
839         self._init_job(job_order, runtimeContext)
840         yield self
841
842     def update_pipeline_component(self, record):
843         pass
844
845     def done(self, record):
846         """Base method for handling a completed runner."""
847
848         try:
849             if record["state"] == "Complete":
850                 if record.get("exit_code") is not None:
851                     if record["exit_code"] == 33:
852                         processStatus = "UnsupportedRequirement"
853                     elif record["exit_code"] == 0:
854                         processStatus = "success"
855                     else:
856                         processStatus = "permanentFail"
857                 else:
858                     processStatus = "success"
859             else:
860                 processStatus = "permanentFail"
861
862             outputs = {}
863
864             if processStatus == "permanentFail":
865                 logc = arvados.collection.CollectionReader(record["log"],
866                                                            api_client=self.arvrunner.api,
867                                                            keep_client=self.arvrunner.keep_client,
868                                                            num_retries=self.arvrunner.num_retries)
869                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
870
871             self.final_output = record["output"]
872             outc = arvados.collection.CollectionReader(self.final_output,
873                                                        api_client=self.arvrunner.api,
874                                                        keep_client=self.arvrunner.keep_client,
875                                                        num_retries=self.arvrunner.num_retries)
876             if "cwl.output.json" in outc:
877                 with outc.open("cwl.output.json", "rb") as f:
878                     if f.size() > 0:
879                         outputs = json.loads(f.read().decode())
880             def keepify(fileobj):
881                 path = fileobj["location"]
882                 if not path.startswith("keep:"):
883                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
884             adjustFileObjs(outputs, keepify)
885             adjustDirObjs(outputs, keepify)
886         except Exception:
887             logger.exception("[%s] While getting final output object", self.name)
888             self.arvrunner.output_callback({}, "permanentFail")
889         else:
890             self.arvrunner.output_callback(outputs, processStatus)
891
892
893
894
895 # --- from cwltool ---
896
897
898 CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl"
899
900
901 def scandeps(
902     base: str,
903     doc: Union[CWLObjectType, MutableSequence[CWLObjectType]],
904     reffields: Set[str],
905     urlfields: Set[str],
906     loadref: Callable[[str, str], Union[CommentedMap, CommentedSeq, str, None]],
907     urljoin: Callable[[str, str], str] = urllib.parse.urljoin,
908     nestdirs: bool = True,
909     do_normalize: bool = True,
910 ) -> Optional[MutableSequence[CWLObjectType]]:
911
912     """Given a CWL document or input object, search for dependencies
913     (references to external files) of 'doc' and return them as a list
914     of File or Directory objects.
915
916     The 'base' is the base URL for relative references.
917
918     Looks for objects with 'class: File' or 'class: Directory' and
919     adds them to the list of dependencies.
920
921     Anything in 'urlfields' is also added as a File dependency.
922
923     Anything in 'reffields' (such as workflow step 'run') will be
924     added as a dependency and also loaded (using the 'loadref'
925     function) and recursively scanned for dependencies.  Those
926     dependencies will be added as secondary files to the primary file.
927
928     If "nestdirs" is true, create intermediate directory objects when
929     a file is located in a subdirectory under the starting directory.
930     This is so that if the dependencies are materialized, they will
931     produce the same relative file system locations.
932
933     """
934
935     if do_normalize:
936         import pprint
937         pprint.pprint(doc)  # dump the document being scanned (appears to be temporary debugging output)
938
939     r: Optional[MutableSequence[CWLObjectType]] = None
940     if isinstance(doc, MutableMapping):
941         if "id" in doc:
942             if cast(str, doc["id"]).startswith("file://"):
943                 df, _ = urllib.parse.urldefrag(cast(str, doc["id"]))
944                 if base != df:
945                     if r is None:
946                         r = []
947                     r.append({"class": "File", "location": df, "format": CWL_IANA})
948                     base = df
949
950         if doc.get("class") in ("File", "Directory") and "location" in urlfields:
951             with Perf(metrics, "File or Directory with location"):
952                 u = cast(Optional[str], doc.get("location", doc.get("path")))
953                 if u and not u.startswith("_:"):
954                     deps = {
955                         "class": doc["class"],
956                         "location": urljoin(base, u),
957                     }  # type: CWLObjectType
958                     if "basename" in doc:
959                         deps["basename"] = doc["basename"]
960                     if doc["class"] == "Directory" and "listing" in doc:
961                         deps["listing"] = doc["listing"]
962                     if doc["class"] == "File" and "secondaryFiles" in doc:
963                         sd = scandeps(
964                             base,
965                             cast(
966                                 Union[CWLObjectType, MutableSequence[CWLObjectType]],
967                                 doc["secondaryFiles"],
968                             ),
969                             reffields,
970                             urlfields,
971                             loadref,
972                             urljoin=urljoin,
973                             nestdirs=nestdirs,
974                             do_normalize=False,
975                         )
976                         if sd:
977                             deps["secondaryFiles"] = cast(
978                                 CWLOutputAtomType,
979                                 sd
980                             )
981                     if nestdirs:
982                         deps = nestdir(base, deps)
983                     if r is None:
984                         r = []
985                     r.append(deps)
986                 else:
987                     if doc["class"] == "Directory" and "listing" in doc:
988                         sd = scandeps(
989                                 base,
990                                 cast(MutableSequence[CWLObjectType], doc["listing"]),
991                                 reffields,
992                                 urlfields,
993                                 loadref,
994                                 urljoin=urljoin,
995                                 nestdirs=nestdirs,
996                                 do_normalize=False,
997                             )
998                         if sd:
999                             if r is None:
1000                                 r = []
1001                             r.extend(sd)
1002                     elif doc["class"] == "File" and "secondaryFiles" in doc:
1003                         sd = scandeps(
1004                                 base,
1005                                 cast(MutableSequence[CWLObjectType], doc["secondaryFiles"]),
1006                                 reffields,
1007                                 urlfields,
1008                                 loadref,
1009                                 urljoin=urljoin,
1010                                 nestdirs=nestdirs,
1011                                 do_normalize=False,
1012                             )
1013                         if sd:
1014                             if r is None:
1015                                 r = sd
1016                             else:
1017                                 r.extend(sd)
1018
1019         for k, v in doc.items():
1020             if k in reffields:
1021                 with Perf(metrics, "k in reffields"):
1022                     for u2 in aslist(v):
1023                         if isinstance(u2, MutableMapping):
1024                             sd = scandeps(
1025                                     base,
1026                                     u2,
1027                                     reffields,
1028                                     urlfields,
1029                                     loadref,
1030                                     urljoin=urljoin,
1031                                     nestdirs=nestdirs,
1032                                     do_normalize=False,
1033                                 )
1034                             if sd:
1035                                 if r is None:
1036                                     r = sd
1037                                 else:
1038                                     r.extend(sd)
1039                         else:
1040                             subid = urljoin(base, u2)
1041                             basedf, _ = urllib.parse.urldefrag(base)
1042                             subiddf, _ = urllib.parse.urldefrag(subid)
1043                             if basedf == subiddf:
1044                                 continue
1045                             sub = cast(
1046                                 Union[MutableSequence[CWLObjectType], CWLObjectType],
1047                                 loadref(base, u2),
1048                             )
1049                             deps2 = {
1050                                 "class": "File",
1051                                 "location": subid,
1052                                 "format": CWL_IANA,
1053                             }  # type: CWLObjectType
1054                             sf = scandeps(
1055                                 subid,
1056                                 sub,
1057                                 reffields,
1058                                 urlfields,
1059                                 loadref,
1060                                 urljoin=urljoin,
1061                                 nestdirs=nestdirs,
1062                                 do_normalize=False,
1063                             )
1064                             if sf:
1065                                 deps2["secondaryFiles"] = cast(
1066                                     MutableSequence[CWLOutputAtomType], mergedirs(sf)
1067                                 )
1068                             if nestdirs:
1069                                 deps2 = nestdir(base, deps2)
1070                             if r is None:
1071                                 r = []
1072                             r.append(deps2)
1073             elif k in urlfields and k != "location":
1074                 with Perf(metrics, "k in urlfields"):
1075                     for u3 in aslist(v):
1076                         deps = {"class": "File", "location": urljoin(base, u3)}
1077                         if nestdirs:
1078                             deps = nestdir(base, deps)
1079                         if r is None:
1080                             r = []
1081                         r.append(deps)
1082             elif doc.get("class") in ("File", "Directory") and k in (
1083                 "listing",
1084                 "secondaryFiles",
1085             ):
1086                 # should be handled earlier.
1087                 pass
1088             else:
1089                 with Perf(metrics, "k is something else"):
1090                     sd = scandeps(
1091                             base,
1092                             cast(Union[MutableSequence[CWLObjectType], CWLObjectType], v),
1093                             reffields,
1094                             urlfields,
1095                             loadref,
1096                             urljoin=urljoin,
1097                             nestdirs=nestdirs,
1098                             do_normalize=False,
1099                         )
1100                     if sd:
1101                         if r is None:
1102                             r = sd
1103                         else:
1104                             r.extend(sd)
1105     elif isinstance(doc, MutableSequence):
1106         with Perf(metrics, "d in doc"):
1107             for d in doc:
1108                 sd = scandeps(
1109                         base,
1110                         d,
1111                         reffields,
1112                         urlfields,
1113                         loadref,
1114                         urljoin=urljoin,
1115                         nestdirs=nestdirs,
1116                         do_normalize=False,
1117                     )
1118                 if r is None:
1119                     r = sd
1120                 else:
1121                     r.extend(sd)
1122
1123     if r and do_normalize:
1124         normalizeFilesDirs(r)
1125
1126     return r
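# --- Editor's example (not part of the original runner.py; the document is
# hypothetical): a minimal scandeps() call that collects File dependencies via
# the "location" urlfield, mirroring how upload_dependencies() uses it above.
def _example_scandeps():
    job = {"reads": {"class": "File", "location": "file:///tmp/sample.fastq"}}
    deps = scandeps("file:///tmp/job.yml", job,
                    reffields=set(),
                    urlfields={"location"},
                    loadref=lambda base, ref: {},
                    nestdirs=False)
    # deps holds one File entry for file:///tmp/sample.fastq
    # (normalizeFilesDirs() also fills in its "basename").
    return deps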