16353: Initialize 'builder' with document version
[arvados.git] sdk/cwl/arvados_cwl/runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.utils import aslist
36 from cwltool.builder import substitute
37 from cwltool.pack import pack
38 from cwltool.update import INTERNAL_VERSION
39 from cwltool.builder import Builder
40 import schema_salad.validate as validate
41
42 import arvados.collection
43 from .util import collectionUUID
44 import ruamel.yaml as yaml
45 from ruamel.yaml.comments import CommentedMap, CommentedSeq
46
47 import arvados_cwl.arvdocker
48 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
49 from ._version import __version__
50 from . import done
51 from .context import ArvRuntimeContext
52
53 logger = logging.getLogger('arvados.cwl-runner')
54
55 def trim_anonymous_location(obj):
56     """Remove 'location' field from File and Directory literals.
57
58     To make internal handling easier, literals are assigned a random id for
59     'location'.  However, when writing the record back out, this can break
60     reproducibility.  Since it is valid for literals not have a 'location'
61     reproducibility.  Since it is valid for literals not to have a 'location'
62
63     """
64
65     if obj.get("location", "").startswith("_:"):
66         del obj["location"]
67
68
69 def remove_redundant_fields(obj):
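       """Strip fields that are redundant when a File or Directory record is
       written back out ('path', 'nameext', 'nameroot', 'dirname')."""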
70     for field in ("path", "nameext", "nameroot", "dirname"):
71         if field in obj:
72             del obj[field]
73
74
75 def find_defaults(d, op):
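       """Recursively walk a document and call op() on each mapping that has a 'default' field."""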
76     if isinstance(d, list):
77         for i in d:
78             find_defaults(i, op)
79     elif isinstance(d, dict):
80         if "default" in d:
81             op(d)
82         else:
83             for i in viewvalues(d):
84                 find_defaults(i, op)
85
86 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
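       """Create a cwltool Builder with just enough state to evaluate
       expressions (for example secondaryFiles patterns) outside of a
       running job."""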
87     return Builder(
88                  job=joborder,
89                  files=[],               # type: List[Dict[Text, Text]]
90                  bindings=[],            # type: List[Dict[Text, Any]]
91                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
92                  names=None,               # type: Names
93                  requirements=requirements,        # type: List[Dict[Text, Any]]
94                  hints=hints,               # type: List[Dict[Text, Any]]
95                  resources={},           # type: Dict[str, int]
96                  mutation_manager=None,    # type: Optional[MutationManager]
97                  formatgraph=None,         # type: Optional[Graph]
98                  make_fs_access=None,      # type: Type[StdFsAccess]
99                  fs_access=None,           # type: StdFsAccess
100                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
101                  timeout=runtimeContext.eval_timeout,             # type: float
102                  debug=runtimeContext.debug,               # type: bool
103                  js_console=runtimeContext.js_console,          # type: bool
104                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
105                  loadListing="",         # type: Text
106                  outdir="",              # type: Text
107                  tmpdir="",              # type: Text
108                  stagedir="",            # type: Text
109                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion")
110                 )
111
112 def search_schemadef(name, reqs):
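        """Look up a named type in any SchemaDefRequirement in reqs; return the type definition, or None if not found."""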
113     for r in reqs:
114         if r["class"] == "SchemaDefRequirement":
115             for sd in r["types"]:
116                 if sd["name"] == name:
117                     return sd
118     return None
119
120 primitive_types_set = frozenset(("null", "boolean", "int", "long",
121                                  "float", "double", "string", "record",
122                                  "array", "enum"))
123
124 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
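        """Resolve secondaryFiles for one input value.

        Recurses through unions, arrays and records in the input schema.  For
        each File value that does not already have secondaryFiles, evaluates
        the secondaryFiles patterns, verifies that required files exist, and
        records the result on the primary File and in 'discovered' (if given).
        """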
125     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
126         # union type, collect all possible secondaryFiles
127         for i in inputschema:
128             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
129         return
130
131     if isinstance(inputschema, basestring):
132         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
133         if sd:
134             inputschema = sd
135         else:
136             return
137
138     if "secondaryFiles" in inputschema:
139         # set secondaryFiles, may be inherited by compound types.
140         secondaryspec = inputschema["secondaryFiles"]
141
142     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
143         not isinstance(inputschema["type"], basestring)):
144         # compound type (union, array, record)
145         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
146
147     elif (inputschema["type"] == "record" and
148           isinstance(primary, Mapping)):
149         #
150         # record type, find secondary files associated with fields.
151         #
152         for f in inputschema["fields"]:
153             p = primary.get(shortname(f["name"]))
154             if p:
155                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
156
157     elif (inputschema["type"] == "array" and
158           isinstance(primary, Sequence)):
159         #
160         # array type, find secondary files of elements
161         #
162         for p in primary:
163             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
164
165     elif (inputschema["type"] == "File" and
166           secondaryspec and
167           isinstance(primary, Mapping) and
168           primary.get("class") == "File" and
169           "secondaryFiles" not in primary):
170         #
171         # Found a file, check for secondaryFiles
172         #
173         specs = []
174         primary["secondaryFiles"] = secondaryspec
175         for i, sf in enumerate(aslist(secondaryspec)):
176             pattern = builder.do_eval(sf["pattern"], context=primary)
177             if pattern is None:
178                 continue
179             if isinstance(pattern, list):
180                 specs.extend(pattern)
181             elif isinstance(pattern, dict):
182                 specs.append(pattern)
183             elif isinstance(pattern, str):
184                 specs.append({"pattern": pattern})
185             else:
186                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
187                     "Expression must return list, object, string or null")
188
189         found = []
190         for i, sf in enumerate(specs):
191             if isinstance(sf, dict):
192                 if sf.get("class") == "File":
193                     pattern, required = sf["basename"], True  # a bare File value carries no "required" flag
194                 else:
195                     pattern = sf["pattern"]
196                     required = sf.get("required")
197             elif isinstance(sf, str):
198                 pattern = sf
199                 required = True
200             else:
201                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
202                     "Expression must return list, object, string or null")
203
204             sfpath = substitute(primary["location"], pattern)
205             required = builder.do_eval(required, context=primary)
206
207             if fsaccess.exists(sfpath):
208                 found.append({"location": sfpath, "class": "File"})
209             elif required:
210                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
211                     "Required secondary file '%s' does not exist" % sfpath)
212
213         primary["secondaryFiles"] = cmap(found)
214         if discovered is not None:
215             discovered[primary["location"]] = primary["secondaryFiles"]
216     elif inputschema["type"] not in primitive_types_set:
217         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
218
219 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
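        """For each input parameter, evaluate and attach secondaryFiles to the
        corresponding value in job_order, recording newly found files in 'discovered'."""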
220     for inputschema in inputs:
221         primary = job_order.get(shortname(inputschema["id"]))
222         if isinstance(primary, (Mapping, Sequence)):
223             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
224
225 def upload_dependencies(arvrunner, name, document_loader,
226                         workflowobj, uri, loadref_run,
227                         include_primary=True, discovered_secondaryfiles=None):
228     """Upload the dependencies of the workflowobj document to Keep.
229
230     Returns a pathmapper object mapping local paths to keep references.  Also
231     does an in-place update of references in "workflowobj".
232
233     Use scandeps to find $import, $include, $schemas, run, File and Directory
234     fields that represent external references.
235
236     If workflowobj has an "id" field, this will reload the document to ensure
237     it is scanning the raw document prior to preprocessing.
238     """
239
240     loaded = set()
241     def loadref(b, u):
242         joined = document_loader.fetcher.urljoin(b, u)
243         defrg, _ = urllib.parse.urldefrag(joined)
244         if defrg not in loaded:
245             loaded.add(defrg)
246             # Use fetch_text to get raw file (before preprocessing).
247             text = document_loader.fetch_text(defrg)
248             if isinstance(text, bytes):
249                 textIO = StringIO(text.decode('utf-8'))
250             else:
251                 textIO = StringIO(text)
252             return yaml.safe_load(textIO)
253         else:
254             return {}
255
256     if loadref_run:
257         loadref_fields = set(("$import", "run"))
258     else:
259         loadref_fields = set(("$import",))
260
261     scanobj = workflowobj
262     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
263         # Need raw file content (before preprocessing) to ensure
264         # that external references in $include and $mixin are captured.
265         scanobj = loadref("", workflowobj["id"])
266
267     sc_result = scandeps(uri, scanobj,
268                   loadref_fields,
269                   set(("$include", "$schemas", "location")),
270                   loadref, urljoin=document_loader.fetcher.urljoin)
271
272     sc = []
273     uuids = {}
274
275     def collect_uuids(obj):
276         loc = obj.get("location", "")
277         sp = loc.split(":")
278         if sp[0] == "keep":
279             # Collect collection uuids that need to be resolved to
280             # portable data hashes
281             gp = collection_uuid_pattern.match(loc)
282             if gp:
283                 uuids[gp.groups()[0]] = obj
284             if collectionUUID in obj:
285                 uuids[obj[collectionUUID]] = obj
286
287     def collect_uploads(obj):
288         loc = obj.get("location", "")
289         sp = loc.split(":")
290         if len(sp) < 1:
291             return
292         if sp[0] in ("file", "http", "https"):
293             # Record local files that need to be uploaded,
294             # don't include file literals, keep references, etc.
295             sc.append(obj)
296         collect_uuids(obj)
297
298     visit_class(workflowobj, ("File", "Directory"), collect_uuids)
299     visit_class(sc_result, ("File", "Directory"), collect_uploads)
300
301     # Resolve any collection uuids we found to portable data hashes
302     # and assign them to uuid_map
303     uuid_map = {}
304     fetch_uuids = list(uuids.keys())
305     while fetch_uuids:
306         # For a large number of fetch_uuids, the API server may limit
307         # the response size, so keep fetching until the API server has
308         # nothing more to give us.
309         lookups = arvrunner.api.collections().list(
310             filters=[["uuid", "in", fetch_uuids]],
311             count="none",
312             select=["uuid", "portable_data_hash"]).execute(
313                 num_retries=arvrunner.num_retries)
314
315         if not lookups["items"]:
316             break
317
318         for l in lookups["items"]:
319             uuid_map[l["uuid"]] = l["portable_data_hash"]
320
321         fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
322
323     normalizeFilesDirs(sc)
324
325     if include_primary and "id" in workflowobj:
326         sc.append({"class": "File", "location": workflowobj["id"]})
327
328     if "$schemas" in workflowobj:
329         for s in workflowobj["$schemas"]:
330             sc.append({"class": "File", "location": s})
331
332     def visit_default(obj):
333         remove = [False]
334         def ensure_default_location(f):
335             if "location" not in f and "path" in f:
336                 f["location"] = f["path"]
337                 del f["path"]
338             if "location" in f and not arvrunner.fs_access.exists(f["location"]):
339                 # Doesn't exist, remove from list of dependencies to upload
340                 sc[:] = [x for x in sc if x["location"] != f["location"]]
341                 # Delete "default" from workflowobj
342                 remove[0] = True
343         visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
344         if remove[0]:
345             del obj["default"]
346
347     find_defaults(workflowobj, visit_default)
348
349     discovered = {}
350     def discover_default_secondary_files(obj):
351         builder_job_order = {}
352         for t in obj["inputs"]:
353             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
354         # Need to create a builder object to evaluate expressions.
355         builder = make_builder(builder_job_order,
356                                obj.get("hints", []),
357                                obj.get("requirements", []),
358                                ArvRuntimeContext(),
359                                {})
360         discover_secondary_files(arvrunner.fs_access,
361                                  builder,
362                                  obj["inputs"],
363                                  builder_job_order,
364                                  discovered)
365
366     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
367     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
368
369     for d in list(discovered):
370         # Only interested in discovered secondaryFiles which are local
371         # files that need to be uploaded.
372         if d.startswith("file:"):
373             sc.extend(discovered[d])
374         else:
375             del discovered[d]
376
377     mapper = ArvPathMapper(arvrunner, sc, "",
378                            "keep:%s",
379                            "keep:%s/%s",
380                            name=name,
381                            single_collection=True)
382
383     def setloc(p):
384         loc = p.get("location")
385         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
386             p["location"] = mapper.mapper(p["location"]).resolved
387             return
388
389         if not loc:
390             return
391
392         if collectionUUID in p:
393             uuid = p[collectionUUID]
394             if uuid not in uuid_map:
395                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
396                     "Collection uuid %s not found" % uuid)
397             gp = collection_pdh_pattern.match(loc)
398             if gp and uuid_map[uuid] != gp.groups()[0]:
399                 # This file entry has both collectionUUID and a PDH
400                 # location. If the PDH doesn't match the one returned by
401                 # the API server, raise an error.
402                 raise SourceLine(p, "location", validate.ValidationException).makeError(
403                     "Expected collection uuid %s to be %s but API server reported %s" % (
404                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
405
406         gp = collection_uuid_pattern.match(loc)
407         if not gp:
408             return
409         uuid = gp.groups()[0]
410         if uuid not in uuid_map:
411             raise SourceLine(p, "location", validate.ValidationException).makeError(
412                 "Collection uuid %s not found" % uuid)
413         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
414         p[collectionUUID] = uuid
415
416     visit_class(workflowobj, ("File", "Directory"), setloc)
417     visit_class(discovered, ("File", "Directory"), setloc)
418
419     if discovered_secondaryfiles is not None:
420         for d in discovered:
421             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
422
423     if "$schemas" in workflowobj:
424         sch = CommentedSeq()
425         for s in workflowobj["$schemas"]:
426             sch.append(mapper.mapper(s).resolved)
427         workflowobj["$schemas"] = sch
428
429     return mapper
430
431
432 def upload_docker(arvrunner, tool):
433     """Uploads Docker images used in CommandLineTool objects."""
434
435     if isinstance(tool, CommandLineTool):
436         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
437         if docker_req:
438             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
439                 # TODO: can be supported by containers API, but not jobs API.
440                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
441                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
442             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
443         else:
444             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
445     elif isinstance(tool, cwltool.workflow.Workflow):
446         for s in tool.steps:
447             upload_docker(arvrunner, s.embedded_tool)
448
449
450 def packed_workflow(arvrunner, tool, merged_map):
451     """Create a packed workflow.
452
453     A "packed" workflow is one where all the components have been combined into a single document."""
454
455     rewrites = {}
456     packed = pack(arvrunner.loadingContext, tool.tool["id"],
457                   rewrite_out=rewrites,
458                   loader=tool.doc_loader)
459
460     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
461
462     def visit(v, cur_id):
463         if isinstance(v, dict):
464             if v.get("class") in ("CommandLineTool", "Workflow"):
465                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
466                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
467                 if "id" in v:
468                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
469             if "path" in v and "location" not in v:
470                 v["location"] = v["path"]
471                 del v["path"]
472             if "location" in v and not v["location"].startswith("keep:"):
473                 v["location"] = merged_map[cur_id].resolved[v["location"]]
474             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
475                 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
476             if v.get("class") == "DockerRequirement":
477                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
478             for l in v:
479                 visit(v[l], cur_id)
480         if isinstance(v, list):
481             for l in v:
482                 visit(l, cur_id)
483     visit(packed, None)
484     return packed
485
486
487 def tag_git_version(packed, tool):
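        """Record the git commit of the workflow source directory in the packed
        document under 'http://schema.org/version'."""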
488     if tool.tool["id"].startswith("file://"):
489         path = os.path.dirname(tool.tool["id"][7:])
490         try:
491             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
492         except (OSError, subprocess.CalledProcessError):
493             pass
494         else:
495             packed["http://schema.org/version"] = githash
496
497
498 def upload_job_order(arvrunner, name, tool, job_order):
499     """Upload local files referenced in the input object and return the updated
500     input object with 'location' fields rewritten to the proper keep references.
501     """
502
503     # Make a copy of the job order and set defaults.
504     builder_job_order = copy.copy(job_order)
505
506     # fill_in_defaults throws an error if there are any missing
507     # required parameters.  We don't want that here, so make all
508     # of the parameters optional.
509     inputs_copy = copy.deepcopy(tool.tool["inputs"])
510     for i in inputs_copy:
511         if "null" not in i["type"]:
512             i["type"] = ["null"] + aslist(i["type"])
513
514     fill_in_defaults(inputs_copy,
515                      builder_job_order,
516                      arvrunner.fs_access)
517     # Need to create a builder object to evaluate expressions.
518     builder = make_builder(builder_job_order,
519                            tool.hints,
520                            tool.requirements,
521                            ArvRuntimeContext(),
522                            tool.metadata)
523     # Now update job_order with secondaryFiles
524     discover_secondary_files(arvrunner.fs_access,
525                              builder,
526                              tool.tool["inputs"],
527                              job_order)
528
529     jobmapper = upload_dependencies(arvrunner,
530                                     name,
531                                     tool.doc_loader,
532                                     job_order,
533                                     job_order.get("id", "#"),
534                                     False)
535
536     if "id" in job_order:
537         del job_order["id"]
538
539     # Need to filter this out; it gets added by cwltool when providing
540     # parameters on the command line.
541     if "job_order" in job_order:
542         del job_order["job_order"]
543
544     return job_order
545
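    # For each tool document, FileUpdates pairs a mapping from original file
    # locations to resolved "keep:" locations ("resolved") with the
    # secondaryFiles discovered for them ("secondaryFiles").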
546 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
547
548 def upload_workflow_deps(arvrunner, tool):
549     # Ensure that Docker images needed by this workflow are available
550
551     upload_docker(arvrunner, tool)
552
553     document_loader = tool.doc_loader
554
555     merged_map = {}
556
557     def upload_tool_deps(deptool):
558         if "id" in deptool:
559             discovered_secondaryfiles = {}
560             pm = upload_dependencies(arvrunner,
561                                      "%s dependencies" % (shortname(deptool["id"])),
562                                      document_loader,
563                                      deptool,
564                                      deptool["id"],
565                                      False,
566                                      include_primary=False,
567                                      discovered_secondaryfiles=discovered_secondaryfiles)
568             document_loader.idx[deptool["id"]] = deptool
569             toolmap = {}
570             for k,v in pm.items():
571                 toolmap[k] = v.resolved
572             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
573
574     tool.visit(upload_tool_deps)
575
576     return merged_map
577
578 def arvados_jobs_image(arvrunner, img):
579     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
580
581     try:
582         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
583     except Exception as e:
584         raise Exception("Docker image %s is not available\n%s" % (img, e))
585
586
587 def upload_workflow_collection(arvrunner, name, packed):
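        """Save the packed workflow document to Keep as 'workflow.cwl'.

        Reuses an existing collection with matching content and name prefix in
        the target project when one exists.  Returns the portable data hash.
        """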
588     collection = arvados.collection.Collection(api_client=arvrunner.api,
589                                                keep_client=arvrunner.keep_client,
590                                                num_retries=arvrunner.num_retries)
591     with collection.open("workflow.cwl", "w") as f:
592         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
593
594     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
595                ["name", "like", name+"%"]]
596     if arvrunner.project_uuid:
597         filters.append(["owner_uuid", "=", arvrunner.project_uuid])
598     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
599
600     if exists["items"]:
601         logger.info("Using collection %s", exists["items"][0]["uuid"])
602     else:
603         collection.save_new(name=name,
604                             owner_uuid=arvrunner.project_uuid,
605                             ensure_unique_name=True,
606                             num_retries=arvrunner.num_retries)
607         logger.info("Uploaded to %s", collection.manifest_locator())
608
609     return collection.portable_data_hash()
610
611
612 class Runner(Process):
613     """Base class for runner processes, which submit an instance of
614     arvados-cwl-runner and wait for the final result."""
615
616     def __init__(self, runner, updated_tool,
617                  tool, loadingContext, enable_reuse,
618                  output_name, output_tags, submit_runner_ram=0,
619                  name=None, on_error=None, submit_runner_image=None,
620                  intermediate_output_ttl=0, merged_map=None,
621                  priority=None, secret_store=None,
622                  collection_cache_size=256,
623                  collection_cache_is_default=True):
624
625         loadingContext = loadingContext.copy()
626         loadingContext.metadata = updated_tool.metadata.copy()
627
628         super(Runner, self).__init__(updated_tool.tool, loadingContext)
629
630         self.arvrunner = runner
631         self.embedded_tool = tool
632         self.job_order = None
633         self.running = False
634         if enable_reuse:
635             # If reuse is permitted by command line arguments but
636             # disabled by the workflow itself, disable it.
637             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
638             if reuse_req:
639                 enable_reuse = reuse_req["enableReuse"]
640         self.enable_reuse = enable_reuse
641         self.uuid = None
642         self.final_output = None
643         self.output_name = output_name
644         self.output_tags = output_tags
645         self.name = name
646         self.on_error = on_error
647         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
648         self.intermediate_output_ttl = intermediate_output_ttl
649         self.priority = priority
650         self.secret_store = secret_store
651         self.enable_dev = loadingContext.enable_dev
652
653         self.submit_runner_cores = 1
654         self.submit_runner_ram = 1024  # default 1 GiB
655         self.collection_cache_size = collection_cache_size
656
657         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
658         if runner_resource_req:
659             if runner_resource_req.get("coresMin"):
660                 self.submit_runner_cores = runner_resource_req["coresMin"]
661             if runner_resource_req.get("ramMin"):
662                 self.submit_runner_ram = runner_resource_req["ramMin"]
663             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
664                 self.collection_cache_size = runner_resource_req["keep_cache"]
665
666         if submit_runner_ram:
667             # Command line / initializer overrides default and/or spec from workflow
668             self.submit_runner_ram = submit_runner_ram
669
670         if self.submit_runner_ram <= 0:
671             raise Exception("Value of submit-runner-ram must be greater than zero")
672
673         if self.submit_runner_cores <= 0:
674             raise Exception("Value of submit-runner-cores must be greater than zero")
675
676         self.merged_map = merged_map or {}
677
678     def job(self,
679             job_order,         # type: Mapping[Text, Text]
680             output_callbacks,  # type: Callable[[Any, Any], Any]
681             runtimeContext     # type: RuntimeContext
682            ):  # type: (...) -> Generator[Any, None, None]
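            """Initialize the runner job from the job order and yield self to be submitted."""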
683         self.job_order = job_order
684         self._init_job(job_order, runtimeContext)
685         yield self
686
687     def update_pipeline_component(self, record):
688         pass
689
690     def done(self, record):
691         """Base method for handling a completed runner."""
692
693         try:
694             if record["state"] == "Complete":
695                 if record.get("exit_code") is not None:
696                     if record["exit_code"] == 33:
697                         processStatus = "UnsupportedRequirement"
698                     elif record["exit_code"] == 0:
699                         processStatus = "success"
700                     else:
701                         processStatus = "permanentFail"
702                 else:
703                     processStatus = "success"
704             else:
705                 processStatus = "permanentFail"
706
707             outputs = {}
708
709             if processStatus == "permanentFail":
710                 logc = arvados.collection.CollectionReader(record["log"],
711                                                            api_client=self.arvrunner.api,
712                                                            keep_client=self.arvrunner.keep_client,
713                                                            num_retries=self.arvrunner.num_retries)
714                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
715
716             self.final_output = record["output"]
717             outc = arvados.collection.CollectionReader(self.final_output,
718                                                        api_client=self.arvrunner.api,
719                                                        keep_client=self.arvrunner.keep_client,
720                                                        num_retries=self.arvrunner.num_retries)
721             if "cwl.output.json" in outc:
722                 with outc.open("cwl.output.json", "rb") as f:
723                     if f.size() > 0:
724                         outputs = json.loads(f.read().decode())
725             def keepify(fileobj):
726                 path = fileobj["location"]
727                 if not path.startswith("keep:"):
728                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
729             adjustFileObjs(outputs, keepify)
730             adjustDirObjs(outputs, keepify)
731         except Exception:
732             logger.exception("[%s] While getting final output object", self.name)
733             self.arvrunner.output_callback({}, "permanentFail")
734         else:
735             self.arvrunner.output_callback(outputs, processStatus)