# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import sys
import re
import urllib.parse
from functools import partial
import logging
import json
import copy
from collections import namedtuple
from io import StringIO
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    Mapping,
    MutableMapping,
    Sequence,
    MutableSequence,
    Optional,
    Set,
    Sized,
    Tuple,
    Type,
    Union,
    cast,
)
from cwltool.utils import (
    CWLObjectType,
    CWLOutputAtomType,
    CWLOutputType,
)

import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document, jobloaderctx
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate
import schema_salad.ref_resolver

import arvados.collection
import arvados.util
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern, MapperEnt
from ._version import __version__
from . import done
from .context import ArvRuntimeContext
from .perf import Perf

basestring = (bytes, str)
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals to not have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
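    """Remove 'path', 'nameext', 'nameroot' and 'dirname' fields from a File
    or Directory object."""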
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
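    """Recursively walk lists and mappings in 'd' and call op() on every
    mapping that contains a 'default' field."""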
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.values():
                find_defaults(i, op)

def make_builder(joborder, hints, requirements, runtimeContext, metadata):
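    """Return a cwltool Builder with just enough state to evaluate CWL
    expressions (such as secondaryFiles patterns) outside of a running job."""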
    return Builder(
                 job=joborder,
                 files=[],               # type: List[Dict[Text, Text]]
                 bindings=[],            # type: List[Dict[Text, Any]]
                 schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                 names=None,               # type: Names
                 requirements=requirements,        # type: List[Dict[Text, Any]]
                 hints=hints,               # type: List[Dict[Text, Any]]
                 resources={},           # type: Dict[str, int]
                 mutation_manager=None,    # type: Optional[MutationManager]
                 formatgraph=None,         # type: Optional[Graph]
                 make_fs_access=None,      # type: Type[StdFsAccess]
                 fs_access=None,           # type: StdFsAccess
                 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
                 timeout=runtimeContext.eval_timeout,             # type: float
                 debug=runtimeContext.debug,               # type: bool
                 js_console=runtimeContext.js_console,          # type: bool
                 force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
                 loadListing="",         # type: Text
                 outdir="",              # type: Text
                 tmpdir="",              # type: Text
                 stagedir="",            # type: Text
                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                 container_engine="docker"
                )

def search_schemadef(name, reqs):
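    """Search 'reqs' for a SchemaDefRequirement defining the type 'name' and
    return that type definition, or None if it is not found."""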
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
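    """Walk 'primary' (a job order value) according to 'inputschema' and fill
    in its secondaryFiles.

    Patterns and expressions from 'secondaryspec' (or the schema's own
    secondaryFiles) are evaluated against each File found; a missing required
    secondary file raises ValidationException.  If 'discovered' is a dict, it
    is updated with {primary file location: discovered secondaryFiles}.
    """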
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if inputschema == "File":
        inputschema = {"type": "File"}

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          isinstance(primary, Mapping) and
          primary.get("class") == "File"):

        if "secondaryFiles" in primary or not secondaryspec:
            # Nothing to do.
            return

        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = sf
            else:
                pattern = sf["pattern"]
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                if "${" in pattern or "$(" in pattern:
                    sfname = builder.do_eval(pattern, context=primary)
                else:
                    sfname = substitute(primary["basename"], pattern)

                if sfname is None:
                    continue

                if isinstance(sfname, str):
                    p_location = primary["location"]
                    if "/" in p_location:
                        sfpath = (
                            p_location[0 : p_location.rindex("/") + 1]
                            + sfname
                        )

            required = builder.do_eval(required, context=primary)

            if isinstance(sfname, list) or isinstance(sfname, dict):
                each = aslist(sfname)
                for e in each:
                    if required and not fsaccess.exists(e.get("location")):
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "Required secondary file '%s' does not exist" % e.get("location"))
                found.extend(each)

            if isinstance(sfname, str):
                if fsaccess.exists(sfpath):
                    if pattern is not None:
                        found.append({"location": sfpath, "class": "File"})
                    else:
                        found.append(sf)
                elif required:
                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                        "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
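    """For each input parameter in 'inputs', apply set_secondary() to the
    corresponding value in 'job_order' to fill in its secondaryFiles."""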
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, runtimeContext,
                        include_primary=True, discovered_secondaryfiles=None,
                        cache=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $schemas, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    scanobj = workflowobj
    metadata = scanobj

    with Perf(metrics, "scandeps"):
        sc_result = scandeps(uri, scanobj,
                             set(),
                             set(("location",)),
                             None, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)
        optional_deps = scandeps(uri, scanobj,
                             set(),
                             set(("$schemas",)),
                             None, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)

    if sc_result is None:
        sc_result = []

    if optional_deps is None:
        optional_deps = []

    if optional_deps:
        sc_result.extend(optional_deps)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded;
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    with Perf(metrics, "collect uuids"):
        visit_class(workflowobj, ("File", "Directory"), collect_uuids)

    with Perf(metrics, "collect uploads"):
        visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    with Perf(metrics, "fetch_uuids"):
        while fetch_uuids:
            # For a large number of fetch_uuids, the API server may limit
            # response size, so keep fetching until the API server has nothing
            # more to give us.
            lookups = arvrunner.api.collections().list(
                filters=[["uuid", "in", fetch_uuids]],
                count="none",
                select=["uuid", "portable_data_hash"]).execute(
                    num_retries=arvrunner.num_retries)

            if not lookups["items"]:
                break

            for l in lookups["items"]:
                uuid_map[l["uuid"]] = l["portable_data_hash"]

            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
        if include_primary:
            # make sure it's included
            sc.append({"class": "File", "location": defrg})
        else:
            # make sure it's excluded
            sc = [d for d in sc if d.get("location") != defrg]

    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    with Perf(metrics, "mapper"):
        mapper = ArvPathMapper(arvrunner, sc, "",
                               "keep:%s",
                               "keep:%s/%s",
                               name=name,
                               single_collection=True,
                               optional_deps=optional_deps)

    for k, v in uuid_map.items():
        mapper._pathmap["keep:"+k] = MapperEnt(v, "", "", False)

    keeprefs = set()
    def addkeepref(k):
        if k.startswith("keep:"):
            keeprefs.add(collection_pdh_pattern.match(k).group(1))


    def collectloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            addkeepref(p["location"])
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            # Not a uuid pattern (must be a pdh pattern)
            addkeepref(p["location"])
            return

        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)

    with Perf(metrics, "collectloc"):
        visit_class(workflowobj, ("File", "Directory"), collectloc)
        visit_class(discovered, ("File", "Directory"), collectloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if runtimeContext.copy_deps:
        # Find referenced collections and copy them into the
        # destination project, for easy sharing.
        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
                                     filters=[["portable_data_hash", "in", list(keeprefs)],
                                              ["owner_uuid", "=", runtimeContext.project_uuid]],
                                     select=["uuid", "portable_data_hash", "created_at"]))

        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
        for kr in keeprefs:
            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
                                                   order="created_at desc",
                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
                                                   limit=1).execute()
            if len(col["items"]) == 0:
                logger.warning("Cannot find collection with portable data hash %s", kr)
                continue
            col = col["items"][0]
            col["name"] = arvados.util.trim_name(col["name"])
            try:
                arvrunner.api.collections().create(body={"collection": {
                    "owner_uuid": runtimeContext.project_uuid,
                    "name": col["name"],
                    "description": col["description"],
                    "properties": col["properties"],
                    "portable_data_hash": col["portable_data_hash"],
                    "manifest_text": col["manifest_text"],
                    "storage_classes_desired": col["storage_classes_desired"],
                    "trash_at": col["trash_at"]
                }}, ensure_unique_name=True).execute()
            except Exception as e:
                logger.warning("Unable to copy collection to destination: %s", e)

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper


def upload_docker(arvrunner, tool, runtimeContext):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")

            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, runtimeContext)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool, runtimeContext)


def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in rewrites.items()}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or upgrade to cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                             runtimeContext)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)

    if git_info:
        for g in git_info:
            packed[g] = git_info[g]

    return packed


def tag_git_version(packed):
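    """If the packed workflow was loaded from a local file, record the git
    commit hash of its source directory under 'http://schema.org/version'."""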
    if packed["id"].startswith("file://"):
        path = os.path.dirname(packed["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def setloc(mapper, p):
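    """Rewrite the 'location' of a File or Directory to the keep: reference
    recorded in 'mapper', validating any arv:collectionUUID hint against it."""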
    loc = p.get("location")
    if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
        p["location"] = mapper.mapper(p["location"]).resolved
        return

    if not loc:
        return

    if collectionUUID in p:
        uuid = p[collectionUUID]
        keepuuid = "keep:"+uuid
        if keepuuid not in mapper:
            raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        gp = collection_pdh_pattern.match(loc)
        if gp and mapper.mapper(keepuuid).resolved != gp.groups()[0]:
            # This file entry has both collectionUUID and a PDH
            # location. If the PDH doesn't match the one returned
            # by the API server, raise an error.
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Expected collection uuid %s to be %s but API server reported %s" % (
                    uuid, gp.groups()[0], mapper.mapper(keepuuid).resolved))

    gp = collection_uuid_pattern.match(loc)
    if not gp:
        # Not a uuid pattern (must be a pdh pattern)
        return

    uuid = gp.groups()[0]
    keepuuid = "keep:"+uuid
    if keepuuid not in mapper:
        raise SourceLine(p, "location", validate.ValidationException).makeError(
            "Collection uuid %s not found" % uuid)
    p["location"] = "keep:%s%s" % (mapper.mapper(keepuuid).resolved, gp.groups()[1] if gp.groups()[1] else "")
    p[collectionUUID] = uuid

def update_from_mapper(workflowobj, mapper):
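    """Apply setloc() to every File and Directory in 'workflowobj'."""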
    with Perf(metrics, "setloc"):
        visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))

def apply_merged_map(merged_map, workflowobj):
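    """Rewrite 'location' and 'secondaryFiles' fields in 'workflowobj' using
    the FileUpdates entries in 'merged_map', keyed by the enclosing tool id."""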
    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if "id" in v:
                    cur_id = v["id"]
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            #if v.get("class") == "DockerRequirement":
            #    v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
            #                                                                                                 runtimeContext)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(workflowobj, None)

def update_from_merged_map(tool, merged_map):
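    """Apply apply_merged_map() to every embedded process object in 'tool'."""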
    tool.visit(partial(apply_merged_map, merged_map))

def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters, we don't want it to do that
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    _jobloaderctx = jobloaderctx.copy()
    jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    jobloader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    runtimeContext)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    update_from_mapper(job_order, jobmapper)

    return job_order, jobmapper

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool, runtimeContext):
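    """Upload the Docker images and file dependencies of every tool embedded
    in the workflow.

    Returns a dict mapping each tool document's id to a FileUpdates tuple of
    (resolved locations, discovered secondaryFiles).
    """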
    # Ensure that Docker images needed by this workflow are available

    with Perf(metrics, "upload_docker"):
        upload_docker(arvrunner, tool, runtimeContext)

    document_loader = tool.doc_loader

    merged_map = {}
    tool_dep_cache = {}

    todo = []

    # Standard traversal is top down, we want to go bottom up, so use
    # the visitor to accumulate a list of nodes to visit, then
    # visit them in reverse order.
    def upload_tool_deps(deptool):
        if "id" in deptool:
            todo.append(deptool)

    tool.visit(upload_tool_deps)

    for deptool in reversed(todo):
        discovered_secondaryfiles = {}
        with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     runtimeContext,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles,
                                     cache=tool_dep_cache)

        document_loader.idx[deptool["id"]] = deptool
        toolmap = {}
        for k,v in pm.items():
            toolmap[k] = v.resolved

        merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    return merged_map

def arvados_jobs_image(arvrunner, img, runtimeContext):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
                                                          True, runtimeContext)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
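    """Store the packed workflow as 'workflow.cwl' in a Keep collection.

    Reuses an existing collection with the same content and name prefix in the
    target project when one exists; returns the collection's portable data hash.
    """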
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if runtimeContext.project_uuid:
        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=runtimeContext.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True,
                 git_info=None,
                 reuse_runner=False):

        self.loadingContext = loadingContext.copy()

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = self.loadingContext.enable_dev
        self.git_info = git_info
        self.fast_parser = self.loadingContext.fast_parser
        self.reuse_runner = reuse_runner

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40,
                             include_crunchrun=(record.get("exit_code") is None or record.get("exit_code") > 127))

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)


def print_keep_deps_visitor(api, runtimeContext, references, doc_loader, tool):
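    """Collect the Keep references used by a single tool document: keep:
    locations of Files and Directories, the acrContainerImage of a
    WorkflowRunnerResources hint, and the Docker image collection for any
    DockerRequirement."""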
    def collect_locators(obj):
        loc = obj.get("location", "")

        g = arvados.util.keepuri_pattern.match(loc)
        if g:
            references.add(g[1])

        if obj.get("class") == "http://arvados.org/cwl#WorkflowRunnerResources" and "acrContainerImage" in obj:
            references.add(obj["acrContainerImage"])

        if obj.get("class") == "DockerRequirement":
            references.add(arvados_cwl.arvdocker.arv_docker_get_image(api, obj, False, runtimeContext))

    sc_result = scandeps(tool["id"], tool,
                         set(),
                         set(("location", "id")),
                         None, urljoin=doc_loader.fetcher.urljoin,
                         nestdirs=False)

    visit_class(sc_result, ("File", "Directory"), collect_locators)
    visit_class(tool, ("DockerRequirement", "http://arvados.org/cwl#WorkflowRunnerResources"), collect_locators)


def print_keep_deps(arvRunner, runtimeContext, merged_map, tool):
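    """Write a sorted JSON list of the Keep collections (data and Docker
    images) that the workflow depends on to arvRunner.stdout."""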
    references = set()

    tool.visit(partial(print_keep_deps_visitor, arvRunner.api, runtimeContext, references, tool.doc_loader))

    for mm in merged_map:
        for k, v in merged_map[mm].resolved.items():
            g = arvados.util.keepuri_pattern.match(v)
            if g:
                references.add(g[1])

    json.dump(sorted(references), arvRunner.stdout)
    print(file=arvRunner.stdout)