# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import re
import urllib.parse
from functools import partial
import logging
import json
import copy
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
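
# Illustrative sketch (hypothetical values, not from the original source): a
# File literal such as
#     {"class": "File", "location": "_:b0a7e3b2", "contents": "hello"}
# has its anonymous "_:" location removed by trim_anonymous_location(), while
# a real reference like {"class": "File", "location": "keep:<pdh>/msg.txt"}
# is left untouched.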


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def make_builder(joborder, hints, requirements, runtimeContext):
    return Builder(
                 job=joborder,
                 files=[],               # type: List[Dict[Text, Text]]
                 bindings=[],            # type: List[Dict[Text, Any]]
                 schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                 names=None,               # type: Names
                 requirements=requirements,        # type: List[Dict[Text, Any]]
                 hints=hints,               # type: List[Dict[Text, Any]]
                 resources={},           # type: Dict[str, int]
                 mutation_manager=None,    # type: Optional[MutationManager]
                 formatgraph=None,         # type: Optional[Graph]
                 make_fs_access=None,      # type: Type[StdFsAccess]
                 fs_access=None,           # type: StdFsAccess
                 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
                 timeout=runtimeContext.eval_timeout,             # type: float
                 debug=runtimeContext.debug,               # type: bool
                 js_console=runtimeContext.js_console,          # type: bool
                 force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
                 loadListing="",         # type: Text
                 outdir="",              # type: Text
                 tmpdir="",              # type: Text
                 stagedir="",            # type: Text
                )

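# Illustrative usage sketch (my assumption, not in the original file): the
# stripped-down Builder returned above is only used to evaluate CWL parameter
# references or expressions against a job order, roughly like this:
#
#     builder = make_builder({"threads": 4}, hints=[], requirements=[],
#                            runtimeContext=ArvRuntimeContext())
#     builder.do_eval("$(inputs.threads)")      # evaluates to 4
#
# do_eval() is the cwltool Builder method used by set_secondary() below.
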
def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        primary["secondaryFiles"] = []
        for i, sf in enumerate(aslist(secondaryspec)):
            pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            sfpath = substitute(primary["location"], pattern)
            required = builder.do_eval(sf.get("required"), context=primary)

            if fsaccess.exists(sfpath):
                primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

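# Illustrative sketch (hypothetical data, not from the original source): given
# an input schema like
#
#     {"id": "#main/ref", "type": "File",
#      "secondaryFiles": [{"pattern": ".fai", "required": True}]}
#
# and a job order entry {"ref": {"class": "File", "location": "keep:<pdh>/ref.fa"}},
# discover_secondary_files()/set_secondary() evaluate the pattern, check via
# fsaccess that "keep:<pdh>/ref.fa.fai" exists, and record it both on the
# primary File's "secondaryFiles" and in the optional "discovered" dict keyed
# by the primary's location.
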
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded;
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit the
        # response size, so keep fetching until the API server has nothing
        # more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext())
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

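# Illustrative usage sketch (assumed caller shape; mirrors upload_job_order()
# and upload_workflow_deps() below): the returned ArvPathMapper translates each
# local dependency to its Keep location after upload, e.g.
#
#     mapper = upload_dependencies(arvrunner, "example dependencies",
#                                  tool.doc_loader, tool.tool, tool.tool["id"],
#                                  False, include_primary=False)
#     for src, entry in mapper.items():
#         print(src, "->", entry.resolved)   # file:///local/path -> keep:<pdh>/path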

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed


def tag_git_version(packed, tool):
    # "tool" is the loaded cwltool Process whose "id" points at the workflow
    # file on disk; it must be supplied by the caller.
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any missing required
    # parameters; we don't want that here, so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext())
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out; it gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

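# Illustrative usage sketch (hypothetical names, not from the original file):
# after this call, every local File/Directory in the job order points at a
# "keep:" location and discovered secondaryFiles are filled in, so the result
# can be embedded directly in the submitted runner request, e.g.
#
#     job_order = upload_job_order(arvrunner, "example input", tool, job_order)
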
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

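# Illustrative ordering sketch (my assumption about the caller, not shown in
# this file): upload_workflow_deps() is run before packing so that
# packed_workflow() above can rewrite every location through merged_map, which
# maps each tool "id" to a FileUpdates(resolved, secondaryFiles) entry, e.g.
#
#     merged_map = upload_workflow_deps(arvrunner, tool)
#     packed = packed_workflow(arvrunner, tool, merged_map)
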
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
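
# Illustrative usage sketch (hypothetical; not from the original file): the
# returned portable data hash identifies the collection holding
# "workflow.cwl", so a caller could refer to the packed workflow as
#
#     pdh = upload_workflow_collection(arvrunner, "example workflow", packed)
#     wf_ref = "keep:%s/workflow.cwl" % pdh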


class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = loadingContext.metadata.copy()
        loadingContext.metadata["cwlVersion"] = INTERNAL_VERSION

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        # arvados-cwl-runner exits 33 when the workflow
                        # contained an UnsupportedRequirement.
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                # Make relative output paths absolute by prefixing the
                # output collection's keep: locator.
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)