Merge branch '18238-update-cwltool' refs #18238
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.builder import substitute
36 from cwltool.pack import pack
37 from cwltool.update import INTERNAL_VERSION
38 from cwltool.builder import Builder
39 import schema_salad.validate as validate
40
41 import arvados.collection
42 from .util import collectionUUID
43 import ruamel.yaml as yaml
44 from ruamel.yaml.comments import CommentedMap, CommentedSeq
45
46 import arvados_cwl.arvdocker
47 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
48 from ._version import __version__
49 from . import done
50 from .context import ArvRuntimeContext
51
52 logger = logging.getLogger('arvados.cwl-runner')
53
54 def trim_anonymous_location(obj):
55     """Remove 'location' field from File and Directory literals.
56
57     To make internal handling easier, literals are assigned a random id for
58     'location'.  However, when writing the record back out, this can break
59     reproducibility.  Since it is valid for literals not to have a 'location'
60     field, remove it.
61
62     """
63
64     if obj.get("location", "").startswith("_:"):
65         del obj["location"]
66
67
68 def remove_redundant_fields(obj):
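    """Strip fields derived from 'location' (path, nameext, nameroot, dirname) from a File or Directory record."""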
69     for field in ("path", "nameext", "nameroot", "dirname"):
70         if field in obj:
71             del obj[field]
72
73
74 def find_defaults(d, op):
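    """Recursively walk lists and dicts in d and apply op to every dict that has a 'default' field."""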
75     if isinstance(d, list):
76         for i in d:
77             find_defaults(i, op)
78     elif isinstance(d, dict):
79         if "default" in d:
80             op(d)
81         else:
82             for i in viewvalues(d):
83                 find_defaults(i, op)
84
85 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
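    """Build a cwltool Builder carrying just enough state to evaluate
    expressions (such as secondaryFiles patterns) outside of a full job run."""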
86     return Builder(
87                  job=joborder,
88                  files=[],               # type: List[Dict[Text, Text]]
89                  bindings=[],            # type: List[Dict[Text, Any]]
90                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
91                  names=None,               # type: Names
92                  requirements=requirements,        # type: List[Dict[Text, Any]]
93                  hints=hints,               # type: List[Dict[Text, Any]]
94                  resources={},           # type: Dict[str, int]
95                  mutation_manager=None,    # type: Optional[MutationManager]
96                  formatgraph=None,         # type: Optional[Graph]
97                  make_fs_access=None,      # type: Type[StdFsAccess]
98                  fs_access=None,           # type: StdFsAccess
99                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
100                  timeout=runtimeContext.eval_timeout,             # type: float
101                  debug=runtimeContext.debug,               # type: bool
102                  js_console=runtimeContext.js_console,          # type: bool
103                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
104                  loadListing="",         # type: Text
105                  outdir="",              # type: Text
106                  tmpdir="",              # type: Text
107                  stagedir="",            # type: Text
108                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
109                  container_engine="docker"
110                 )
111
112 def search_schemadef(name, reqs):
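    """Return the type definition with the given name from any SchemaDefRequirement in reqs, or None."""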
113     for r in reqs:
114         if r["class"] == "SchemaDefRequirement":
115             for sd in r["types"]:
116                 if sd["name"] == name:
117                     return sd
118     return None
119
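# Built-in CWL type names.  Anything else appearing as a type is treated by
# set_secondary() as a named schema to be looked up via SchemaDefRequirement.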
120 primitive_types_set = frozenset(("null", "boolean", "int", "long",
121                                  "float", "double", "string", "record",
122                                  "array", "enum"))
123
124 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
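    """Walk the input schema alongside the primary input value and fill in
    'secondaryFiles' for each File that matches a secondaryFiles spec
    (e.g. a pattern such as ".bai").  Secondary files that are found to
    exist are also recorded in 'discovered', keyed by the primary file's
    location, when a 'discovered' dict is supplied.
    """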
125     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
126         # union type, collect all possible secondaryFiles
127         for i in inputschema:
128             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
129         return
130
131     if isinstance(inputschema, basestring):
132         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
133         if sd:
134             inputschema = sd
135         else:
136             return
137
138     if "secondaryFiles" in inputschema:
139         # set secondaryFiles, may be inherited by compound types.
140         secondaryspec = inputschema["secondaryFiles"]
141
142     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
143         not isinstance(inputschema["type"], basestring)):
144         # compound type (union, array, record)
145         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
146
147     elif (inputschema["type"] == "record" and
148           isinstance(primary, Mapping)):
149         #
150         # record type, find secondary files associated with fields.
151         #
152         for f in inputschema["fields"]:
153             p = primary.get(shortname(f["name"]))
154             if p:
155                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
156
157     elif (inputschema["type"] == "array" and
158           isinstance(primary, Sequence)):
159         #
160         # array type, find secondary files of elements
161         #
162         for p in primary:
163             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
164
165     elif (inputschema["type"] == "File" and
166           secondaryspec and
167           isinstance(primary, Mapping) and
168           primary.get("class") == "File" and
169           "secondaryFiles" not in primary):
170         #
171         # Found a file, check for secondaryFiles
172         #
173         specs = []
174         primary["secondaryFiles"] = secondaryspec
175         for i, sf in enumerate(aslist(secondaryspec)):
176             if builder.cwlVersion == "v1.0":
177                 pattern = builder.do_eval(sf, context=primary)
178             else:
179                 pattern = builder.do_eval(sf["pattern"], context=primary)
180             if pattern is None:
181                 continue
182             if isinstance(pattern, list):
183                 specs.extend(pattern)
184             elif isinstance(pattern, dict):
185                 specs.append(pattern)
186             elif isinstance(pattern, str):
187                 specs.append({"pattern": pattern})
188             else:
189                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
190                     "Expression must return list, object, string or null")
191
192         found = []
193         for i, sf in enumerate(specs):
194             if isinstance(sf, dict):
195                 if sf.get("class") == "File":
196                     pattern = sf["basename"]
197                 else:
198                     pattern = sf["pattern"]
199                     required = sf.get("required")
200             elif isinstance(sf, str):
201                 pattern = sf
202                 required = True
203             else:
204                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
205                     "Expression must return list, object, string or null")
206
207             sfpath = substitute(primary["location"], pattern)
208             required = builder.do_eval(required, context=primary)
209
210             if fsaccess.exists(sfpath):
211                 found.append({"location": sfpath, "class": "File"})
212             elif required:
213                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
214                     "Required secondary file '%s' does not exist" % sfpath)
215
216         primary["secondaryFiles"] = cmap(found)
217         if discovered is not None:
218             discovered[primary["location"]] = primary["secondaryFiles"]
219     elif inputschema["type"] not in primitive_types_set:
220         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
221
222 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
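    """Apply set_secondary() to each input parameter that has a value in job_order."""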
223     for inputschema in inputs:
224         primary = job_order.get(shortname(inputschema["id"]))
225         if isinstance(primary, (Mapping, Sequence)):
226             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
227
228 def upload_dependencies(arvrunner, name, document_loader,
229                         workflowobj, uri, loadref_run,
230                         include_primary=True, discovered_secondaryfiles=None):
231     """Upload the dependencies of the workflowobj document to Keep.
232
233     Returns a pathmapper object mapping local paths to keep references.  Also
234     does an in-place update of references in "workflowobj".
235
236     Use scandeps to find $import, $include, $schemas, run, File and Directory
237     fields that represent external references.
238
239     If workflowobj has an "id" field, this will reload the document to ensure
240     it is scanning the raw document prior to preprocessing.
241     """
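    # Called below by upload_job_order() (for the input object) and
    # upload_workflow_deps() (for each tool document).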
242
243     loaded = set()
244     def loadref(b, u):
245         joined = document_loader.fetcher.urljoin(b, u)
246         defrg, _ = urllib.parse.urldefrag(joined)
247         if defrg not in loaded:
248             loaded.add(defrg)
249             # Use fetch_text to get raw file (before preprocessing).
250             text = document_loader.fetch_text(defrg)
251             if isinstance(text, bytes):
252                 textIO = StringIO(text.decode('utf-8'))
253             else:
254                 textIO = StringIO(text)
255             return yaml.safe_load(textIO)
256         else:
257             return {}
258
259     if loadref_run:
260         loadref_fields = set(("$import", "run"))
261     else:
262         loadref_fields = set(("$import",))
263
264     scanobj = workflowobj
265     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
266         # Need raw file content (before preprocessing) to ensure
267         # that external references in $include and $mixin are captured.
268         scanobj = loadref("", workflowobj["id"])
269
270     metadata = scanobj
271
272     sc_result = scandeps(uri, scanobj,
273                   loadref_fields,
274                   set(("$include", "$schemas", "location")),
275                   loadref, urljoin=document_loader.fetcher.urljoin)
276
277     sc = []
278     uuids = {}
279
280     def collect_uuids(obj):
281         loc = obj.get("location", "")
282         sp = loc.split(":")
283         if sp[0] == "keep":
284             # Collect collection uuids that need to be resolved to
285             # portable data hashes
286             gp = collection_uuid_pattern.match(loc)
287             if gp:
288                 uuids[gp.groups()[0]] = obj
289             if collectionUUID in obj:
290                 uuids[obj[collectionUUID]] = obj
291
292     def collect_uploads(obj):
293         loc = obj.get("location", "")
294         sp = loc.split(":")
295         if len(sp) < 1:
296             return
297         if sp[0] in ("file", "http", "https"):
298             # Record local files that need to be uploaded,
299             # don't include file literals, keep references, etc.
300             sc.append(obj)
301         collect_uuids(obj)
302
303     visit_class(workflowobj, ("File", "Directory"), collect_uuids)
304     visit_class(sc_result, ("File", "Directory"), collect_uploads)
305
306     # Resolve any collection uuids we found to portable data hashes
307     # and assign them to uuid_map
308     uuid_map = {}
309     fetch_uuids = list(uuids.keys())
310     while fetch_uuids:
311         # For a large number of fetch_uuids, API server may limit
312         # response size, so keep fetching until the API server has nothing
313         # more to give us.
314         lookups = arvrunner.api.collections().list(
315             filters=[["uuid", "in", fetch_uuids]],
316             count="none",
317             select=["uuid", "portable_data_hash"]).execute(
318                 num_retries=arvrunner.num_retries)
319
320         if not lookups["items"]:
321             break
322
323         for l in lookups["items"]:
324             uuid_map[l["uuid"]] = l["portable_data_hash"]
325
326         fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
327
328     normalizeFilesDirs(sc)
329
330     if include_primary and "id" in workflowobj:
331         sc.append({"class": "File", "location": workflowobj["id"]})
332
333     if "$schemas" in workflowobj:
334         for s in workflowobj["$schemas"]:
335             sc.append({"class": "File", "location": s})
336
337     def visit_default(obj):
338         remove = [False]
339         def ensure_default_location(f):
340             if "location" not in f and "path" in f:
341                 f["location"] = f["path"]
342                 del f["path"]
343             if "location" in f and not arvrunner.fs_access.exists(f["location"]):
344                 # Doesn't exist, remove from list of dependencies to upload
345                 sc[:] = [x for x in sc if x["location"] != f["location"]]
346                 # Delete "default" from workflowobj
347                 remove[0] = True
348         visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
349         if remove[0]:
350             del obj["default"]
351
352     find_defaults(workflowobj, visit_default)
353
354     discovered = {}
355     def discover_default_secondary_files(obj):
356         builder_job_order = {}
357         for t in obj["inputs"]:
358             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
359         # Need to create a builder object to evaluate expressions.
360         builder = make_builder(builder_job_order,
361                                obj.get("hints", []),
362                                obj.get("requirements", []),
363                                ArvRuntimeContext(),
364                                metadata)
365         discover_secondary_files(arvrunner.fs_access,
366                                  builder,
367                                  obj["inputs"],
368                                  builder_job_order,
369                                  discovered)
370
371     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
372     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
373
374     for d in list(discovered):
375         # Only interested in discovered secondaryFiles which are local
376         # files that need to be uploaded.
377         if d.startswith("file:"):
378             sc.extend(discovered[d])
379         else:
380             del discovered[d]
381
382     mapper = ArvPathMapper(arvrunner, sc, "",
383                            "keep:%s",
384                            "keep:%s/%s",
385                            name=name,
386                            single_collection=True)
387
388     def setloc(p):
389         loc = p.get("location")
390         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
391             p["location"] = mapper.mapper(p["location"]).resolved
392             return
393
394         if not loc:
395             return
396
397         if collectionUUID in p:
398             uuid = p[collectionUUID]
399             if uuid not in uuid_map:
400                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
401                     "Collection uuid %s not found" % uuid)
402             gp = collection_pdh_pattern.match(loc)
403             if gp and uuid_map[uuid] != gp.groups()[0]:
404                 # This file entry has both collectionUUID and a PDH
405                 # location. If the PDH doesn't match the one returned
406                 # by the API server, raise an error.
407                 raise SourceLine(p, "location", validate.ValidationException).makeError(
408                     "Expected collection uuid %s to be %s but API server reported %s" % (
409                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
410
411         gp = collection_uuid_pattern.match(loc)
412         if not gp:
413             return
414         uuid = gp.groups()[0]
415         if uuid not in uuid_map:
416             raise SourceLine(p, "location", validate.ValidationException).makeError(
417                 "Collection uuid %s not found" % uuid)
418         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
419         p[collectionUUID] = uuid
420
421     visit_class(workflowobj, ("File", "Directory"), setloc)
422     visit_class(discovered, ("File", "Directory"), setloc)
423
424     if discovered_secondaryfiles is not None:
425         for d in discovered:
426             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
427
428     if "$schemas" in workflowobj:
429         sch = CommentedSeq()
430         for s in workflowobj["$schemas"]:
431             if s in mapper:
432                 sch.append(mapper.mapper(s).resolved)
433         workflowobj["$schemas"] = sch
434
435     return mapper
436
437
438 def upload_docker(arvrunner, tool):
439     """Uploads Docker images used in CommandLineTool objects."""
440
441     if isinstance(tool, CommandLineTool):
442         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
443         if docker_req:
444             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
445                 # TODO: can be supported by containers API, but not jobs API.
446                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
447                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
448             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
449                                                        arvrunner.runtimeContext.force_docker_pull,
450                                                        arvrunner.runtimeContext.tmp_outdir_prefix)
451         else:
452             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
453                                                        True, arvrunner.project_uuid,
454                                                        arvrunner.runtimeContext.force_docker_pull,
455                                                        arvrunner.runtimeContext.tmp_outdir_prefix)
456     elif isinstance(tool, cwltool.workflow.Workflow):
457         for s in tool.steps:
458             upload_docker(arvrunner, s.embedded_tool)
459
460
461 def packed_workflow(arvrunner, tool, merged_map):
462     """Create a packed workflow.
463
464     A "packed" workflow is one where all the components have been combined into a single document."""
465
466     rewrites = {}
467     packed = pack(arvrunner.loadingContext, tool.tool["id"],
468                   rewrite_out=rewrites,
469                   loader=tool.doc_loader)
470
471     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
472
473     def visit(v, cur_id):
474         if isinstance(v, dict):
475             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
476                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
477                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
478                 if "id" in v:
479                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
480             if "path" in v and "location" not in v:
481                 v["location"] = v["path"]
482                 del v["path"]
483             if "location" in v and cur_id in merged_map:
484                 if v["location"] in merged_map[cur_id].resolved:
485                     v["location"] = merged_map[cur_id].resolved[v["location"]]
486                 if v["location"] in merged_map[cur_id].secondaryFiles:
487                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
488             if v.get("class") == "DockerRequirement":
489                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
490                                                                                                              arvrunner.project_uuid,
491                                                                                                              arvrunner.runtimeContext.force_docker_pull,
492                                                                                                              arvrunner.runtimeContext.tmp_outdir_prefix)
493             for l in v:
494                 visit(v[l], cur_id)
495         if isinstance(v, list):
496             for l in v:
497                 visit(l, cur_id)
498     visit(packed, None)
499     return packed
500
501
502 def tag_git_version(packed, tool):
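    """Record the git commit hash of the tool's source directory in the
    packed workflow under the "http://schema.org/version" key."""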
503     if tool.tool["id"].startswith("file://"):
504         path = os.path.dirname(tool.tool["id"][7:])
505         try:
506             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
507         except (OSError, subprocess.CalledProcessError):
508             pass
509         else:
510             packed["http://schema.org/version"] = githash
511
512
513 def upload_job_order(arvrunner, name, tool, job_order):
514     """Upload local files referenced in the input object and return updated input
515     object with 'location' updated to the proper keep references.
516     """
517
518     # Make a copy of the job order and set defaults.
519     builder_job_order = copy.copy(job_order)
520
521     # fill_in_defaults throws an error if there are any
522     # missing required parameters; we don't want it to do that,
523     # so make them all optional.
524     inputs_copy = copy.deepcopy(tool.tool["inputs"])
525     for i in inputs_copy:
526         if "null" not in i["type"]:
527             i["type"] = ["null"] + aslist(i["type"])
528
529     fill_in_defaults(inputs_copy,
530                      builder_job_order,
531                      arvrunner.fs_access)
532     # Need to create a builder object to evaluate expressions.
533     builder = make_builder(builder_job_order,
534                            tool.hints,
535                            tool.requirements,
536                            ArvRuntimeContext(),
537                            tool.metadata)
538     # Now update job_order with secondaryFiles
539     discover_secondary_files(arvrunner.fs_access,
540                              builder,
541                              tool.tool["inputs"],
542                              job_order)
543
544     jobmapper = upload_dependencies(arvrunner,
545                                     name,
546                                     tool.doc_loader,
547                                     job_order,
548                                     job_order.get("id", "#"),
549                                     False)
550
551     if "id" in job_order:
552         del job_order["id"]
553
554     # Need to filter this out; it gets added by cwltool when providing
555     # parameters on the command line.
556     if "job_order" in job_order:
557         del job_order["job_order"]
558
559     return job_order
560
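# Per-tool record of path rewrites: 'resolved' maps original locations to
# keep: references, 'secondaryFiles' maps locations to discovered secondaryFiles.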
561 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
562
563 def upload_workflow_deps(arvrunner, tool):
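    """Upload Docker images and file dependencies for every tool in the workflow.

    Returns a dict mapping each tool's "id" to a FileUpdates tuple of resolved
    locations and discovered secondaryFiles.
    """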
564     # Ensure that Docker images needed by this workflow are available
565
566     upload_docker(arvrunner, tool)
567
568     document_loader = tool.doc_loader
569
570     merged_map = {}
571
572     def upload_tool_deps(deptool):
573         if "id" in deptool:
574             discovered_secondaryfiles = {}
575             pm = upload_dependencies(arvrunner,
576                                      "%s dependencies" % (shortname(deptool["id"])),
577                                      document_loader,
578                                      deptool,
579                                      deptool["id"],
580                                      False,
581                                      include_primary=False,
582                                      discovered_secondaryfiles=discovered_secondaryfiles)
583             document_loader.idx[deptool["id"]] = deptool
584             toolmap = {}
585             for k,v in pm.items():
586                 toolmap[k] = v.resolved
587             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
588
589     tool.visit(upload_tool_deps)
590
591     return merged_map
592
593 def arvados_jobs_image(arvrunner, img):
594     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
595
596     try:
597         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
598                                                           arvrunner.runtimeContext.force_docker_pull,
599                                                           arvrunner.runtimeContext.tmp_outdir_prefix)
600     except Exception as e:
601         raise Exception("Docker image %s is not available\n%s" % (img, e) )
602
603
604 def upload_workflow_collection(arvrunner, name, packed):
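    """Save the packed workflow as "workflow.cwl" in a Keep collection.

    Reuses an existing collection with the same content and name prefix if one
    is found; otherwise saves a new collection.  Returns the portable data hash.
    """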
605     collection = arvados.collection.Collection(api_client=arvrunner.api,
606                                                keep_client=arvrunner.keep_client,
607                                                num_retries=arvrunner.num_retries)
608     with collection.open("workflow.cwl", "w") as f:
609         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
610
611     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
612                ["name", "like", name+"%"]]
613     if arvrunner.project_uuid:
614         filters.append(["owner_uuid", "=", arvrunner.project_uuid])
615     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
616
617     if exists["items"]:
618         logger.info("Using collection %s", exists["items"][0]["uuid"])
619     else:
620         collection.save_new(name=name,
621                             owner_uuid=arvrunner.project_uuid,
622                             ensure_unique_name=True,
623                             num_retries=arvrunner.num_retries)
624         logger.info("Uploaded to %s", collection.manifest_locator())
625
626     return collection.portable_data_hash()
627
628
629 class Runner(Process):
630     """Base class for runner processes, which submit an instance of
631     arvados-cwl-runner and wait for the final result."""
632
633     def __init__(self, runner, updated_tool,
634                  tool, loadingContext, enable_reuse,
635                  output_name, output_tags, submit_runner_ram=0,
636                  name=None, on_error=None, submit_runner_image=None,
637                  intermediate_output_ttl=0, merged_map=None,
638                  priority=None, secret_store=None,
639                  collection_cache_size=256,
640                  collection_cache_is_default=True):
641
642         loadingContext = loadingContext.copy()
643         loadingContext.metadata = updated_tool.metadata.copy()
644
645         super(Runner, self).__init__(updated_tool.tool, loadingContext)
646
647         self.arvrunner = runner
648         self.embedded_tool = tool
649         self.job_order = None
650         self.running = False
651         if enable_reuse:
652             # If reuse is permitted by command line arguments but
653             # disabled by the workflow itself, disable it.
654             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
655             if reuse_req:
656                 enable_reuse = reuse_req["enableReuse"]
657         self.enable_reuse = enable_reuse
658         self.uuid = None
659         self.final_output = None
660         self.output_name = output_name
661         self.output_tags = output_tags
662         self.name = name
663         self.on_error = on_error
664         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
665         self.intermediate_output_ttl = intermediate_output_ttl
666         self.priority = priority
667         self.secret_store = secret_store
668         self.enable_dev = loadingContext.enable_dev
669
670         self.submit_runner_cores = 1
671         self.submit_runner_ram = 1024  # default 1 GiB
672         self.collection_cache_size = collection_cache_size
673
674         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
675         if runner_resource_req:
676             if runner_resource_req.get("coresMin"):
677                 self.submit_runner_cores = runner_resource_req["coresMin"]
678             if runner_resource_req.get("ramMin"):
679                 self.submit_runner_ram = runner_resource_req["ramMin"]
680             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
681                 self.collection_cache_size = runner_resource_req["keep_cache"]
682
683         if submit_runner_ram:
684             # Command line / initializer overrides default and/or spec from workflow
685             self.submit_runner_ram = submit_runner_ram
686
687         if self.submit_runner_ram <= 0:
688             raise Exception("Value of submit-runner-ram must be greater than zero")
689
690         if self.submit_runner_cores <= 0:
691             raise Exception("Value of submit-runner-cores must be greater than zero")
692
693         self.merged_map = merged_map or {}
694
695     def job(self,
696             job_order,         # type: Mapping[Text, Text]
697             output_callbacks,  # type: Callable[[Any, Any], Any]
698             runtimeContext     # type: RuntimeContext
699            ):  # type: (...) -> Generator[Any, None, None]
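        """Initialize this runner from the input object and yield it as the single runnable step."""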
700         self.job_order = job_order
701         self._init_job(job_order, runtimeContext)
702         yield self
703
704     def update_pipeline_component(self, record):
705         pass
706
707     def done(self, record):
708         """Base method for handling a completed runner."""
709
710         try:
711             if record["state"] == "Complete":
712                 if record.get("exit_code") is not None:
713                     if record["exit_code"] == 33:
714                         processStatus = "UnsupportedRequirement"
715                     elif record["exit_code"] == 0:
716                         processStatus = "success"
717                     else:
718                         processStatus = "permanentFail"
719                 else:
720                     processStatus = "success"
721             else:
722                 processStatus = "permanentFail"
723
724             outputs = {}
725
726             if processStatus == "permanentFail":
727                 logc = arvados.collection.CollectionReader(record["log"],
728                                                            api_client=self.arvrunner.api,
729                                                            keep_client=self.arvrunner.keep_client,
730                                                            num_retries=self.arvrunner.num_retries)
731                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
732
733             self.final_output = record["output"]
734             outc = arvados.collection.CollectionReader(self.final_output,
735                                                        api_client=self.arvrunner.api,
736                                                        keep_client=self.arvrunner.keep_client,
737                                                        num_retries=self.arvrunner.num_retries)
738             if "cwl.output.json" in outc:
739                 with outc.open("cwl.output.json", "rb") as f:
740                     if f.size() > 0:
741                         outputs = json.loads(f.read().decode())
742             def keepify(fileobj):
743                 path = fileobj["location"]
744                 if not path.startswith("keep:"):
745                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
746             adjustFileObjs(outputs, keepify)
747             adjustDirObjs(outputs, keepify)
748         except Exception:
749             logger.exception("[%s] While getting final output object", self.name)
750             self.arvrunner.output_callback({}, "permanentFail")
751         else:
752             self.arvrunner.output_callback(outputs, processStatus)