19982: Preemption resubmit tests pass
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import os
6 import sys
7 import re
8 import urllib.parse
9 from functools import partial
10 import logging
11 import json
12 import copy
13 from collections import namedtuple
14 from io import StringIO
15 from typing import (
16     Any,
17     Callable,
18     Dict,
19     Iterable,
20     Iterator,
21     List,
22     Mapping,
23     MutableMapping,
24     Sequence,
25     MutableSequence,
26     Optional,
27     Set,
28     Sized,
29     Tuple,
30     Type,
31     Union,
32     cast,
33 )
34
35 import subprocess
36
37 from schema_salad.sourceline import SourceLine, cmap
38
39 from cwltool.command_line_tool import CommandLineTool
40 import cwltool.workflow
41 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
42                              shortname, Process, fill_in_defaults)
43 from cwltool.load_tool import fetch_document, jobloaderctx
44 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
45 from cwltool.builder import substitute
46 from cwltool.pack import pack
47 from cwltool.update import INTERNAL_VERSION
48 from cwltool.builder import Builder
49 import schema_salad.validate as validate
50 import schema_salad.ref_resolver
51 from cwltool.secrets import SecretStore
52
53 import arvados.collection
54 import arvados.util
55 from .util import collectionUUID
56 from ruamel.yaml import YAML
57 from ruamel.yaml.comments import CommentedMap, CommentedSeq
58
59 import arvados_cwl.arvdocker
60 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern, MapperEnt
61 from ._version import __version__
62 from . import done
63 from . context import ArvRuntimeContext
64 from .perf import Perf
65
66 basestring = (bytes, str)
67 logger = logging.getLogger('arvados.cwl-runner')
68 metrics = logging.getLogger('arvados.cwl-runner.metrics')
69
70 def trim_anonymous_location(obj):
71     """Remove 'location' field from File and Directory literals.
72
73     To make internal handling easier, literals are assigned a random id for
74     'location'.  However, when writing the record back out, this can break
75     reproducibility.  Since it is valid for literals not have a 'location'
76     field, remove it.
77
78     """
79
80     if obj.get("location", "").startswith("_:"):
81         del obj["location"]
82
83
84 def remove_redundant_fields(obj):
85     for field in ("path", "nameext", "nameroot", "dirname"):
86         if field in obj:
87             del obj[field]
88
89
90 def find_defaults(d, op):
91     if isinstance(d, list):
92         for i in d:
93             find_defaults(i, op)
94     elif isinstance(d, dict):
95         if "default" in d:
96             op(d)
97         else:
98             for i in d.values():
99                 find_defaults(i, op)
100
101 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
102     return Builder(
103                  job=joborder,
104                  files=[],               # type: List[Dict[Text, Text]]
105                  bindings=[],            # type: List[Dict[Text, Any]]
106                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
107                  names=None,               # type: Names
108                  requirements=requirements,        # type: List[Dict[Text, Any]]
109                  hints=hints,               # type: List[Dict[Text, Any]]
110                  resources={},           # type: Dict[str, int]
111                  mutation_manager=None,    # type: Optional[MutationManager]
112                  formatgraph=None,         # type: Optional[Graph]
113                  make_fs_access=None,      # type: Type[StdFsAccess]
114                  fs_access=None,           # type: StdFsAccess
115                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
116                  timeout=runtimeContext.eval_timeout,             # type: float
117                  debug=runtimeContext.debug,               # type: bool
118                  js_console=runtimeContext.js_console,          # type: bool
119                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
120                  loadListing="",         # type: Text
121                  outdir="",              # type: Text
122                  tmpdir="",              # type: Text
123                  stagedir="",            # type: Text
124                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
125                  container_engine="docker"
126                 )
127
128 def search_schemadef(name, reqs):
129     for r in reqs:
130         if r["class"] == "SchemaDefRequirement":
131             for sd in r["types"]:
132                 if sd["name"] == name:
133                     return sd
134     return None
135
136 primitive_types_set = frozenset(("null", "boolean", "int", "long",
137                                  "float", "double", "string", "record",
138                                  "array", "enum"))
139
140 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
141     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
142         # union type, collect all possible secondaryFiles
143         for i in inputschema:
144             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
145         return
146
147     if inputschema == "File":
148         inputschema = {"type": "File"}
149
150     if isinstance(inputschema, basestring):
151         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
152         if sd:
153             inputschema = sd
154         else:
155             return
156
157     if "secondaryFiles" in inputschema:
158         # set secondaryFiles, may be inherited by compound types.
159         secondaryspec = inputschema["secondaryFiles"]
160
161     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
162         not isinstance(inputschema["type"], basestring)):
163         # compound type (union, array, record)
164         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
165
166     elif (inputschema["type"] == "record" and
167           isinstance(primary, Mapping)):
168         #
169         # record type, find secondary files associated with fields.
170         #
171         for f in inputschema["fields"]:
172             p = primary.get(shortname(f["name"]))
173             if p:
174                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
175
176     elif (inputschema["type"] == "array" and
177           isinstance(primary, Sequence)):
178         #
179         # array type, find secondary files of elements
180         #
181         for p in primary:
182             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
183
184     elif (inputschema["type"] == "File" and
185           isinstance(primary, Mapping) and
186           primary.get("class") == "File"):
187
188         if "secondaryFiles" in primary or not secondaryspec:
189             # Nothing to do.
190             return
191
192         #
193         # Found a file, check for secondaryFiles
194         #
195         specs = []
196         primary["secondaryFiles"] = secondaryspec
197         for i, sf in enumerate(aslist(secondaryspec)):
198             if builder.cwlVersion == "v1.0":
199                 pattern = sf
200             else:
201                 pattern = sf["pattern"]
202             if pattern is None:
203                 continue
204             if isinstance(pattern, list):
205                 specs.extend(pattern)
206             elif isinstance(pattern, dict):
207                 specs.append(pattern)
208             elif isinstance(pattern, str):
209                 if builder.cwlVersion == "v1.0":
210                     specs.append({"pattern": pattern, "required": True})
211                 else:
212                     specs.append({"pattern": pattern, "required": sf.get("required")})
213             else:
214                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
215                     "Expression must return list, object, string or null")
216
217         found = []
218         for i, sf in enumerate(specs):
219             if isinstance(sf, dict):
220                 if sf.get("class") == "File":
221                     pattern = None
222                     if sf.get("location") is None:
223                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
224                             "File object is missing 'location': %s" % sf)
225                     sfpath = sf["location"]
226                     required = True
227                 else:
228                     pattern = sf["pattern"]
229                     required = sf.get("required")
230             elif isinstance(sf, str):
231                 pattern = sf
232                 required = True
233             else:
234                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
235                     "Expression must return list, object, string or null")
236
237             if pattern is not None:
238                 if "${" in pattern or "$(" in pattern:
239                     sfname = builder.do_eval(pattern, context=primary)
240                 else:
241                     sfname = substitute(primary["basename"], pattern)
242
243                 if sfname is None:
244                     continue
245
246                 if isinstance(sfname, str):
247                     p_location = primary["location"]
248                     if "/" in p_location:
249                         sfpath = (
250                             p_location[0 : p_location.rindex("/") + 1]
251                             + sfname
252                         )
253
254             required = builder.do_eval(required, context=primary)
255
256             if isinstance(sfname, list) or isinstance(sfname, dict):
257                 each = aslist(sfname)
258                 for e in each:
259                     if required and not fsaccess.exists(e.get("location")):
260                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
261                             "Required secondary file '%s' does not exist" % e.get("location"))
262                 found.extend(each)
263
264             if isinstance(sfname, str):
265                 if fsaccess.exists(sfpath):
266                     if pattern is not None:
267                         found.append({"location": sfpath, "class": "File"})
268                     else:
269                         found.append(sf)
270                 elif required:
271                     raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
272                         "Required secondary file '%s' does not exist" % sfpath)
273
274         primary["secondaryFiles"] = cmap(found)
275         if discovered is not None:
276             discovered[primary["location"]] = primary["secondaryFiles"]
277     elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
278         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
279
280 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
281     for inputschema in inputs:
282         primary = job_order.get(shortname(inputschema["id"]))
283         if isinstance(primary, (Mapping, Sequence)):
284             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
285
286 def upload_dependencies(arvrunner, name, document_loader,
287                         workflowobj, uri, runtimeContext,
288                         include_primary=True, discovered_secondaryfiles=None,
289                         cache=None):
290     """Upload the dependencies of the workflowobj document to Keep.
291
292     Returns a pathmapper object mapping local paths to keep references.  Also
293     does an in-place update of references in "workflowobj".
294
295     Use scandeps to find $schemas, File and Directory
296     fields that represent external references.
297
298     If workflowobj has an "id" field, this will reload the document to ensure
299     it is scanning the raw document prior to preprocessing.
300     """
301
302     scanobj = workflowobj
303     metadata = scanobj
304
305     with Perf(metrics, "scandeps"):
306         sc_result = scandeps(uri, scanobj,
307                              set(),
308                              set(("location",)),
309                              None, urljoin=document_loader.fetcher.urljoin,
310                              nestdirs=False)
311         optional_deps = scandeps(uri, scanobj,
312                              set(),
313                              set(("$schemas",)),
314                              None, urljoin=document_loader.fetcher.urljoin,
315                              nestdirs=False)
316
317     if sc_result is None:
318         sc_result = []
319
320     if optional_deps is None:
321         optional_deps = []
322
323     if optional_deps:
324         sc_result.extend(optional_deps)
325
326     sc = []
327     uuids = {}
328
329     def collect_uuids(obj):
330         loc = obj.get("location", "")
331         sp = loc.split(":")
332         if sp[0] == "keep":
333             # Collect collection uuids that need to be resolved to
334             # portable data hashes
335             gp = collection_uuid_pattern.match(loc)
336             if gp:
337                 uuids[gp.groups()[0]] = obj
338             if collectionUUID in obj:
339                 uuids[obj[collectionUUID]] = obj
340
341     def collect_uploads(obj):
342         loc = obj.get("location", "")
343         sp = loc.split(":")
344         if len(sp) < 1:
345             return
346         if sp[0] in ("file", "http", "https"):
347             # Record local files than need to be uploaded,
348             # don't include file literals, keep references, etc.
349             sc.append(obj)
350         collect_uuids(obj)
351
352     with Perf(metrics, "collect uuids"):
353         visit_class(workflowobj, ("File", "Directory"), collect_uuids)
354
355     with Perf(metrics, "collect uploads"):
356         visit_class(sc_result, ("File", "Directory"), collect_uploads)
357
358     # Resolve any collection uuids we found to portable data hashes
359     # and assign them to uuid_map
360     uuid_map = {}
361     fetch_uuids = list(uuids.keys())
362     with Perf(metrics, "fetch_uuids"):
363         while fetch_uuids:
364             # For a large number of fetch_uuids, API server may limit
365             # response size, so keep fetching from API server has nothing
366             # more to give us.
367             lookups = arvrunner.api.collections().list(
368                 filters=[["uuid", "in", fetch_uuids]],
369                 count="none",
370                 select=["uuid", "portable_data_hash"]).execute(
371                     num_retries=arvrunner.num_retries)
372
373             if not lookups["items"]:
374                 break
375
376             for l in lookups["items"]:
377                 uuid_map[l["uuid"]] = l["portable_data_hash"]
378
379             fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
380
381     normalizeFilesDirs(sc)
382
383     if "id" in workflowobj:
384         defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
385         if include_primary:
386             # make sure it's included
387             sc.append({"class": "File", "location": defrg})
388         else:
389             # make sure it's excluded
390             sc = [d for d in sc if d.get("location") != defrg]
391
392     def visit_default(obj):
393         def defaults_are_optional(f):
394             if "location" not in f and "path" in f:
395                 f["location"] = f["path"]
396                 del f["path"]
397             normalizeFilesDirs(f)
398             optional_deps.append(f)
399         visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
400
401     find_defaults(workflowobj, visit_default)
402
403     discovered = {}
404     def discover_default_secondary_files(obj):
405         builder_job_order = {}
406         for t in obj["inputs"]:
407             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
408         # Need to create a builder object to evaluate expressions.
409         builder = make_builder(builder_job_order,
410                                obj.get("hints", []),
411                                obj.get("requirements", []),
412                                ArvRuntimeContext(),
413                                metadata)
414         discover_secondary_files(arvrunner.fs_access,
415                                  builder,
416                                  obj["inputs"],
417                                  builder_job_order,
418                                  discovered)
419
420     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
421     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
422
423     for d in list(discovered):
424         # Only interested in discovered secondaryFiles which are local
425         # files that need to be uploaded.
426         if d.startswith("file:"):
427             sc.extend(discovered[d])
428         else:
429             del discovered[d]
430
431     with Perf(metrics, "mapper"):
432         mapper = ArvPathMapper(arvrunner, sc, "",
433                                "keep:%s",
434                                "keep:%s/%s",
435                                name=name,
436                                single_collection=True,
437                                optional_deps=optional_deps)
438
439     for k, v in uuid_map.items():
440         mapper._pathmap["keep:"+k] = MapperEnt(v, "", "", False)
441
442     keeprefs = set()
443     def addkeepref(k):
444         if k.startswith("keep:"):
445             keeprefs.add(collection_pdh_pattern.match(k).group(1))
446
447
448     def collectloc(p):
449         loc = p.get("location")
450         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
451             addkeepref(p["location"])
452             return
453
454         if not loc:
455             return
456
457         if collectionUUID in p:
458             uuid = p[collectionUUID]
459             if uuid not in uuid_map:
460                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
461                     "Collection uuid %s not found" % uuid)
462             gp = collection_pdh_pattern.match(loc)
463             if gp and uuid_map[uuid] != gp.groups()[0]:
464                 # This file entry has both collectionUUID and a PDH
465                 # location. If the PDH doesn't match the one returned
466                 # the API server, raise an error.
467                 raise SourceLine(p, "location", validate.ValidationException).makeError(
468                     "Expected collection uuid %s to be %s but API server reported %s" % (
469                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
470
471         gp = collection_uuid_pattern.match(loc)
472         if not gp:
473             # Not a uuid pattern (must be a pdh pattern)
474             addkeepref(p["location"])
475             return
476
477         uuid = gp.groups()[0]
478         if uuid not in uuid_map:
479             raise SourceLine(p, "location", validate.ValidationException).makeError(
480                 "Collection uuid %s not found" % uuid)
481
482     with Perf(metrics, "collectloc"):
483         visit_class(workflowobj, ("File", "Directory"), collectloc)
484         visit_class(discovered, ("File", "Directory"), collectloc)
485
486     if discovered_secondaryfiles is not None:
487         for d in discovered:
488             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
489
490     if runtimeContext.copy_deps:
491         # Find referenced collections and copy them into the
492         # destination project, for easy sharing.
493         already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
494                                      filters=[["portable_data_hash", "in", list(keeprefs)],
495                                               ["owner_uuid", "=", runtimeContext.project_uuid]],
496                                      select=["uuid", "portable_data_hash", "created_at"]))
497
498         keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
499         for kr in keeprefs:
500             col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
501                                                   order="created_at desc",
502                                                    select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
503                                                    limit=1).execute()
504             if len(col["items"]) == 0:
505                 logger.warning("Cannot find collection with portable data hash %s", kr)
506                 continue
507             col = col["items"][0]
508             col["name"] = arvados.util.trim_name(col["name"])
509             try:
510                 arvrunner.api.collections().create(body={"collection": {
511                     "owner_uuid": runtimeContext.project_uuid,
512                     "name": col["name"],
513                     "description": col["description"],
514                     "properties": col["properties"],
515                     "portable_data_hash": col["portable_data_hash"],
516                     "manifest_text": col["manifest_text"],
517                     "storage_classes_desired": col["storage_classes_desired"],
518                     "trash_at": col["trash_at"]
519                 }}, ensure_unique_name=True).execute()
520             except Exception as e:
521                 logger.warning("Unable to copy collection to destination: %s", e)
522
523     if "$schemas" in workflowobj:
524         sch = CommentedSeq()
525         for s in workflowobj["$schemas"]:
526             if s in mapper:
527                 sch.append(mapper.mapper(s).resolved)
528         workflowobj["$schemas"] = sch
529
530     return mapper
531
532
533 def upload_docker(arvrunner, tool, runtimeContext):
534     """Uploads Docker images used in CommandLineTool objects."""
535
536     if isinstance(tool, CommandLineTool):
537         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
538         if docker_req:
539             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
540                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
541                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
542
543             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
544         else:
545             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
546                                                        True, runtimeContext)
547     elif isinstance(tool, cwltool.workflow.Workflow):
548         for s in tool.steps:
549             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
550
551
552 def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
553     """Create a packed workflow.
554
555     A "packed" workflow is one where all the components have been combined into a single document."""
556
557     rewrites = {}
558     packed = pack(arvrunner.loadingContext, tool.tool["id"],
559                   rewrite_out=rewrites,
560                   loader=tool.doc_loader)
561
562     rewrite_to_orig = {v: k for k,v in rewrites.items()}
563
564     def visit(v, cur_id):
565         if isinstance(v, dict):
566             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
567                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
568                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
569                 if "id" in v:
570                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
571             if "path" in v and "location" not in v:
572                 v["location"] = v["path"]
573                 del v["path"]
574             if "location" in v and cur_id in merged_map:
575                 if v["location"] in merged_map[cur_id].resolved:
576                     v["location"] = merged_map[cur_id].resolved[v["location"]]
577                 if v["location"] in merged_map[cur_id].secondaryFiles:
578                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
579             if v.get("class") == "DockerRequirement":
580                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
581                                                                                                              runtimeContext)
582             for l in v:
583                 visit(v[l], cur_id)
584         if isinstance(v, list):
585             for l in v:
586                 visit(l, cur_id)
587     visit(packed, None)
588
589     if git_info:
590         for g in git_info:
591             packed[g] = git_info[g]
592
593     return packed
594
595
596 def tag_git_version(packed):
597     if tool.tool["id"].startswith("file://"):
598         path = os.path.dirname(tool.tool["id"][7:])
599         try:
600             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
601         except (OSError, subprocess.CalledProcessError):
602             pass
603         else:
604             packed["http://schema.org/version"] = githash
605
606 def setloc(mapper, p):
607     loc = p.get("location")
608     if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
609         p["location"] = mapper.mapper(p["location"]).resolved
610         return
611
612     if not loc:
613         return
614
615     if collectionUUID in p:
616         uuid = p[collectionUUID]
617         keepuuid = "keep:"+uuid
618         if keepuuid not in mapper:
619             raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
620                 "Collection uuid %s not found" % uuid)
621         gp = collection_pdh_pattern.match(loc)
622         if gp and mapper.mapper(keepuuid).resolved != gp.groups()[0]:
623             # This file entry has both collectionUUID and a PDH
624             # location. If the PDH doesn't match the one returned
625             # the API server, raise an error.
626             raise SourceLine(p, "location", validate.ValidationException).makeError(
627                 "Expected collection uuid %s to be %s but API server reported %s" % (
628                     uuid, gp.groups()[0], mapper.mapper(keepuuid).resolved))
629
630     gp = collection_uuid_pattern.match(loc)
631     if not gp:
632         # Not a uuid pattern (must be a pdh pattern)
633         return
634
635     uuid = gp.groups()[0]
636     keepuuid = "keep:"+uuid
637     if keepuuid not in mapper:
638         raise SourceLine(p, "location", validate.ValidationException).makeError(
639             "Collection uuid %s not found" % uuid)
640     p["location"] = "keep:%s%s" % (mapper.mapper(keepuuid).resolved, gp.groups()[1] if gp.groups()[1] else "")
641     p[collectionUUID] = uuid
642
643 def update_from_mapper(workflowobj, mapper):
644     with Perf(metrics, "setloc"):
645         visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
646
647 def apply_merged_map(merged_map, workflowobj):
648     def visit(v, cur_id):
649         if isinstance(v, dict):
650             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
651                 if "id" in v:
652                     cur_id = v["id"]
653             if "path" in v and "location" not in v:
654                 v["location"] = v["path"]
655                 del v["path"]
656             if "location" in v and cur_id in merged_map:
657                 if v["location"] in merged_map[cur_id].resolved:
658                     v["location"] = merged_map[cur_id].resolved[v["location"]]
659                 if v["location"] in merged_map[cur_id].secondaryFiles:
660                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
661             #if v.get("class") == "DockerRequirement":
662             #    v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
663             #                                                                                                 runtimeContext)
664             for l in v:
665                 visit(v[l], cur_id)
666         if isinstance(v, list):
667             for l in v:
668                 visit(l, cur_id)
669     visit(workflowobj, None)
670
671 def update_from_merged_map(tool, merged_map):
672     tool.visit(partial(apply_merged_map, merged_map))
673
674 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
675     """Upload local files referenced in the input object and return updated input
676     object with 'location' updated to the proper keep references.
677     """
678
679     # Make a copy of the job order and set defaults.
680     builder_job_order = copy.copy(job_order)
681
682     # fill_in_defaults throws an error if there are any
683     # missing required parameters, we don't want it to do that
684     # so make them all optional.
685     inputs_copy = copy.deepcopy(tool.tool["inputs"])
686     for i in inputs_copy:
687         if "null" not in i["type"]:
688             i["type"] = ["null"] + aslist(i["type"])
689
690     fill_in_defaults(inputs_copy,
691                      builder_job_order,
692                      arvrunner.fs_access)
693     # Need to create a builder object to evaluate expressions.
694     builder = make_builder(builder_job_order,
695                            tool.hints,
696                            tool.requirements,
697                            ArvRuntimeContext(),
698                            tool.metadata)
699     # Now update job_order with secondaryFiles
700     discover_secondary_files(arvrunner.fs_access,
701                              builder,
702                              tool.tool["inputs"],
703                              job_order)
704
705     _jobloaderctx = jobloaderctx.copy()
706     jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
707
708     jobmapper = upload_dependencies(arvrunner,
709                                     name,
710                                     jobloader,
711                                     job_order,
712                                     job_order.get("id", "#"),
713                                     runtimeContext)
714
715     if "id" in job_order:
716         del job_order["id"]
717
718     # Need to filter this out, gets added by cwltool when providing
719     # parameters on the command line.
720     if "job_order" in job_order:
721         del job_order["job_order"]
722
723     update_from_mapper(job_order, jobmapper)
724
725     return job_order, jobmapper
726
727 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
728
729 def upload_workflow_deps(arvrunner, tool, runtimeContext):
730     # Ensure that Docker images needed by this workflow are available
731
732     with Perf(metrics, "upload_docker"):
733         upload_docker(arvrunner, tool, runtimeContext)
734
735     document_loader = tool.doc_loader
736
737     merged_map = {}
738     tool_dep_cache = {}
739
740     todo = []
741
742     # Standard traversal is top down, we want to go bottom up, so use
743     # the visitor to accumalate a list of nodes to visit, then
744     # visit them in reverse order.
745     def upload_tool_deps(deptool):
746         if "id" in deptool:
747             todo.append(deptool)
748
749     tool.visit(upload_tool_deps)
750
751     for deptool in reversed(todo):
752         discovered_secondaryfiles = {}
753         with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
754             pm = upload_dependencies(arvrunner,
755                                      "%s dependencies" % (shortname(deptool["id"])),
756                                      document_loader,
757                                      deptool,
758                                      deptool["id"],
759                                      runtimeContext,
760                                      include_primary=False,
761                                      discovered_secondaryfiles=discovered_secondaryfiles,
762                                      cache=tool_dep_cache)
763
764         document_loader.idx[deptool["id"]] = deptool
765         toolmap = {}
766         for k,v in pm.items():
767             toolmap[k] = v.resolved
768
769         merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
770
771     return merged_map
772
773 def arvados_jobs_image(arvrunner, img, runtimeContext):
774     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
775
776     try:
777         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
778                                                           True, runtimeContext)
779     except Exception as e:
780         raise Exception("Docker image %s is not available\n%s" % (img, e) )
781
782
783 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
784     collection = arvados.collection.Collection(api_client=arvrunner.api,
785                                                keep_client=arvrunner.keep_client,
786                                                num_retries=arvrunner.num_retries)
787     with collection.open("workflow.cwl", "w") as f:
788         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
789
790     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
791                ["name", "like", name+"%"]]
792     if runtimeContext.project_uuid:
793         filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
794     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
795
796     if exists["items"]:
797         logger.info("Using collection %s", exists["items"][0]["uuid"])
798     else:
799         collection.save_new(name=name,
800                             owner_uuid=runtimeContext.project_uuid,
801                             ensure_unique_name=True,
802                             num_retries=arvrunner.num_retries)
803         logger.info("Uploaded to %s", collection.manifest_locator())
804
805     return collection.portable_data_hash()
806
807
808 class Runner(Process):
809     """Base class for runner processes, which submit an instance of
810     arvados-cwl-runner and wait for the final result."""
811
812     def __init__(self, runner,
813                  tool, loadingContext, enable_reuse,
814                  output_name, output_tags, submit_runner_ram=0,
815                  name=None, on_error=None, submit_runner_image=None,
816                  intermediate_output_ttl=0, merged_map=None,
817                  priority=None, secret_store=None,
818                  collection_cache_size=256,
819                  collection_cache_is_default=True,
820                  git_info=None,
821                  reuse_runner=False):
822
823         self.loadingContext = loadingContext.copy()
824
825         super(Runner, self).__init__(tool.tool, loadingContext)
826
827         self.arvrunner = runner
828         self.embedded_tool = tool
829         self.job_order = None
830         self.running = False
831         if enable_reuse:
832             # If reuse is permitted by command line arguments but
833             # disabled by the workflow itself, disable it.
834             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
835             if reuse_req:
836                 enable_reuse = reuse_req["enableReuse"]
837             reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
838             if reuse_req:
839                 enable_reuse = reuse_req["enableReuse"]
840         self.enable_reuse = enable_reuse
841         self.uuid = None
842         self.final_output = None
843         self.output_name = output_name
844         self.output_tags = output_tags
845         self.name = name
846         self.on_error = on_error
847         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
848         self.intermediate_output_ttl = intermediate_output_ttl
849         self.priority = priority
850         self.secret_store = secret_store
851         self.enable_dev = self.loadingContext.enable_dev
852         self.git_info = git_info
853         self.fast_parser = self.loadingContext.fast_parser
854         self.reuse_runner = reuse_runner
855
856         self.submit_runner_cores = 1
857         self.submit_runner_ram = 1024  # defaut 1 GiB
858         self.collection_cache_size = collection_cache_size
859
860         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
861         if runner_resource_req:
862             if runner_resource_req.get("coresMin"):
863                 self.submit_runner_cores = runner_resource_req["coresMin"]
864             if runner_resource_req.get("ramMin"):
865                 self.submit_runner_ram = runner_resource_req["ramMin"]
866             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
867                 self.collection_cache_size = runner_resource_req["keep_cache"]
868
869         if submit_runner_ram:
870             # Command line / initializer overrides default and/or spec from workflow
871             self.submit_runner_ram = submit_runner_ram
872
873         if self.submit_runner_ram <= 0:
874             raise Exception("Value of submit-runner-ram must be greater than zero")
875
876         if self.submit_runner_cores <= 0:
877             raise Exception("Value of submit-runner-cores must be greater than zero")
878
879         self.merged_map = merged_map or {}
880
881     def job(self,
882             job_order,         # type: Mapping[Text, Text]
883             output_callbacks,  # type: Callable[[Any, Any], Any]
884             runtimeContext     # type: RuntimeContext
885            ):  # type: (...) -> Generator[Any, None, None]
886         self.job_order = job_order
887         self._init_job(job_order, runtimeContext)
888         yield self
889
890     def update_pipeline_component(self, record):
891         pass
892
893     def done(self, record):
894         """Base method for handling a completed runner."""
895
896         try:
897             if record["state"] == "Complete":
898                 if record.get("exit_code") is not None:
899                     if record["exit_code"] == 33:
900                         processStatus = "UnsupportedRequirement"
901                     elif record["exit_code"] == 0:
902                         processStatus = "success"
903                     else:
904                         processStatus = "permanentFail"
905                 else:
906                     processStatus = "success"
907             else:
908                 processStatus = "permanentFail"
909
910             outputs = {}
911
912             if processStatus == "permanentFail":
913                 logc = arvados.collection.CollectionReader(record["log"],
914                                                            api_client=self.arvrunner.api,
915                                                            keep_client=self.arvrunner.keep_client,
916                                                            num_retries=self.arvrunner.num_retries)
917                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40,
918                              include_crunchrun=(record.get("exit_code") is None or record.get("exit_code") > 127))
919
920             self.final_output = record["output"]
921             outc = arvados.collection.CollectionReader(self.final_output,
922                                                        api_client=self.arvrunner.api,
923                                                        keep_client=self.arvrunner.keep_client,
924                                                        num_retries=self.arvrunner.num_retries)
925             if "cwl.output.json" in outc:
926                 with outc.open("cwl.output.json", "rb") as f:
927                     if f.size() > 0:
928                         outputs = json.loads(str(f.read(), 'utf-8'))
929             def keepify(fileobj):
930                 path = fileobj["location"]
931                 if not path.startswith("keep:"):
932                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
933             adjustFileObjs(outputs, keepify)
934             adjustDirObjs(outputs, keepify)
935         except Exception:
936             logger.exception("[%s] While getting final output object", self.name)
937             self.arvrunner.output_callback({}, "permanentFail")
938         else:
939             self.arvrunner.output_callback(outputs, processStatus)
940
941
942 def print_keep_deps_visitor(api, runtimeContext, references, doc_loader, tool):
943     def collect_locators(obj):
944         loc = obj.get("location", "")
945
946         g = arvados.util.keepuri_pattern.match(loc)
947         if g:
948             references.add(g[1])
949
950         if obj.get("class") == "http://arvados.org/cwl#WorkflowRunnerResources" and "acrContainerImage" in obj:
951             references.add(obj["acrContainerImage"])
952
953         if obj.get("class") == "DockerRequirement":
954             references.add(arvados_cwl.arvdocker.arv_docker_get_image(api, obj, False, runtimeContext))
955
956     sc_result = scandeps(tool["id"], tool,
957                          set(),
958                          set(("location", "id")),
959                          None, urljoin=doc_loader.fetcher.urljoin,
960                          nestdirs=False)
961
962     visit_class(sc_result, ("File", "Directory"), collect_locators)
963     visit_class(tool, ("DockerRequirement", "http://arvados.org/cwl#WorkflowRunnerResources"), collect_locators)
964
965
966 def print_keep_deps(arvRunner, runtimeContext, merged_map, tool):
967     references = set()
968
969     tool.visit(partial(print_keep_deps_visitor, arvRunner.api, runtimeContext, references, tool.doc_loader))
970
971     for mm in merged_map:
972         for k, v in merged_map[mm].resolved.items():
973             g = arvados.util.keepuri_pattern.match(v)
974             if g:
975                 references.add(g[1])
976
977     json.dump(sorted(references), arvRunner.stdout)
978     print(file=arvRunner.stdout)
979
980 class ArvSecretStore(SecretStore):
981     def add(self, value):
982         if value is None:
983             return None
984         return super().add(value)