[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.utils import aslist
36 from cwltool.builder import substitute
37 from cwltool.pack import pack
38 from cwltool.update import INTERNAL_VERSION
39 from cwltool.builder import Builder
40 import schema_salad.validate as validate
41
42 import arvados.collection
43 from .util import collectionUUID
44 import ruamel.yaml as yaml
45 from ruamel.yaml.comments import CommentedMap, CommentedSeq
46
47 import arvados_cwl.arvdocker
48 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
49 from ._version import __version__
50 from . import done
51 from .context import ArvRuntimeContext
52
53 logger = logging.getLogger('arvados.cwl-runner')
54
55 def trim_anonymous_location(obj):
56     """Remove 'location' field from File and Directory literals.
57
58     To make internal handling easier, literals are assigned a random id for
59     'location'.  However, when writing the record back out, this can break
60     reproducibility.  Since it is valid for literals not to have a 'location'
61     field, remove it.
62
63     """
64
65     if obj.get("location", "").startswith("_:"):
66         del obj["location"]
67
68
69 def remove_redundant_fields(obj):
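    """Remove derived fields ('path', 'nameext', 'nameroot', 'dirname') that
    cwltool fills in during processing and that are redundant when writing
    the record back out."""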
70     for field in ("path", "nameext", "nameroot", "dirname"):
71         if field in obj:
72             del obj[field]
73
74
75 def find_defaults(d, op):
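    """Recursively search lists and dicts for dicts with a 'default' field and
    apply op to each one found.  Recursion stops at dicts that carry a default."""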
76     if isinstance(d, list):
77         for i in d:
78             find_defaults(i, op)
79     elif isinstance(d, dict):
80         if "default" in d:
81             op(d)
82         else:
83             for i in viewvalues(d):
84                 find_defaults(i, op)
85
86 def make_builder(joborder, hints, requirements, runtimeContext):
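    """Return a cwltool Builder with just enough context (job order, hints,
    requirements and runtime options) to evaluate parameter expressions,
    e.g. for secondaryFiles discovery."""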
87     return Builder(
88                  job=joborder,
89                  files=[],               # type: List[Dict[Text, Text]]
90                  bindings=[],            # type: List[Dict[Text, Any]]
91                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
92                  names=None,               # type: Names
93                  requirements=requirements,        # type: List[Dict[Text, Any]]
94                  hints=hints,               # type: List[Dict[Text, Any]]
95                  resources={},           # type: Dict[str, int]
96                  mutation_manager=None,    # type: Optional[MutationManager]
97                  formatgraph=None,         # type: Optional[Graph]
98                  make_fs_access=None,      # type: Type[StdFsAccess]
99                  fs_access=None,           # type: StdFsAccess
100                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
101                  timeout=runtimeContext.eval_timeout,             # type: float
102                  debug=runtimeContext.debug,               # type: bool
103                  js_console=runtimeContext.js_console,          # type: bool
104                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
105                  loadListing="",         # type: Text
106                  outdir="",              # type: Text
107                  tmpdir="",              # type: Text
108                  stagedir="",            # type: Text
109                 )
110
111 def search_schemadef(name, reqs):
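    """Look up a named type in any SchemaDefRequirement among reqs.

    Returns the matching type definition, or None if it is not found."""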
112     for r in reqs:
113         if r["class"] == "SchemaDefRequirement":
114             for sd in r["types"]:
115                 if sd["name"] == name:
116                     return sd
117     return None
118
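# Types at which set_secondary() stops recursing while looking for
# secondaryFiles.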
119 primitive_types_set = frozenset(("null", "boolean", "int", "long",
120                                  "float", "double", "string", "record",
121                                  "array", "enum"))
122
123 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
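    """Recursively match an input schema against the corresponding job order
    value and fill in 'secondaryFiles' on File values according to the
    schema's secondaryFiles specification.

    Newly found secondary files are also recorded in 'discovered' (keyed by
    the primary file's location) when a dict is provided."""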
124     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
125         # union type, collect all possible secondaryFiles
126         for i in inputschema:
127             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
128         return
129
130     if isinstance(inputschema, basestring):
131         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
132         if sd:
133             inputschema = sd
134         else:
135             return
136
137     if "secondaryFiles" in inputschema:
138         # set secondaryFiles; this may be inherited by compound types.
139         secondaryspec = inputschema["secondaryFiles"]
140
141     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
142         not isinstance(inputschema["type"], basestring)):
143         # compound type (union, array, record)
144         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
145
146     elif (inputschema["type"] == "record" and
147           isinstance(primary, Mapping)):
148         #
149         # record type, find secondary files associated with fields.
150         #
151         for f in inputschema["fields"]:
152             p = primary.get(shortname(f["name"]))
153             if p:
154                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
155
156     elif (inputschema["type"] == "array" and
157           isinstance(primary, Sequence)):
158         #
159         # array type, find secondary files of elements
160         #
161         for p in primary:
162             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
163
164     elif (inputschema["type"] == "File" and
165           secondaryspec and
166           isinstance(primary, Mapping) and
167           primary.get("class") == "File" and
168           "secondaryFiles" not in primary):
169         #
170         # Found a file, check for secondaryFiles
171         #
172         primary["secondaryFiles"] = []
173         for i, sf in enumerate(aslist(secondaryspec)):
174             pattern = builder.do_eval(sf["pattern"], context=primary)
175             if pattern is None:
176                 continue
177             sfpath = substitute(primary["location"], pattern)
178             required = builder.do_eval(sf.get("required"), context=primary)
179
180             if fsaccess.exists(sfpath):
181                 primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
182             elif required:
183                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
184                     "Required secondary file '%s' does not exist" % sfpath)
185
186         primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
187         if discovered is not None:
188             discovered[primary["location"]] = primary["secondaryFiles"]
189     elif inputschema["type"] not in primitive_types_set:
190         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
191
192 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
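    """Apply set_secondary() to each input parameter to fill in secondaryFiles
    in the job order."""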
193     for inputschema in inputs:
194         primary = job_order.get(shortname(inputschema["id"]))
195         if isinstance(primary, (Mapping, Sequence)):
196             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
197
198 def upload_dependencies(arvrunner, name, document_loader,
199                         workflowobj, uri, loadref_run,
200                         include_primary=True, discovered_secondaryfiles=None):
201     """Upload the dependencies of the workflowobj document to Keep.
202
203     Returns a pathmapper object mapping local paths to keep references.  Also
204     does an in-place update of references in "workflowobj".
205
206     Use scandeps to find $import, $include, $schemas, run, File and Directory
207     fields that represent external references.
208
209     If workflowobj has an "id" field, this will reload the document to ensure
210     it is scanning the raw document prior to preprocessing.
211     """
212
213     loaded = set()
214     def loadref(b, u):
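        """Fetch and parse a referenced document, returning {} for URLs that
        have already been visited so they are not processed twice."""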
215         joined = document_loader.fetcher.urljoin(b, u)
216         defrg, _ = urllib.parse.urldefrag(joined)
217         if defrg not in loaded:
218             loaded.add(defrg)
219             # Use fetch_text to get raw file (before preprocessing).
220             text = document_loader.fetch_text(defrg)
221             if isinstance(text, bytes):
222                 textIO = StringIO(text.decode('utf-8'))
223             else:
224                 textIO = StringIO(text)
225             return yaml.safe_load(textIO)
226         else:
227             return {}
228
229     if loadref_run:
230         loadref_fields = set(("$import", "run"))
231     else:
232         loadref_fields = set(("$import",))
233
234     scanobj = workflowobj
235     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
236         # Need raw file content (before preprocessing) to ensure
237         # that external references in $include and $mixin are captured.
238         scanobj = loadref("", workflowobj["id"])
239
240     sc_result = scandeps(uri, scanobj,
241                   loadref_fields,
242                   set(("$include", "$schemas", "location")),
243                   loadref, urljoin=document_loader.fetcher.urljoin)
244
245     sc = []
246     uuids = {}
247
248     def collect_uuids(obj):
249         loc = obj.get("location", "")
250         sp = loc.split(":")
251         if sp[0] == "keep":
252             # Collect collection uuids that need to be resolved to
253             # portable data hashes
254             gp = collection_uuid_pattern.match(loc)
255             if gp:
256                 uuids[gp.groups()[0]] = obj
257             if collectionUUID in obj:
258                 uuids[obj[collectionUUID]] = obj
259
260     def collect_uploads(obj):
261         loc = obj.get("location", "")
262         sp = loc.split(":")
263         if len(sp) < 1:
264             return
265         if sp[0] in ("file", "http", "https"):
266             # Record local files that need to be uploaded,
267             # don't include file literals, keep references, etc.
268             sc.append(obj)
269         collect_uuids(obj)
270
271     visit_class(workflowobj, ("File", "Directory"), collect_uuids)
272     visit_class(sc_result, ("File", "Directory"), collect_uploads)
273
274     # Resolve any collection uuids we found to portable data hashes
275     # and assign them to uuid_map
276     uuid_map = {}
277     fetch_uuids = list(uuids.keys())
278     while fetch_uuids:
279         # For a large number of fetch_uuids, the API server may limit the
280         # response size, so keep fetching until the API server has nothing
281         # more to give us.
282         lookups = arvrunner.api.collections().list(
283             filters=[["uuid", "in", fetch_uuids]],
284             count="none",
285             select=["uuid", "portable_data_hash"]).execute(
286                 num_retries=arvrunner.num_retries)
287
288         if not lookups["items"]:
289             break
290
291         for l in lookups["items"]:
292             uuid_map[l["uuid"]] = l["portable_data_hash"]
293
294         fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
295
296     normalizeFilesDirs(sc)
297
298     if include_primary and "id" in workflowobj:
299         sc.append({"class": "File", "location": workflowobj["id"]})
300
301     if "$schemas" in workflowobj:
302         for s in workflowobj["$schemas"]:
303             sc.append({"class": "File", "location": s})
304
305     def visit_default(obj):
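        """Normalize File/Directory literals inside a 'default' value, and drop
        the default entirely if it points at a local file that does not exist."""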
306         remove = [False]
307         def ensure_default_location(f):
308             if "location" not in f and "path" in f:
309                 f["location"] = f["path"]
310                 del f["path"]
311             if "location" in f and not arvrunner.fs_access.exists(f["location"]):
312                 # Doesn't exist, remove from list of dependencies to upload
313                 sc[:] = [x for x in sc if x["location"] != f["location"]]
314                 # Delete "default" from workflowobj
315                 remove[0] = True
316         visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
317         if remove[0]:
318             del obj["default"]
319
320     find_defaults(workflowobj, visit_default)
321
322     discovered = {}
323     def discover_default_secondary_files(obj):
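        """Build a job order from the tool's default values and discover
        secondaryFiles for it, so defaults get their secondary files
        uploaded as well."""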
324         builder_job_order = {}
325         for t in obj["inputs"]:
326             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
327         # Need to create a builder object to evaluate expressions.
328         builder = make_builder(builder_job_order,
329                                obj.get("hints", []),
330                                obj.get("requirements", []),
331                                ArvRuntimeContext())
332         discover_secondary_files(arvrunner.fs_access,
333                                  builder,
334                                  obj["inputs"],
335                                  builder_job_order,
336                                  discovered)
337
338     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
339     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
340
341     for d in list(discovered):
342         # Only interested in discovered secondaryFiles which are local
343         # files that need to be uploaded.
344         if d.startswith("file:"):
345             sc.extend(discovered[d])
346         else:
347             del discovered[d]
348
349     mapper = ArvPathMapper(arvrunner, sc, "",
350                            "keep:%s",
351                            "keep:%s/%s",
352                            name=name,
353                            single_collection=True)
354
355     def setloc(p):
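        """Rewrite a File/Directory 'location' to its uploaded keep: reference,
        resolving collection UUIDs to portable data hashes via uuid_map."""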
356         loc = p.get("location")
357         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
358             p["location"] = mapper.mapper(p["location"]).resolved
359             return
360
361         if not loc:
362             return
363
364         if collectionUUID in p:
365             uuid = p[collectionUUID]
366             if uuid not in uuid_map:
367                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
368                     "Collection uuid %s not found" % uuid)
369             gp = collection_pdh_pattern.match(loc)
370             if gp and uuid_map[uuid] != gp.groups()[0]:
371                 # This file entry has both collectionUUID and a PDH
372                 # location. If the PDH doesn't match the one returned
373                 # by the API server, raise an error.
374                 raise SourceLine(p, "location", validate.ValidationException).makeError(
375                     "Expected collection uuid %s to be %s but API server reported %s" % (
376                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
377
378         gp = collection_uuid_pattern.match(loc)
379         if not gp:
380             return
381         uuid = gp.groups()[0]
382         if uuid not in uuid_map:
383             raise SourceLine(p, "location", validate.ValidationException).makeError(
384                 "Collection uuid %s not found" % uuid)
385         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
386         p[collectionUUID] = uuid
387
388     visit_class(workflowobj, ("File", "Directory"), setloc)
389     visit_class(discovered, ("File", "Directory"), setloc)
390
391     if discovered_secondaryfiles is not None:
392         for d in discovered:
393             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
394
395     if "$schemas" in workflowobj:
396         sch = CommentedSeq()
397         for s in workflowobj["$schemas"]:
398             sch.append(mapper.mapper(s).resolved)
399         workflowobj["$schemas"] = sch
400
401     return mapper
402
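# Illustrative sketch (not part of the original module): upload_dependencies()
# is typically invoked with a loaded tool document, along the same lines as
# upload_job_order()/upload_workflow_deps() below, e.g.:
#
#   mapper = upload_dependencies(arvrunner, "workflow dependencies",
#                                tool.doc_loader, tool.tool, tool.tool["id"],
#                                False)
#   keep_ref = mapper.mapper("file:///local/path/input.txt").resolved
#
# Here 'arvrunner' and 'tool' are assumed to be the executor object and a
# loaded cwltool Process; the local path is hypothetical.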
403
404 def upload_docker(arvrunner, tool):
405     """Uploads Docker images used in CommandLineTool objects."""
406
407     if isinstance(tool, CommandLineTool):
408         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
409         if docker_req:
410             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
411                 # TODO: can be supported by containers API, but not jobs API.
412                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
413                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
414             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
415         else:
416             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
417     elif isinstance(tool, cwltool.workflow.Workflow):
418         for s in tool.steps:
419             upload_docker(arvrunner, s.embedded_tool)
420
421
422 def packed_workflow(arvrunner, tool, merged_map):
423     """Create a packed workflow.
424
425     A "packed" workflow is one where all the components have been combined into a single document."""
426
427     rewrites = {}
428     packed = pack(arvrunner.loadingContext, tool.tool["id"],
429                   rewrite_out=rewrites,
430                   loader=tool.doc_loader)
431
432     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
433
434     def visit(v, cur_id):
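        """Rewrite file references in the packed document to their uploaded
        keep: locations using merged_map, attach any discovered secondaryFiles,
        and record the Docker image collection PDH on DockerRequirement
        entries."""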
435         if isinstance(v, dict):
436             if v.get("class") in ("CommandLineTool", "Workflow"):
437                 if "id" not in v:
438                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
439                 cur_id = rewrite_to_orig.get(v["id"], v["id"])
440             if "location" in v and not v["location"].startswith("keep:"):
441                 v["location"] = merged_map[cur_id].resolved[v["location"]]
442             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
443                 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
444             if v.get("class") == "DockerRequirement":
445                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
446             for l in v:
447                 visit(v[l], cur_id)
448         if isinstance(v, list):
449             for l in v:
450                 visit(l, cur_id)
451     visit(packed, None)
452     return packed
453
454
455 def tag_git_version(packed, tool):
456     if tool.tool["id"].startswith("file://"):
457         path = os.path.dirname(tool.tool["id"][7:])
458         try:
459             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
460         except (OSError, subprocess.CalledProcessError):
461             pass
462         else:
463             packed["http://schema.org/version"] = githash
464
465
466 def upload_job_order(arvrunner, name, tool, job_order):
467     """Upload local files referenced in the input object and return updated input
468     object with 'location' updated to the proper keep references.
469     """
470
471     # Make a copy of the job order and set defaults.
472     builder_job_order = copy.copy(job_order)
473
474     # fill_in_defaults throws an error if there are any missing
475     # required parameters; we don't want that here, so make them
476     # all optional.
477     inputs_copy = copy.deepcopy(tool.tool["inputs"])
478     for i in inputs_copy:
479         if "null" not in i["type"]:
480             i["type"] = ["null"] + aslist(i["type"])
481
482     fill_in_defaults(inputs_copy,
483                      builder_job_order,
484                      arvrunner.fs_access)
485     # Need to create a builder object to evaluate expressions.
486     builder = make_builder(builder_job_order,
487                            tool.hints,
488                            tool.requirements,
489                            ArvRuntimeContext())
490     # Now update job_order with secondaryFiles
491     discover_secondary_files(arvrunner.fs_access,
492                              builder,
493                              tool.tool["inputs"],
494                              job_order)
495
496     jobmapper = upload_dependencies(arvrunner,
497                                     name,
498                                     tool.doc_loader,
499                                     job_order,
500                                     job_order.get("id", "#"),
501                                     False)
502
503     if "id" in job_order:
504         del job_order["id"]
505
506     # Need to filter this out; it gets added by cwltool when providing
507     # parameters on the command line.
508     if "job_order" in job_order:
509         del job_order["job_order"]
510
511     return job_order
512
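# FileUpdates records, for a single tool document, the mapping from original
# file locations to uploaded keep: locations ("resolved") and any
# secondaryFiles discovered for those files.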
513 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
514
515 def upload_workflow_deps(arvrunner, tool):
516     # Ensure that Docker images needed by this workflow are available
517
518     upload_docker(arvrunner, tool)
519
520     document_loader = tool.doc_loader
521
522     merged_map = {}
523
524     def upload_tool_deps(deptool):
525         if "id" in deptool:
526             discovered_secondaryfiles = {}
527             pm = upload_dependencies(arvrunner,
528                                      "%s dependencies" % (shortname(deptool["id"])),
529                                      document_loader,
530                                      deptool,
531                                      deptool["id"],
532                                      False,
533                                      include_primary=False,
534                                      discovered_secondaryfiles=discovered_secondaryfiles)
535             document_loader.idx[deptool["id"]] = deptool
536             toolmap = {}
537             for k,v in pm.items():
538                 toolmap[k] = v.resolved
539             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
540
541     tool.visit(upload_tool_deps)
542
543     return merged_map
544
545 def arvados_jobs_image(arvrunner, img):
546     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
547
548     try:
549         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
550     except Exception as e:
551         raise Exception("Docker image %s is not available\n%s" % (img, e) )
552
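# Illustrative sketch (not part of the original module): arvados_jobs_image()
# is typically used to resolve the runner image before submitting, e.g.:
#
#   pdh = arvados_jobs_image(arvrunner, "arvados/jobs:" + __version__)
#
# 'arvrunner' is assumed to be the executor object; the return value is
# whatever arv_docker_get_image() returns for the resolved image collection.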
553
554 def upload_workflow_collection(arvrunner, name, packed):
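    """Save the packed workflow document as 'workflow.cwl' in a new collection,
    reuse an existing collection with identical content if one is found, and
    return the collection's portable data hash."""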
555     collection = arvados.collection.Collection(api_client=arvrunner.api,
556                                                keep_client=arvrunner.keep_client,
557                                                num_retries=arvrunner.num_retries)
558     with collection.open("workflow.cwl", "w") as f:
559         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
560
561     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
562                ["name", "like", name+"%"]]
563     if arvrunner.project_uuid:
564         filters.append(["owner_uuid", "=", arvrunner.project_uuid])
565     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
566
567     if exists["items"]:
568         logger.info("Using collection %s", exists["items"][0]["uuid"])
569     else:
570         collection.save_new(name=name,
571                             owner_uuid=arvrunner.project_uuid,
572                             ensure_unique_name=True,
573                             num_retries=arvrunner.num_retries)
574         logger.info("Uploaded to %s", collection.manifest_locator())
575
576     return collection.portable_data_hash()
577
578
579 class Runner(Process):
580     """Base class for runner processes, which submit an instance of
581     arvados-cwl-runner and wait for the final result."""
582
583     def __init__(self, runner, updated_tool,
584                  tool, loadingContext, enable_reuse,
585                  output_name, output_tags, submit_runner_ram=0,
586                  name=None, on_error=None, submit_runner_image=None,
587                  intermediate_output_ttl=0, merged_map=None,
588                  priority=None, secret_store=None,
589                  collection_cache_size=256,
590                  collection_cache_is_default=True):
591
592         loadingContext = loadingContext.copy()
593         loadingContext.metadata = updated_tool.metadata.copy()
594
595         super(Runner, self).__init__(updated_tool.tool, loadingContext)
596
597         self.arvrunner = runner
598         self.embedded_tool = tool
599         self.job_order = None
600         self.running = False
601         if enable_reuse:
602             # If reuse is permitted by command line arguments but
603             # disabled by the workflow itself, disable it.
604             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
605             if reuse_req:
606                 enable_reuse = reuse_req["enableReuse"]
607         self.enable_reuse = enable_reuse
608         self.uuid = None
609         self.final_output = None
610         self.output_name = output_name
611         self.output_tags = output_tags
612         self.name = name
613         self.on_error = on_error
614         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
615         self.intermediate_output_ttl = intermediate_output_ttl
616         self.priority = priority
617         self.secret_store = secret_store
618         self.enable_dev = loadingContext.enable_dev
619
620         self.submit_runner_cores = 1
621         self.submit_runner_ram = 1024  # default 1 GiB
622         self.collection_cache_size = collection_cache_size
623
624         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
625         if runner_resource_req:
626             if runner_resource_req.get("coresMin"):
627                 self.submit_runner_cores = runner_resource_req["coresMin"]
628             if runner_resource_req.get("ramMin"):
629                 self.submit_runner_ram = runner_resource_req["ramMin"]
630             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
631                 self.collection_cache_size = runner_resource_req["keep_cache"]
632
633         if submit_runner_ram:
634             # Command line / initializer overrides default and/or spec from workflow
635             self.submit_runner_ram = submit_runner_ram
636
637         if self.submit_runner_ram <= 0:
638             raise Exception("Value of submit-runner-ram must be greater than zero")
639
640         if self.submit_runner_cores <= 0:
641             raise Exception("Value of submit-runner-cores must be greater than zero")
642
643         self.merged_map = merged_map or {}
644
645     def job(self,
646             job_order,         # type: Mapping[Text, Text]
647             output_callbacks,  # type: Callable[[Any, Any], Any]
648             runtimeContext     # type: RuntimeContext
649            ):  # type: (...) -> Generator[Any, None, None]
650         self.job_order = job_order
651         self._init_job(job_order, runtimeContext)
652         yield self
653
654     def update_pipeline_component(self, record):
655         pass
656
657     def done(self, record):
658         """Base method for handling a completed runner."""
659
660         try:
661             if record["state"] == "Complete":
662                 if record.get("exit_code") is not None:
663                     if record["exit_code"] == 33:
664                         processStatus = "UnsupportedRequirement"
665                     elif record["exit_code"] == 0:
666                         processStatus = "success"
667                     else:
668                         processStatus = "permanentFail"
669                 else:
670                     processStatus = "success"
671             else:
672                 processStatus = "permanentFail"
673
674             outputs = {}
675
676             if processStatus == "permanentFail":
677                 logc = arvados.collection.CollectionReader(record["log"],
678                                                            api_client=self.arvrunner.api,
679                                                            keep_client=self.arvrunner.keep_client,
680                                                            num_retries=self.arvrunner.num_retries)
681                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
682
683             self.final_output = record["output"]
684             outc = arvados.collection.CollectionReader(self.final_output,
685                                                        api_client=self.arvrunner.api,
686                                                        keep_client=self.arvrunner.keep_client,
687                                                        num_retries=self.arvrunner.num_retries)
688             if "cwl.output.json" in outc:
689                 with outc.open("cwl.output.json", "rb") as f:
690                     if f.size() > 0:
691                         outputs = json.loads(f.read().decode())
692             def keepify(fileobj):
693                 path = fileobj["location"]
694                 if not path.startswith("keep:"):
695                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
696             adjustFileObjs(outputs, keepify)
697             adjustDirObjs(outputs, keepify)
698         except Exception:
699             logger.exception("[%s] While getting final output object", self.name)
700             self.arvrunner.output_callback({}, "permanentFail")
701         else:
702             self.arvrunner.output_callback(outputs, processStatus)