16382: When packing workflow convert "path" to "location"
[arvados.git] sdk/cwl/arvados_cwl/runner.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import re
import urllib.parse
from functools import partial
import logging
import json
import copy
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')
def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

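    # For example (illustrative): a literal such as
    #   {"class": "File", "location": "_:0b2d...", "basename": "x.txt", "contents": "x"}
    # is written back out as just
    #   {"class": "File", "basename": "x.txt", "contents": "x"}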
    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def make_builder(joborder, hints, requirements, runtimeContext):
    return Builder(
                 job=joborder,
                 files=[],               # type: List[Dict[Text, Text]]
                 bindings=[],            # type: List[Dict[Text, Any]]
                 schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                 names=None,               # type: Names
                 requirements=requirements,        # type: List[Dict[Text, Any]]
                 hints=hints,               # type: List[Dict[Text, Any]]
                 resources={},           # type: Dict[str, int]
                 mutation_manager=None,    # type: Optional[MutationManager]
                 formatgraph=None,         # type: Optional[Graph]
                 make_fs_access=None,      # type: Type[StdFsAccess]
                 fs_access=None,           # type: StdFsAccess
                 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
                 timeout=runtimeContext.eval_timeout,             # type: float
                 debug=runtimeContext.debug,               # type: bool
                 js_console=runtimeContext.js_console,          # type: bool
                 force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
                 loadListing="",         # type: Text
                 outdir="",              # type: Text
                 tmpdir="",              # type: Text
                 stagedir="",            # type: Text
                )

def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
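        # Illustrative example (not part of the original code): a spec entry
        # such as {"pattern": "^.bai", "required": True} applied to a primary
        # located at ".../sample.bam" yields ".../sample.bai" via substitute();
        # the secondary file is recorded only if fsaccess.exists() finds it.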
        primary["secondaryFiles"] = []
        for i, sf in enumerate(aslist(secondaryspec)):
            pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            sfpath = substitute(primary["location"], pattern)
            required = builder.do_eval(sf.get("required"), context=primary)

            if fsaccess.exists(sfpath):
                primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

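    # Illustrative example (assumption, not from the original source): after
    # the upload, the returned pathmapper resolves local references such as
    #   "file:///home/user/data/input.fastq"
    # to keep references of the form
    #   "keep:<portable_data_hash>/input.fastq"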
    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded;
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit the
        # response size, so keep fetching until the API server has nothing
        # more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext())
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
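        # Rewrite each File/Directory "location" to a keep: reference, either
        # via the pathmapper (for local files that were just uploaded) or by
        # resolving a collection UUID to the portable data hash collected in
        # uuid_map above.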
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper


def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

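    # Illustrative example (not part of the original code): after packing, a
    # step-level reference such as {"class": "File", "path": "blub.txt"} is
    # normalized below to use "location" and then rewritten to the "keep:..."
    # reference recorded for the enclosing tool in merged_map.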
    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed


def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash.decode("utf-8")


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return the updated
    input object with 'location' rewritten to the proper keep references.
    """

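    # Illustrative example (assumption, not from the original source): an input
    #   {"reads": {"class": "File", "location": "file:///tmp/reads.fastq"}}
    # is returned with its location rewritten to a "keep:<pdh>/reads.fastq"
    # reference once upload_dependencies() below has uploaded the file.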
    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any missing required
    # parameters; we don't want that here, so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext())
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

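    # "img" is typically a tag of the form "arvados/jobs:<version>" (an
    # assumption based on how this module builds the tag); any pull or upload
    # failure surfaces as the generic exception raised below.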
    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
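    # Reuse an existing collection if one with the same content (portable data
    # hash) and a matching name prefix is already present in the target project.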
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
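                    # Exit code 33 is treated as UnsupportedRequirement, the
                    # convention used by the submitted arvados-cwl-runner instance.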
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)