19385: Fixing tests
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import  viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import (
21     Any,
22     Callable,
23     Dict,
24     Iterable,
25     Iterator,
26     List,
27     Mapping,
28     MutableMapping,
29     Sequence,
30     MutableSequence,
31     Optional,
32     Set,
33     Sized,
34     Tuple,
35     Type,
36     Union,
37     cast,
38 )
39 from cwltool.utils import (
40     CWLObjectType,
41     CWLOutputAtomType,
42     CWLOutputType,
43 )
44
45 if os.name == "posix" and sys.version_info[0] < 3:
46     import subprocess32 as subprocess
47 else:
48     import subprocess
49
50 from schema_salad.sourceline import SourceLine, cmap
51
52 from cwltool.command_line_tool import CommandLineTool
53 import cwltool.workflow
54 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
55                              shortname, Process, fill_in_defaults)
56 from cwltool.load_tool import fetch_document, jobloaderctx
57 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
58 from cwltool.builder import substitute
59 from cwltool.pack import pack
60 from cwltool.update import INTERNAL_VERSION
61 from cwltool.builder import Builder
62 import schema_salad.validate as validate
63 import schema_salad.ref_resolver
64
65 import arvados.collection
66 import arvados.util
67 from .util import collectionUUID
68 from ruamel.yaml import YAML
69 from ruamel.yaml.comments import CommentedMap, CommentedSeq
70
71 import arvados_cwl.arvdocker
72 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
73 from ._version import __version__
74 from . import done
75 from . context import ArvRuntimeContext
76 from .perf import Perf
77
78 logger = logging.getLogger('arvados.cwl-runner')
79 metrics = logging.getLogger('arvados.cwl-runner.metrics')
80
81 def trim_anonymous_location(obj):
82     """Remove 'location' field from File and Directory literals.
83
84     To make internal handling easier, literals are assigned a random id for
85     'location'.  However, when writing the record back out, this can break
86     reproducibility.  Since it is valid for literals not have a 'location'
87     field, remove it.
88
89     """
90
91     if obj.get("location", "").startswith("_:"):
92         del obj["location"]
93
94
95 def remove_redundant_fields(obj):
96     for field in ("path", "nameext", "nameroot", "dirname"):
97         if field in obj:
98             del obj[field]
99
100
101 def find_defaults(d, op):
102     if isinstance(d, list):
103         for i in d:
104             find_defaults(i, op)
105     elif isinstance(d, dict):
106         if "default" in d:
107             op(d)
108         else:
109             for i in viewvalues(d):
110                 find_defaults(i, op)
111
112 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
113     return Builder(
114                  job=joborder,
115                  files=[],               # type: List[Dict[Text, Text]]
116                  bindings=[],            # type: List[Dict[Text, Any]]
117                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
118                  names=None,               # type: Names
119                  requirements=requirements,        # type: List[Dict[Text, Any]]
120                  hints=hints,               # type: List[Dict[Text, Any]]
121                  resources={},           # type: Dict[str, int]
122                  mutation_manager=None,    # type: Optional[MutationManager]
123                  formatgraph=None,         # type: Optional[Graph]
124                  make_fs_access=None,      # type: Type[StdFsAccess]
125                  fs_access=None,           # type: StdFsAccess
126                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
127                  timeout=runtimeContext.eval_timeout,             # type: float
128                  debug=runtimeContext.debug,               # type: bool
129                  js_console=runtimeContext.js_console,          # type: bool
130                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
131                  loadListing="",         # type: Text
132                  outdir="",              # type: Text
133                  tmpdir="",              # type: Text
134                  stagedir="",            # type: Text
135                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
136                  container_engine="docker"
137                 )
138
139 def search_schemadef(name, reqs):
140     for r in reqs:
141         if r["class"] == "SchemaDefRequirement":
142             for sd in r["types"]:
143                 if sd["name"] == name:
144                     return sd
145     return None
146
147 primitive_types_set = frozenset(("null", "boolean", "int", "long",
148                                  "float", "double", "string", "record",
149                                  "array", "enum"))
150
151 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
152     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
153         # union type, collect all possible secondaryFiles
154         for i in inputschema:
155             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
156         return
157
158     if inputschema == "File":
159         inputschema = {"type": "File"}
160
161     if isinstance(inputschema, basestring):
162         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
163         if sd:
164             inputschema = sd
165         else:
166             return
167
168     if "secondaryFiles" in inputschema:
169         # set secondaryFiles, may be inherited by compound types.
170         secondaryspec = inputschema["secondaryFiles"]
171
172     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
173         not isinstance(inputschema["type"], basestring)):
174         # compound type (union, array, record)
175         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
176
177     elif (inputschema["type"] == "record" and
178           isinstance(primary, Mapping)):
179         #
180         # record type, find secondary files associated with fields.
181         #
182         for f in inputschema["fields"]:
183             p = primary.get(shortname(f["name"]))
184             if p:
185                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
186
187     elif (inputschema["type"] == "array" and
188           isinstance(primary, Sequence)):
189         #
190         # array type, find secondary files of elements
191         #
192         for p in primary:
193             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
194
195     elif (inputschema["type"] == "File" and
196           isinstance(primary, Mapping) and
197           primary.get("class") == "File"):
198
199         if "secondaryFiles" in primary or not secondaryspec:
200             # Nothing to do.
201             return
202
203         #
204         # Found a file, check for secondaryFiles
205         #
206         specs = []
207         primary["secondaryFiles"] = secondaryspec
208         for i, sf in enumerate(aslist(secondaryspec)):
209             if builder.cwlVersion == "v1.0":
210                 pattern = sf
211             else:
212                 pattern = sf["pattern"]
213             if pattern is None:
214                 continue
215             if isinstance(pattern, list):
216                 specs.extend(pattern)
217             elif isinstance(pattern, dict):
218                 specs.append(pattern)
219             elif isinstance(pattern, str):
220                 if builder.cwlVersion == "v1.0":
221                     specs.append({"pattern": pattern, "required": True})
222                 else:
223                     specs.append({"pattern": pattern, "required": sf.get("required")})
224             else:
225                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
226                     "Expression must return list, object, string or null")
227
228         found = []
229         for i, sf in enumerate(specs):
230             if isinstance(sf, dict):
231                 if sf.get("class") == "File":
232                     pattern = None
233                     if sf.get("location") is None:
234                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
235                             "File object is missing 'location': %s" % sf)
236                     sfpath = sf["location"]
237                     required = True
238                 else:
239                     pattern = sf["pattern"]
240                     required = sf.get("required")
241             elif isinstance(sf, str):
242                 pattern = sf
243                 required = True
244             else:
245                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
246                     "Expression must return list, object, string or null")
247
248             if pattern is not None:
249                 if "${" in pattern or "$(" in pattern:
250                     sfname = builder.do_eval(pattern, context=primary)
251                 else:
252                     sfname = substitute(primary["basename"], pattern)
253
254                 if sfname is None:
255                     continue
256
257                 if isinstance(sfname, str):
258                     p_location = primary["location"]
259                     if "/" in p_location:
260                         sfpath = (
261                             p_location[0 : p_location.rindex("/") + 1]
262                             + sfname
263                         )
264
265             required = builder.do_eval(required, context=primary)
266
267             if isinstance(sfname, list) or isinstance(sfname, dict):
268                 each = aslist(sfname)
269                 for e in each:
270                     if required and not fsaccess.exists(e.get("location")):
271                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
272                             "Required secondary file '%s' does not exist" % e.get("location"))
273                 found.extend(each)
274
275             if isinstance(sfname, str):
276                 if fsaccess.exists(sfpath):
277                     if pattern is not None:
278                         found.append({"location": sfpath, "class": "File"})
279                     else:
280                         found.append(sf)
281                 elif required:
282                     raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
283                         "Required secondary file '%s' does not exist" % sfpath)
284
285         primary["secondaryFiles"] = cmap(found)
286         if discovered is not None:
287             discovered[primary["location"]] = primary["secondaryFiles"]
288     elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
289         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
290
291 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
292     for inputschema in inputs:
293         primary = job_order.get(shortname(inputschema["id"]))
294         if isinstance(primary, (Mapping, Sequence)):
295             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
296
297 def upload_dependencies(arvrunner, name, document_loader,
298                         workflowobj, uri, runtimeContext,
299                         include_primary=True, discovered_secondaryfiles=None,
300                         cache=None):
301     """Upload the dependencies of the workflowobj document to Keep.
302
303     Returns a pathmapper object mapping local paths to keep references.  Also
304     does an in-place update of references in "workflowobj".
305
306     Use scandeps to find $schemas, File and Directory
307     fields that represent external references.
308
309     If workflowobj has an "id" field, this will reload the document to ensure
310     it is scanning the raw document prior to preprocessing.
311     """
312
313     scanobj = workflowobj
314     metadata = scanobj
315
316     with Perf(metrics, "scandeps"):
317         sc_result = scandeps(uri, scanobj,
318                              set(),
319                              set(("location",)),
320                              None, urljoin=document_loader.fetcher.urljoin,
321                              nestdirs=False)
322         optional_deps = scandeps(uri, scanobj,
323                              set(),
324                              set(("$schemas",)),
325                              None, urljoin=document_loader.fetcher.urljoin,
326                              nestdirs=False)
327
328     if sc_result is None:
329         sc_result = []
330
331     if optional_deps is None:
332         optional_deps = []
333
334     if optional_deps:
335         sc_result.extend(optional_deps)
336
337     sc = []
338     uuids = {}
339
340     def collect_uuids(obj):
341         loc = obj.get("location", "")
342         sp = loc.split(":")
343         if sp[0] == "keep":
344             # Collect collection uuids that need to be resolved to
345             # portable data hashes
346             gp = collection_uuid_pattern.match(loc)
347             if gp:
348                 uuids[gp.groups()[0]] = obj
349             if collectionUUID in obj:
350                 uuids[obj[collectionUUID]] = obj
351
352     def collect_uploads(obj):
353         loc = obj.get("location", "")
354         sp = loc.split(":")
355         if len(sp) < 1:
356             return
357         if sp[0] in ("file", "http", "https"):
358             # Record local files than need to be uploaded,
359             # don't include file literals, keep references, etc.
360             sc.append(obj)
361         collect_uuids(obj)
362
363     with Perf(metrics, "collect uuids"):
364         visit_class(workflowobj, ("File", "Directory"), collect_uuids)
365
366     with Perf(metrics, "collect uploads"):
367         visit_class(sc_result, ("File", "Directory"), collect_uploads)
368
369     # Resolve any collection uuids we found to portable data hashes
370     # and assign them to uuid_map
371     uuid_map = {}
372     fetch_uuids = list(uuids.keys())
373     with Perf(metrics, "fetch_uuids"):
374         while fetch_uuids:
375             # For a large number of fetch_uuids, API server may limit
376             # response size, so keep fetching from API server has nothing
377             # more to give us.
378             lookups = arvrunner.api.collections().list(
379                 filters=[["uuid", "in", fetch_uuids]],
380                 count="none",
381                 select=["uuid", "portable_data_hash"]).execute(
382                     num_retries=arvrunner.num_retries)
383
384             if not lookups["items"]:
385                 break
386
387             for l in lookups["items"]:
388                 uuid_map[l["uuid"]] = l["portable_data_hash"]
389
390             fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
391
392     normalizeFilesDirs(sc)
393
394     if "id" in workflowobj:
395         defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
396         if include_primary:
397             # make sure it's included
398             sc.append({"class": "File", "location": defrg})
399         else:
400             # make sure it's excluded
401             sc = [d for d in sc if d.get("location") != defrg]
402
403     def visit_default(obj):
404         def defaults_are_optional(f):
405             if "location" not in f and "path" in f:
406                 f["location"] = f["path"]
407                 del f["path"]
408             normalizeFilesDirs(f)
409             optional_deps.append(f)
410         visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
411
412     find_defaults(workflowobj, visit_default)
413
414     discovered = {}
415     def discover_default_secondary_files(obj):
416         builder_job_order = {}
417         for t in obj["inputs"]:
418             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
419         # Need to create a builder object to evaluate expressions.
420         builder = make_builder(builder_job_order,
421                                obj.get("hints", []),
422                                obj.get("requirements", []),
423                                ArvRuntimeContext(),
424                                metadata)
425         discover_secondary_files(arvrunner.fs_access,
426                                  builder,
427                                  obj["inputs"],
428                                  builder_job_order,
429                                  discovered)
430
431     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
432     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
433
434     for d in list(discovered):
435         # Only interested in discovered secondaryFiles which are local
436         # files that need to be uploaded.
437         if d.startswith("file:"):
438             sc.extend(discovered[d])
439         else:
440             del discovered[d]
441
442     with Perf(metrics, "mapper"):
443         mapper = ArvPathMapper(arvrunner, sc, "",
444                                "keep:%s",
445                                "keep:%s/%s",
446                                name=name,
447                                single_collection=True,
448                                optional_deps=optional_deps)
449
450     keeprefs = set()
451     def addkeepref(k):
452         if k.startswith("keep:"):
453             keeprefs.add(collection_pdh_pattern.match(k).group(1))
454
455
456     def collectloc(p):
457         loc = p.get("location")
458         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
459             addkeepref(p["location"])
460             return
461
462         if not loc:
463             return
464
465         if collectionUUID in p:
466             uuid = p[collectionUUID]
467             if uuid not in uuid_map:
468                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
469                     "Collection uuid %s not found" % uuid)
470             gp = collection_pdh_pattern.match(loc)
471             if gp and uuid_map[uuid] != gp.groups()[0]:
472                 # This file entry has both collectionUUID and a PDH
473                 # location. If the PDH doesn't match the one returned
474                 # the API server, raise an error.
475                 raise SourceLine(p, "location", validate.ValidationException).makeError(
476                     "Expected collection uuid %s to be %s but API server reported %s" % (
477                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
478
479         gp = collection_uuid_pattern.match(loc)
480         if not gp:
481             # Not a uuid pattern (must be a pdh pattern)
482             addkeepref(p["location"])
483             return
484
485         uuid = gp.groups()[0]
486         if uuid not in uuid_map:
487             raise SourceLine(p, "location", validate.ValidationException).makeError(
488                 "Collection uuid %s not found" % uuid)
489
490     with Perf(metrics, "collectloc"):
491         visit_class(workflowobj, ("File", "Directory"), collectloc)
492         visit_class(discovered, ("File", "Directory"), collectloc)
493
494     if discovered_secondaryfiles is not None:
495         for d in discovered:
496             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
497
498     if runtimeContext.copy_deps:
499         # Find referenced collections and copy them into the
500         # destination project, for easy sharing.
501         already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
502                                      filters=[["portable_data_hash", "in", list(keeprefs)],
503                                               ["owner_uuid", "=", runtimeContext.project_uuid]],
504                                      select=["uuid", "portable_data_hash", "created_at"]))
505
506         keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
507         for kr in keeprefs:
508             col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
509                                                   order="created_at desc",
510                                                    select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
511                                                    limit=1).execute()
512             if len(col["items"]) == 0:
513                 logger.warning("Cannot find collection with portable data hash %s", kr)
514                 continue
515             col = col["items"][0]
516             col["name"] = arvados.util.trim_name(col["name"])
517             try:
518                 arvrunner.api.collections().create(body={"collection": {
519                     "owner_uuid": runtimeContext.project_uuid,
520                     "name": col["name"],
521                     "description": col["description"],
522                     "properties": col["properties"],
523                     "portable_data_hash": col["portable_data_hash"],
524                     "manifest_text": col["manifest_text"],
525                     "storage_classes_desired": col["storage_classes_desired"],
526                     "trash_at": col["trash_at"]
527                 }}, ensure_unique_name=True).execute()
528             except Exception as e:
529                 logger.warning("Unable to copy collection to destination: %s", e)
530
531     if "$schemas" in workflowobj:
532         sch = CommentedSeq()
533         for s in workflowobj["$schemas"]:
534             if s in mapper:
535                 sch.append(mapper.mapper(s).resolved)
536         workflowobj["$schemas"] = sch
537
538     return mapper
539
540
541 def upload_docker(arvrunner, tool, runtimeContext):
542     """Uploads Docker images used in CommandLineTool objects."""
543
544     if isinstance(tool, CommandLineTool):
545         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
546         if docker_req:
547             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
548                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
549                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
550
551             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
552         else:
553             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
554                                                        True, runtimeContext)
555     elif isinstance(tool, cwltool.workflow.Workflow):
556         for s in tool.steps:
557             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
558
559
560 def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
561     """Create a packed workflow.
562
563     A "packed" workflow is one where all the components have been combined into a single document."""
564
565     rewrites = {}
566     packed = pack(arvrunner.loadingContext, tool.tool["id"],
567                   rewrite_out=rewrites,
568                   loader=tool.doc_loader)
569
570     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
571
572     def visit(v, cur_id):
573         if isinstance(v, dict):
574             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
575                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
576                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
577                 if "id" in v:
578                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
579             if "path" in v and "location" not in v:
580                 v["location"] = v["path"]
581                 del v["path"]
582             if "location" in v and cur_id in merged_map:
583                 if v["location"] in merged_map[cur_id].resolved:
584                     v["location"] = merged_map[cur_id].resolved[v["location"]]
585                 if v["location"] in merged_map[cur_id].secondaryFiles:
586                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
587             if v.get("class") == "DockerRequirement":
588                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
589                                                                                                              runtimeContext.project_uuid,
590                                                                                                              runtimeContext.force_docker_pull,
591                                                                                                              runtimeContext.tmp_outdir_prefix,
592                                                                                                              runtimeContext.match_local_docker,
593                                                                                                              runtimeContext.copy_deps)
594             for l in v:
595                 visit(v[l], cur_id)
596         if isinstance(v, list):
597             for l in v:
598                 visit(l, cur_id)
599     visit(packed, None)
600
601     if git_info:
602         for g in git_info:
603             packed[g] = git_info[g]
604
605     return packed
606
607
608 def tag_git_version(packed):
609     if tool.tool["id"].startswith("file://"):
610         path = os.path.dirname(tool.tool["id"][7:])
611         try:
612             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
613         except (OSError, subprocess.CalledProcessError):
614             pass
615         else:
616             packed["http://schema.org/version"] = githash
617
618 def setloc(mapper, p):
619     loc = p.get("location")
620     if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
621         p["location"] = mapper.mapper(p["location"]).resolved
622         return
623
624     if not loc:
625         return
626
627     if collectionUUID in p:
628         uuid = p[collectionUUID]
629         if uuid not in uuid_map:
630             raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
631                 "Collection uuid %s not found" % uuid)
632         gp = collection_pdh_pattern.match(loc)
633         if gp and uuid_map[uuid] != gp.groups()[0]:
634             # This file entry has both collectionUUID and a PDH
635             # location. If the PDH doesn't match the one returned
636             # the API server, raise an error.
637             raise SourceLine(p, "location", validate.ValidationException).makeError(
638                 "Expected collection uuid %s to be %s but API server reported %s" % (
639                     uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
640
641     gp = collection_uuid_pattern.match(loc)
642     if not gp:
643         # Not a uuid pattern (must be a pdh pattern)
644         return
645
646     uuid = gp.groups()[0]
647     if uuid not in uuid_map:
648         raise SourceLine(p, "location", validate.ValidationException).makeError(
649             "Collection uuid %s not found" % uuid)
650     p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
651     p[collectionUUID] = uuid
652
653
654 def update_from_mapper(workflowobj, mapper):
655     with Perf(metrics, "setloc"):
656         visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
657
658 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
659     """Upload local files referenced in the input object and return updated input
660     object with 'location' updated to the proper keep references.
661     """
662
663     # Make a copy of the job order and set defaults.
664     builder_job_order = copy.copy(job_order)
665
666     # fill_in_defaults throws an error if there are any
667     # missing required parameters, we don't want it to do that
668     # so make them all optional.
669     inputs_copy = copy.deepcopy(tool.tool["inputs"])
670     for i in inputs_copy:
671         if "null" not in i["type"]:
672             i["type"] = ["null"] + aslist(i["type"])
673
674     fill_in_defaults(inputs_copy,
675                      builder_job_order,
676                      arvrunner.fs_access)
677     # Need to create a builder object to evaluate expressions.
678     builder = make_builder(builder_job_order,
679                            tool.hints,
680                            tool.requirements,
681                            ArvRuntimeContext(),
682                            tool.metadata)
683     # Now update job_order with secondaryFiles
684     discover_secondary_files(arvrunner.fs_access,
685                              builder,
686                              tool.tool["inputs"],
687                              job_order)
688
689     _jobloaderctx = jobloaderctx.copy()
690     jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
691
692     jobmapper = upload_dependencies(arvrunner,
693                                     name,
694                                     jobloader,
695                                     job_order,
696                                     job_order.get("id", "#"),
697                                     runtimeContext)
698
699     if "id" in job_order:
700         del job_order["id"]
701
702     # Need to filter this out, gets added by cwltool when providing
703     # parameters on the command line.
704     if "job_order" in job_order:
705         del job_order["job_order"]
706
707     update_from_mapper(job_order, jobmapper)
708
709     return job_order
710
711 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
712
713 def upload_workflow_deps(arvrunner, tool, runtimeContext):
714     # Ensure that Docker images needed by this workflow are available
715
716     # commented out for testing only, uncomment me
717     with Perf(metrics, "upload_docker"):
718         upload_docker(arvrunner, tool, runtimeContext)
719
720     document_loader = tool.doc_loader
721
722     merged_map = {}
723     tool_dep_cache = {}
724
725     todo = []
726
727     # Standard traversal is top down, we want to go bottom up, so use
728     # the visitor to accumalate a list of nodes to visit, then
729     # visit them in reverse order.
730     def upload_tool_deps(deptool):
731         if "id" in deptool:
732             todo.append(deptool)
733
734     tool.visit(upload_tool_deps)
735
736     for deptool in reversed(todo):
737         discovered_secondaryfiles = {}
738         with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
739             pm = upload_dependencies(arvrunner,
740                                      "%s dependencies" % (shortname(deptool["id"])),
741                                      document_loader,
742                                      deptool,
743                                      deptool["id"],
744                                      runtimeContext,
745                                      include_primary=False,
746                                      discovered_secondaryfiles=discovered_secondaryfiles,
747                                      cache=tool_dep_cache)
748
749         document_loader.idx[deptool["id"]] = deptool
750         toolmap = {}
751         for k,v in pm.items():
752             toolmap[k] = v.resolved
753         merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
754
755     return merged_map
756
757 def arvados_jobs_image(arvrunner, img, runtimeContext):
758     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
759
760     try:
761         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
762                                                           True, runtimeContext)
763     except Exception as e:
764         raise Exception("Docker image %s is not available\n%s" % (img, e) )
765
766
767 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
768     collection = arvados.collection.Collection(api_client=arvrunner.api,
769                                                keep_client=arvrunner.keep_client,
770                                                num_retries=arvrunner.num_retries)
771     with collection.open("workflow.cwl", "w") as f:
772         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
773
774     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
775                ["name", "like", name+"%"]]
776     if runtimeContext.project_uuid:
777         filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
778     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
779
780     if exists["items"]:
781         logger.info("Using collection %s", exists["items"][0]["uuid"])
782     else:
783         collection.save_new(name=name,
784                             owner_uuid=runtimeContext.project_uuid,
785                             ensure_unique_name=True,
786                             num_retries=arvrunner.num_retries)
787         logger.info("Uploaded to %s", collection.manifest_locator())
788
789     return collection.portable_data_hash()
790
791
792 class Runner(Process):
793     """Base class for runner processes, which submit an instance of
794     arvados-cwl-runner and wait for the final result."""
795
796     def __init__(self, runner, updated_tool,
797                  tool, loadingContext, enable_reuse,
798                  output_name, output_tags, submit_runner_ram=0,
799                  name=None, on_error=None, submit_runner_image=None,
800                  intermediate_output_ttl=0, merged_map=None,
801                  priority=None, secret_store=None,
802                  collection_cache_size=256,
803                  collection_cache_is_default=True,
804                  git_info=None):
805
806         self.loadingContext = loadingContext.copy()
807         self.loadingContext.metadata = updated_tool.metadata.copy()
808
809         super(Runner, self).__init__(updated_tool.tool, loadingContext)
810
811         self.arvrunner = runner
812         self.embedded_tool = tool
813         self.job_order = None
814         self.running = False
815         if enable_reuse:
816             # If reuse is permitted by command line arguments but
817             # disabled by the workflow itself, disable it.
818             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
819             if reuse_req:
820                 enable_reuse = reuse_req["enableReuse"]
821             reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
822             if reuse_req:
823                 enable_reuse = reuse_req["enableReuse"]
824         self.enable_reuse = enable_reuse
825         self.uuid = None
826         self.final_output = None
827         self.output_name = output_name
828         self.output_tags = output_tags
829         self.name = name
830         self.on_error = on_error
831         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
832         self.intermediate_output_ttl = intermediate_output_ttl
833         self.priority = priority
834         self.secret_store = secret_store
835         self.enable_dev = self.loadingContext.enable_dev
836         self.git_info = git_info
837         self.fast_parser = self.loadingContext.fast_parser
838
839         self.submit_runner_cores = 1
840         self.submit_runner_ram = 1024  # defaut 1 GiB
841         self.collection_cache_size = collection_cache_size
842
843         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
844         if runner_resource_req:
845             if runner_resource_req.get("coresMin"):
846                 self.submit_runner_cores = runner_resource_req["coresMin"]
847             if runner_resource_req.get("ramMin"):
848                 self.submit_runner_ram = runner_resource_req["ramMin"]
849             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
850                 self.collection_cache_size = runner_resource_req["keep_cache"]
851
852         if submit_runner_ram:
853             # Command line / initializer overrides default and/or spec from workflow
854             self.submit_runner_ram = submit_runner_ram
855
856         if self.submit_runner_ram <= 0:
857             raise Exception("Value of submit-runner-ram must be greater than zero")
858
859         if self.submit_runner_cores <= 0:
860             raise Exception("Value of submit-runner-cores must be greater than zero")
861
862         self.merged_map = merged_map or {}
863
864     def job(self,
865             job_order,         # type: Mapping[Text, Text]
866             output_callbacks,  # type: Callable[[Any, Any], Any]
867             runtimeContext     # type: RuntimeContext
868            ):  # type: (...) -> Generator[Any, None, None]
869         self.job_order = job_order
870         self._init_job(job_order, runtimeContext)
871         yield self
872
873     def update_pipeline_component(self, record):
874         pass
875
876     def done(self, record):
877         """Base method for handling a completed runner."""
878
879         try:
880             if record["state"] == "Complete":
881                 if record.get("exit_code") is not None:
882                     if record["exit_code"] == 33:
883                         processStatus = "UnsupportedRequirement"
884                     elif record["exit_code"] == 0:
885                         processStatus = "success"
886                     else:
887                         processStatus = "permanentFail"
888                 else:
889                     processStatus = "success"
890             else:
891                 processStatus = "permanentFail"
892
893             outputs = {}
894
895             if processStatus == "permanentFail":
896                 logc = arvados.collection.CollectionReader(record["log"],
897                                                            api_client=self.arvrunner.api,
898                                                            keep_client=self.arvrunner.keep_client,
899                                                            num_retries=self.arvrunner.num_retries)
900                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
901
902             self.final_output = record["output"]
903             outc = arvados.collection.CollectionReader(self.final_output,
904                                                        api_client=self.arvrunner.api,
905                                                        keep_client=self.arvrunner.keep_client,
906                                                        num_retries=self.arvrunner.num_retries)
907             if "cwl.output.json" in outc:
908                 with outc.open("cwl.output.json", "rb") as f:
909                     if f.size() > 0:
910                         outputs = json.loads(f.read().decode())
911             def keepify(fileobj):
912                 path = fileobj["location"]
913                 if not path.startswith("keep:"):
914                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
915             adjustFileObjs(outputs, keepify)
916             adjustDirObjs(outputs, keepify)
917         except Exception:
918             logger.exception("[%s] While getting final output object", self.name)
919             self.arvrunner.output_callback({}, "permanentFail")
920         else:
921             self.arvrunner.output_callback(outputs, processStatus)