[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.builder import substitute
36 from cwltool.pack import pack
37 from cwltool.update import INTERNAL_VERSION
38 from cwltool.builder import Builder
39 import schema_salad.validate as validate
40
41 import arvados.collection
42 import arvados.util
43 from .util import collectionUUID
44 from ruamel.yaml import YAML
45 from ruamel.yaml.comments import CommentedMap, CommentedSeq
46
47 import arvados_cwl.arvdocker
48 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
49 from ._version import __version__
50 from . import done
51 from .context import ArvRuntimeContext
52
53 logger = logging.getLogger('arvados.cwl-runner')
54
55 def trim_anonymous_location(obj):
56     """Remove 'location' field from File and Directory literals.
57
58     To make internal handling easier, literals are assigned a random id for
59     'location'.  However, when writing the record back out, this can break
60     reproducibility.  Since it is valid for literals not to have a 'location'
61     field, remove it.
62
63     """
64
65     if obj.get("location", "").startswith("_:"):
66         del obj["location"]
67
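# Example for trim_anonymous_location (illustrative values): a File literal
# that was assigned an anonymous id, such as
#   {"class": "File", "location": "_:0b45...", "basename": "blurb.txt"}
# comes back out as
#   {"class": "File", "basename": "blurb.txt"}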
68
69 def remove_redundant_fields(obj):
70     for field in ("path", "nameext", "nameroot", "dirname"):
71         if field in obj:
72             del obj[field]
73
74
75 def find_defaults(d, op):
76     if isinstance(d, list):
77         for i in d:
78             find_defaults(i, op)
79     elif isinstance(d, dict):
80         if "default" in d:
81             op(d)
82         else:
83             for i in viewvalues(d):
84                 find_defaults(i, op)
85
86 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
87     return Builder(
88                  job=joborder,
89                  files=[],               # type: List[Dict[Text, Text]]
90                  bindings=[],            # type: List[Dict[Text, Any]]
91                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
92                  names=None,               # type: Names
93                  requirements=requirements,        # type: List[Dict[Text, Any]]
94                  hints=hints,               # type: List[Dict[Text, Any]]
95                  resources={},           # type: Dict[str, int]
96                  mutation_manager=None,    # type: Optional[MutationManager]
97                  formatgraph=None,         # type: Optional[Graph]
98                  make_fs_access=None,      # type: Type[StdFsAccess]
99                  fs_access=None,           # type: StdFsAccess
100                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
101                  timeout=runtimeContext.eval_timeout,             # type: float
102                  debug=runtimeContext.debug,               # type: bool
103                  js_console=runtimeContext.js_console,          # type: bool
104                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
105                  loadListing="",         # type: Text
106                  outdir="",              # type: Text
107                  tmpdir="",              # type: Text
108                  stagedir="",            # type: Text
109                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
110                  container_engine="docker"
111                 )
112
113 def search_schemadef(name, reqs):
114     for r in reqs:
115         if r["class"] == "SchemaDefRequirement":
116             for sd in r["types"]:
117                 if sd["name"] == name:
118                     return sd
119     return None
120
121 primitive_types_set = frozenset(("null", "boolean", "int", "long",
122                                  "float", "double", "string", "record",
123                                  "array", "enum"))
124
125 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
126     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
127         # union type, collect all possible secondaryFiles
128         for i in inputschema:
129             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
130         return
131
132     if inputschema == "File":
133         inputschema = {"type": "File"}
134
135     if isinstance(inputschema, basestring):
136         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
137         if sd:
138             inputschema = sd
139         else:
140             return
141
142     if "secondaryFiles" in inputschema:
143         # set secondaryFiles, may be inherited by compound types.
144         secondaryspec = inputschema["secondaryFiles"]
145
146     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
147         not isinstance(inputschema["type"], basestring)):
148         # compound type (union, array, record)
149         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
150
151     elif (inputschema["type"] == "record" and
152           isinstance(primary, Mapping)):
153         #
154         # record type, find secondary files associated with fields.
155         #
156         for f in inputschema["fields"]:
157             p = primary.get(shortname(f["name"]))
158             if p:
159                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
160
161     elif (inputschema["type"] == "array" and
162           isinstance(primary, Sequence)):
163         #
164         # array type, find secondary files of elements
165         #
166         for p in primary:
167             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
168
169     elif (inputschema["type"] == "File" and
170           isinstance(primary, Mapping) and
171           primary.get("class") == "File"):
172
173         if "secondaryFiles" in primary or not secondaryspec:
174             # Nothing to do.
175             return
176
177         #
178         # Found a file, check for secondaryFiles
179         #
180         specs = []
181         primary["secondaryFiles"] = secondaryspec
182         for i, sf in enumerate(aslist(secondaryspec)):
183             if builder.cwlVersion == "v1.0":
184                 pattern = sf
185             else:
186                 pattern = sf["pattern"]
187             if pattern is None:
188                 continue
189             if isinstance(pattern, list):
190                 specs.extend(pattern)
191             elif isinstance(pattern, dict):
192                 specs.append(pattern)
193             elif isinstance(pattern, str):
194                 if builder.cwlVersion == "v1.0":
195                     specs.append({"pattern": pattern, "required": True})
196                 else:
197                     specs.append({"pattern": pattern, "required": sf.get("required")})
198             else:
199                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
200                     "Expression must return list, object, string or null")
201
202         found = []
203         for i, sf in enumerate(specs):
204             if isinstance(sf, dict):
205                 if sf.get("class") == "File":
206                     pattern = None
207                     if sf.get("location") is None:
208                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
209                             "File object is missing 'location': %s" % sf)
210                     sfpath = sf["location"]
211                     required = True
212                 else:
213                     pattern = sf["pattern"]
214                     required = sf.get("required")
215             elif isinstance(sf, str):
216                 pattern = sf
217                 required = True
218             else:
219                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
220                     "Expression must return list, object, string or null")
221
222             if pattern is not None:
223                 if "${" in pattern or "$(" in pattern:
224                     sfname = builder.do_eval(pattern, context=primary)
225                 else:
226                     sfname = substitute(primary["basename"], pattern)
227
228                 if sfname is None:
229                     continue
230
231                 p_location = primary["location"]
232                 if "/" in p_location:
233                     sfpath = (
234                         p_location[0 : p_location.rindex("/") + 1]
235                         + sfname
236                     )
237
238             required = builder.do_eval(required, context=primary)
239
240             if fsaccess.exists(sfpath):
241                 if pattern is not None:
242                     found.append({"location": sfpath, "class": "File"})
243                 else:
244                     found.append(sf)
245             elif required:
246                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
247                     "Required secondary file '%s' does not exist" % sfpath)
248
249         primary["secondaryFiles"] = cmap(found)
250         if discovered is not None:
251             discovered[primary["location"]] = primary["secondaryFiles"]
252     elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
253         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
254
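# Sketch of the secondaryFiles pattern handling above (file names are
# illustrative): for a primary File with basename "reads.bam", cwltool's
# substitute() appends the pattern, and a leading "^" strips one extension
# from the basename first:
#   substitute("reads.bam", ".bai")  -> "reads.bam.bai"
#   substitute("reads.bam", "^.bai") -> "reads.bai"
# Patterns containing "$(" or "${" are evaluated with builder.do_eval() instead.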
255 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
256     for inputschema in inputs:
257         primary = job_order.get(shortname(inputschema["id"]))
258         if isinstance(primary, (Mapping, Sequence)):
259             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
260
261 def upload_dependencies(arvrunner, name, document_loader,
262                         workflowobj, uri, loadref_run, runtimeContext,
263                         include_primary=True, discovered_secondaryfiles=None,
264                         cache=None):
265     """Upload the dependencies of the workflowobj document to Keep.
266
267     Returns a pathmapper object mapping local paths to keep references.  Also
268     does an in-place update of references in "workflowobj".
269
270     Use scandeps to find $import, $include, $schemas, run, File and Directory
271     fields that represent external references.
272
273     If workflowobj has an "id" field, this will reload the document to ensure
274     it is scanning the raw document prior to preprocessing.
275     """
276
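    # Illustrative example (hypothetical path and hash): after upload, the
    # returned pathmapper resolves local references to Keep locations, e.g.
    #   mapper.mapper("file:///home/me/data/input.fastq").resolved
    #     -> "keep:99999999999999999999999999999999+118/input.fastq"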
277     loaded = set()
278     def loadref(b, u):
279         joined = document_loader.fetcher.urljoin(b, u)
280         defrg, _ = urllib.parse.urldefrag(joined)
281         if defrg not in loaded:
282             loaded.add(defrg)
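            # A shared cache (such as the tool_dep_cache passed in from
            # upload_workflow_deps below) lets repeated references to the
            # same document reuse the already-parsed result instead of
            # fetching and parsing it again.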
283             if cache is not None and defrg in cache:
284                 return cache[defrg]
285             # Use fetch_text to get raw file (before preprocessing).
286             text = document_loader.fetch_text(defrg)
287             if isinstance(text, bytes):
288                 textIO = StringIO(text.decode('utf-8'))
289             else:
290                 textIO = StringIO(text)
291             yamlloader = YAML(typ='safe', pure=True)
292             result = yamlloader.load(textIO)
293             if cache is not None:
294                 cache[defrg] = result
295             return result
296         else:
297             return {}
298
299     if loadref_run:
300         loadref_fields = set(("$import", "run"))
301     else:
302         loadref_fields = set(("$import",))
303
304     scanobj = workflowobj
305     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
306         # Need raw file content (before preprocessing) to ensure
307         # that external references in $include and $mixin are captured.
308         scanobj = loadref("", workflowobj["id"])
309
310     metadata = scanobj
311
312     sc_result = scandeps(uri, scanobj,
313                          loadref_fields,
314                          set(("$include", "location")),
315                          loadref, urljoin=document_loader.fetcher.urljoin,
316                          nestdirs=False)
317
318     optional_deps = scandeps(uri, scanobj,
319                                   loadref_fields,
320                                   set(("$schemas",)),
321                                   loadref, urljoin=document_loader.fetcher.urljoin,
322                                   nestdirs=False)
323
324     sc_result.extend(optional_deps)
325
326     sc = []
327     uuids = {}
328
329     def collect_uuids(obj):
330         loc = obj.get("location", "")
331         sp = loc.split(":")
332         if sp[0] == "keep":
333             # Collect collection uuids that need to be resolved to
334             # portable data hashes
335             gp = collection_uuid_pattern.match(loc)
336             if gp:
337                 uuids[gp.groups()[0]] = obj
338             if collectionUUID in obj:
339                 uuids[obj[collectionUUID]] = obj
340
341     def collect_uploads(obj):
342         loc = obj.get("location", "")
343         sp = loc.split(":")
344         if len(sp) < 1:
345             return
346         if sp[0] in ("file", "http", "https"):
347             # Record local files that need to be uploaded,
348             # don't include file literals, keep references, etc.
349             sc.append(obj)
350         collect_uuids(obj)
351
352     visit_class(workflowobj, ("File", "Directory"), collect_uuids)
353     visit_class(sc_result, ("File", "Directory"), collect_uploads)
354
355     # Resolve any collection uuids we found to portable data hashes
356     # and assign them to uuid_map
357     uuid_map = {}
358     fetch_uuids = list(uuids.keys())
359     while fetch_uuids:
360         # For a large number of fetch_uuids, the API server may limit the
361         # response size, so keep fetching until the API server has nothing
362         # more to give us.
363         lookups = arvrunner.api.collections().list(
364             filters=[["uuid", "in", fetch_uuids]],
365             count="none",
366             select=["uuid", "portable_data_hash"]).execute(
367                 num_retries=arvrunner.num_retries)
368
369         if not lookups["items"]:
370             break
371
372         for l in lookups["items"]:
373             uuid_map[l["uuid"]] = l["portable_data_hash"]
374
375         fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
376
377     normalizeFilesDirs(sc)
378
379     if include_primary and "id" in workflowobj:
380         sc.append({"class": "File", "location": workflowobj["id"]})
381
382     def visit_default(obj):
383         def defaults_are_optional(f):
384             if "location" not in f and "path" in f:
385                 f["location"] = f["path"]
386                 del f["path"]
387             normalizeFilesDirs(f)
388             optional_deps.append(f)
389         visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
390
391     find_defaults(workflowobj, visit_default)
392
393     discovered = {}
394     def discover_default_secondary_files(obj):
395         builder_job_order = {}
396         for t in obj["inputs"]:
397             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
398         # Need to create a builder object to evaluate expressions.
399         builder = make_builder(builder_job_order,
400                                obj.get("hints", []),
401                                obj.get("requirements", []),
402                                ArvRuntimeContext(),
403                                metadata)
404         discover_secondary_files(arvrunner.fs_access,
405                                  builder,
406                                  obj["inputs"],
407                                  builder_job_order,
408                                  discovered)
409
410     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
411     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
412
413     for d in list(discovered):
414         # Only interested in discovered secondaryFiles which are local
415         # files that need to be uploaded.
416         if d.startswith("file:"):
417             sc.extend(discovered[d])
418         else:
419             del discovered[d]
420
421     mapper = ArvPathMapper(arvrunner, sc, "",
422                            "keep:%s",
423                            "keep:%s/%s",
424                            name=name,
425                            single_collection=True,
426                            optional_deps=optional_deps)
427
428     keeprefs = set()
429     def addkeepref(k):
430         if k.startswith("keep:"):
431             keeprefs.add(collection_pdh_pattern.match(k).group(1))
432
433     def setloc(p):
434         loc = p.get("location")
435         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
436             p["location"] = mapper.mapper(p["location"]).resolved
437             addkeepref(p["location"])
438             return
439
440         if not loc:
441             return
442
443         if collectionUUID in p:
444             uuid = p[collectionUUID]
445             if uuid not in uuid_map:
446                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
447                     "Collection uuid %s not found" % uuid)
448             gp = collection_pdh_pattern.match(loc)
449             if gp and uuid_map[uuid] != gp.groups()[0]:
450                 # This file entry has both collectionUUID and a PDH
451                 # location. If the PDH doesn't match the one returned
452                 # by the API server, raise an error.
453                 raise SourceLine(p, "location", validate.ValidationException).makeError(
454                     "Expected collection uuid %s to be %s but API server reported %s" % (
455                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
456
457         gp = collection_uuid_pattern.match(loc)
458         if not gp:
459             # Not a uuid pattern (must be a pdh pattern)
460             addkeepref(p["location"])
461             return
462
463         uuid = gp.groups()[0]
464         if uuid not in uuid_map:
465             raise SourceLine(p, "location", validate.ValidationException).makeError(
466                 "Collection uuid %s not found" % uuid)
467         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
468         p[collectionUUID] = uuid
469
470     visit_class(workflowobj, ("File", "Directory"), setloc)
471     visit_class(discovered, ("File", "Directory"), setloc)
472
473     if discovered_secondaryfiles is not None:
474         for d in discovered:
475             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
476
477     if runtimeContext.copy_deps:
478         # Find referenced collections and copy them into the
479         # destination project, for easy sharing.
480         already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
481                                      filters=[["portable_data_hash", "in", list(keeprefs)],
482                                               ["owner_uuid", "=", runtimeContext.project_uuid]],
483                                      select=["uuid", "portable_data_hash", "created_at"]))
484
485         keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
486         for kr in keeprefs:
487             col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
488                                                   order="created_at desc",
489                                                    select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
490                                                    limit=1).execute()
491             if len(col["items"]) == 0:
492                 logger.warning("Cannot find collection with portable data hash %s", kr)
493                 continue
494             col = col["items"][0]
495             try:
496                 arvrunner.api.collections().create(body={"collection": {
497                     "owner_uuid": runtimeContext.project_uuid,
498                     "name": col["name"],
499                     "description": col["description"],
500                     "properties": col["properties"],
501                     "portable_data_hash": col["portable_data_hash"],
502                     "manifest_text": col["manifest_text"],
503                     "storage_classes_desired": col["storage_classes_desired"],
504                     "trash_at": col["trash_at"]
505                 }}, ensure_unique_name=True).execute()
506             except Exception as e:
507                 logger.warning("Unable to copy collection to destination: %s", e)
508
509     if "$schemas" in workflowobj:
510         sch = CommentedSeq()
511         for s in workflowobj["$schemas"]:
512             if s in mapper:
513                 sch.append(mapper.mapper(s).resolved)
514         workflowobj["$schemas"] = sch
515
516     return mapper
517
518
519 def upload_docker(arvrunner, tool, runtimeContext):
520     """Uploads Docker images used in CommandLineTool objects."""
521
522     if isinstance(tool, CommandLineTool):
523         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
524         if docker_req:
525             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
526                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
527                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
528
529             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
530                                                        runtimeContext.project_uuid,
531                                                        runtimeContext.force_docker_pull,
532                                                        runtimeContext.tmp_outdir_prefix,
533                                                        runtimeContext.match_local_docker,
534                                                        runtimeContext.copy_deps)
535         else:
536             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
537                                                        True,
538                                                        runtimeContext.project_uuid,
539                                                        runtimeContext.force_docker_pull,
540                                                        runtimeContext.tmp_outdir_prefix,
541                                                        runtimeContext.match_local_docker,
542                                                        runtimeContext.copy_deps)
543     elif isinstance(tool, cwltool.workflow.Workflow):
544         for s in tool.steps:
545             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
546
547
548 def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
549     """Create a packed workflow.
550
551     A "packed" workflow is one where all the components have been combined into a single document."""
552
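    # A packed document typically looks like this sketch (field values are
    # illustrative only):
    #   {"cwlVersion": "v1.2",
    #    "$graph": [{"class": "Workflow", "id": "#main", ...},
    #               {"class": "CommandLineTool", "id": "#step1.cwl", ...}]}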
553     rewrites = {}
554     packed = pack(arvrunner.loadingContext, tool.tool["id"],
555                   rewrite_out=rewrites,
556                   loader=tool.doc_loader)
557
558     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
559
560     def visit(v, cur_id):
561         if isinstance(v, dict):
562             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
563                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
564                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
565                 if "id" in v:
566                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
567             if "path" in v and "location" not in v:
568                 v["location"] = v["path"]
569                 del v["path"]
570             if "location" in v and cur_id in merged_map:
571                 if v["location"] in merged_map[cur_id].resolved:
572                     v["location"] = merged_map[cur_id].resolved[v["location"]]
573                 if v["location"] in merged_map[cur_id].secondaryFiles:
574                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
575             if v.get("class") == "DockerRequirement":
576                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
577                                                                                                              runtimeContext.project_uuid,
578                                                                                                              runtimeContext.force_docker_pull,
579                                                                                                              runtimeContext.tmp_outdir_prefix,
580                                                                                                              runtimeContext.match_local_docker,
581                                                                                                              runtimeContext.copy_deps)
582             for l in v:
583                 visit(v[l], cur_id)
584         if isinstance(v, list):
585             for l in v:
586                 visit(l, cur_id)
587     visit(packed, None)
588     return packed
589
590
591 def tag_git_version(packed, tool):
592     if tool.tool["id"].startswith("file://"):
593         path = os.path.dirname(tool.tool["id"][7:])
594         try:
595             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
596         except (OSError, subprocess.CalledProcessError):
597             pass
598         else:
599             packed["http://schema.org/version"] = githash
600
601
602 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
603     """Upload local files referenced in the input object and return updated input
604     object with 'location' updated to the proper keep references.
605     """
606
607     # Make a copy of the job order and set defaults.
608     builder_job_order = copy.copy(job_order)
609
610     # fill_in_defaults throws an error if there are any missing
611     # required parameters; we don't want it to do that here,
612     # so make them all optional.
613     inputs_copy = copy.deepcopy(tool.tool["inputs"])
614     for i in inputs_copy:
615         if "null" not in i["type"]:
616             i["type"] = ["null"] + aslist(i["type"])
617
618     fill_in_defaults(inputs_copy,
619                      builder_job_order,
620                      arvrunner.fs_access)
621     # Need to create a builder object to evaluate expressions.
622     builder = make_builder(builder_job_order,
623                            tool.hints,
624                            tool.requirements,
625                            ArvRuntimeContext(),
626                            tool.metadata)
627     # Now update job_order with secondaryFiles
628     discover_secondary_files(arvrunner.fs_access,
629                              builder,
630                              tool.tool["inputs"],
631                              job_order)
632
633     jobmapper = upload_dependencies(arvrunner,
634                                     name,
635                                     tool.doc_loader,
636                                     job_order,
637                                     job_order.get("id", "#"),
638                                     False,
639                                     runtimeContext)
640
641     if "id" in job_order:
642         del job_order["id"]
643
644     # Need to filter this out; it gets added by cwltool when providing
645     # parameters on the command line.
646     if "job_order" in job_order:
647         del job_order["job_order"]
648
649     return job_order
650
651 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
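# Sketch of the merged_map entries built below (keys and values are
# illustrative):
#   merged_map["file:///home/me/wf.cwl"] = FileUpdates(
#       resolved={"file:///home/me/input.txt": "keep:<pdh>/input.txt"},
#       secondaryFiles={"keep:<pdh>/input.txt": [{"class": "File", ...}]})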
652
653 def upload_workflow_deps(arvrunner, tool, runtimeContext):
654     # Ensure that Docker images needed by this workflow are available
655
656     upload_docker(arvrunner, tool, runtimeContext)
657
658     document_loader = tool.doc_loader
659
660     merged_map = {}
661     tool_dep_cache = {}
662     def upload_tool_deps(deptool):
663         if "id" in deptool:
664             discovered_secondaryfiles = {}
665             pm = upload_dependencies(arvrunner,
666                                      "%s dependencies" % (shortname(deptool["id"])),
667                                      document_loader,
668                                      deptool,
669                                      deptool["id"],
670                                      False,
671                                      runtimeContext,
672                                      include_primary=False,
673                                      discovered_secondaryfiles=discovered_secondaryfiles,
674                                      cache=tool_dep_cache)
675             document_loader.idx[deptool["id"]] = deptool
676             toolmap = {}
677             for k,v in pm.items():
678                 toolmap[k] = v.resolved
679             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
680
681     tool.visit(upload_tool_deps)
682
683     return merged_map
684
685 def arvados_jobs_image(arvrunner, img, runtimeContext):
686     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
687
688     try:
689         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
690                                                           True,
691                                                           runtimeContext.project_uuid,
692                                                           runtimeContext.force_docker_pull,
693                                                           runtimeContext.tmp_outdir_prefix,
694                                                           runtimeContext.match_local_docker,
695                                                           runtimeContext.copy_deps)
696     except Exception as e:
697         raise Exception("Docker image %s is not available\n%s" % (img, e) )
698
699
700 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
701     collection = arvados.collection.Collection(api_client=arvrunner.api,
702                                                keep_client=arvrunner.keep_client,
703                                                num_retries=arvrunner.num_retries)
704     with collection.open("workflow.cwl", "w") as f:
705         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
706
707     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
708                ["name", "like", name+"%"]]
709     if runtimeContext.project_uuid:
710         filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
711     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
712
713     if exists["items"]:
714         logger.info("Using collection %s", exists["items"][0]["uuid"])
715     else:
716         collection.save_new(name=name,
717                             owner_uuid=runtimeContext.project_uuid,
718                             ensure_unique_name=True,
719                             num_retries=arvrunner.num_retries)
720         logger.info("Uploaded to %s", collection.manifest_locator())
721
722     return collection.portable_data_hash()
723
724
725 class Runner(Process):
726     """Base class for runner processes, which submit an instance of
727     arvados-cwl-runner and wait for the final result."""
728
729     def __init__(self, runner, updated_tool,
730                  tool, loadingContext, enable_reuse,
731                  output_name, output_tags, submit_runner_ram=0,
732                  name=None, on_error=None, submit_runner_image=None,
733                  intermediate_output_ttl=0, merged_map=None,
734                  priority=None, secret_store=None,
735                  collection_cache_size=256,
736                  collection_cache_is_default=True):
737
738         loadingContext = loadingContext.copy()
739         loadingContext.metadata = updated_tool.metadata.copy()
740
741         super(Runner, self).__init__(updated_tool.tool, loadingContext)
742
743         self.arvrunner = runner
744         self.embedded_tool = tool
745         self.job_order = None
746         self.running = False
747         if enable_reuse:
748             # If reuse is permitted by command line arguments but
749             # disabled by the workflow itself, disable it.
750             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
751             if reuse_req:
752                 enable_reuse = reuse_req["enableReuse"]
753         self.enable_reuse = enable_reuse
754         self.uuid = None
755         self.final_output = None
756         self.output_name = output_name
757         self.output_tags = output_tags
758         self.name = name
759         self.on_error = on_error
760         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
761         self.intermediate_output_ttl = intermediate_output_ttl
762         self.priority = priority
763         self.secret_store = secret_store
764         self.enable_dev = loadingContext.enable_dev
765
766         self.submit_runner_cores = 1
767         self.submit_runner_ram = 1024  # default 1 GiB
768         self.collection_cache_size = collection_cache_size
769
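        # The WorkflowRunnerResources hint read below might appear in a
        # workflow document like this (illustrative sketch, assuming the
        # usual "arv" namespace prefix for http://arvados.org/cwl#):
        #   hints:
        #     arv:WorkflowRunnerResources:
        #       coresMin: 2
        #       ramMin: 2048
        #       keep_cache: 512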
770         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
771         if runner_resource_req:
772             if runner_resource_req.get("coresMin"):
773                 self.submit_runner_cores = runner_resource_req["coresMin"]
774             if runner_resource_req.get("ramMin"):
775                 self.submit_runner_ram = runner_resource_req["ramMin"]
776             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
777                 self.collection_cache_size = runner_resource_req["keep_cache"]
778
779         if submit_runner_ram:
780             # Command line / initializer overrides default and/or spec from workflow
781             self.submit_runner_ram = submit_runner_ram
782
783         if self.submit_runner_ram <= 0:
784             raise Exception("Value of submit-runner-ram must be greater than zero")
785
786         if self.submit_runner_cores <= 0:
787             raise Exception("Value of submit-runner-cores must be greater than zero")
788
789         self.merged_map = merged_map or {}
790
791     def job(self,
792             job_order,         # type: Mapping[Text, Text]
793             output_callbacks,  # type: Callable[[Any, Any], Any]
794             runtimeContext     # type: RuntimeContext
795            ):  # type: (...) -> Generator[Any, None, None]
796         self.job_order = job_order
797         self._init_job(job_order, runtimeContext)
798         yield self
799
800     def update_pipeline_component(self, record):
801         pass
802
803     def done(self, record):
804         """Base method for handling a completed runner."""
805
806         try:
807             if record["state"] == "Complete":
808                 if record.get("exit_code") is not None:
809                     if record["exit_code"] == 33:
810                         processStatus = "UnsupportedRequirement"
811                     elif record["exit_code"] == 0:
812                         processStatus = "success"
813                     else:
814                         processStatus = "permanentFail"
815                 else:
816                     processStatus = "success"
817             else:
818                 processStatus = "permanentFail"
819
820             outputs = {}
821
822             if processStatus == "permanentFail":
823                 logc = arvados.collection.CollectionReader(record["log"],
824                                                            api_client=self.arvrunner.api,
825                                                            keep_client=self.arvrunner.keep_client,
826                                                            num_retries=self.arvrunner.num_retries)
827                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
828
829             self.final_output = record["output"]
830             outc = arvados.collection.CollectionReader(self.final_output,
831                                                        api_client=self.arvrunner.api,
832                                                        keep_client=self.arvrunner.keep_client,
833                                                        num_retries=self.arvrunner.num_retries)
834             if "cwl.output.json" in outc:
835                 with outc.open("cwl.output.json", "rb") as f:
836                     if f.size() > 0:
837                         outputs = json.loads(f.read().decode())
838             def keepify(fileobj):
839                 path = fileobj["location"]
840                 if not path.startswith("keep:"):
841                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
842             adjustFileObjs(outputs, keepify)
843             adjustDirObjs(outputs, keepify)
844         except Exception:
845             logger.exception("[%s] While getting final output object", self.name)
846             self.arvrunner.output_callback({}, "permanentFail")
847         else:
848             self.arvrunner.output_callback(outputs, processStatus)