1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.utils import aslist
36 from cwltool.builder import substitute
37 from cwltool.pack import pack
38 from cwltool.update import INTERNAL_VERSION
39 from cwltool.builder import Builder
40 import schema_salad.validate as validate
41
42 import arvados.collection
43 from .util import collectionUUID
44 import ruamel.yaml as yaml
45
46 import arvados_cwl.arvdocker
47 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
48 from ._version import __version__
49 from . import done
50 from .context import ArvRuntimeContext
51
52 logger = logging.getLogger('arvados.cwl-runner')
53
54 def trim_anonymous_location(obj):
55     """Remove 'location' field from File and Directory literals.
56
57     To make internal handling easier, literals are assigned a random id for
58     'location'.  However, when writing the record back out, this can break
59     reproducibility.  Since it is valid for literals not to have a 'location'
60     field, remove it.
61
62     """
63
64     if obj.get("location", "").startswith("_:"):
65         del obj["location"]
66
67
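# Illustrative sketch (editorial addition, hypothetical helper): the function
# above is normally applied to every File and Directory literal in an output
# object using cwltool's visit_class traversal, e.g.:
def _example_trim_anonymous_locations(outputs):
    # Walk all File/Directory objects and drop anonymous "_:" locations.
    visit_class(outputs, ("File", "Directory"), trim_anonymous_location)
    return outputs
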
68 def remove_redundant_fields(obj):
69     for field in ("path", "nameext", "nameroot", "dirname"):
70         if field in obj:
71             del obj[field]
72
73
74 def find_defaults(d, op):
75     if isinstance(d, list):
76         for i in d:
77             find_defaults(i, op)
78     elif isinstance(d, dict):
79         if "default" in d:
80             op(d)
81         else:
82             for i in viewvalues(d):
83                 find_defaults(i, op)
84
85 def make_builder(joborder, hints, requirements, runtimeContext):
86     return Builder(
87                  job=joborder,
88                  files=[],               # type: List[Dict[Text, Text]]
89                  bindings=[],            # type: List[Dict[Text, Any]]
90                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
91                  names=None,               # type: Names
92                  requirements=requirements,        # type: List[Dict[Text, Any]]
93                  hints=hints,               # type: List[Dict[Text, Any]]
94                  resources={},           # type: Dict[str, int]
95                  mutation_manager=None,    # type: Optional[MutationManager]
96                  formatgraph=None,         # type: Optional[Graph]
97                  make_fs_access=None,      # type: Type[StdFsAccess]
98                  fs_access=None,           # type: StdFsAccess
99                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
100                  timeout=runtimeContext.eval_timeout,             # type: float
101                  debug=runtimeContext.debug,               # type: bool
102                  js_console=runtimeContext.js_console,          # type: bool
103                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
104                  loadListing="",         # type: Text
105                  outdir="",              # type: Text
106                  tmpdir="",              # type: Text
107                  stagedir="",            # type: Text
108                 )
109
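# Illustrative sketch (editorial addition, not called anywhere in this module):
# the Builder returned by make_builder() is used only to evaluate CWL
# expressions, for example a secondaryFiles "pattern" evaluated against a
# primary File object with do_eval().  The File location below is hypothetical.
def _example_eval_secondary_pattern(runtimeContext):
    builder = make_builder({}, [], [], runtimeContext)
    primary = {"class": "File", "location": "keep:examplepdh+0/reads.bam"}
    # A plain (non-expression) pattern string is returned unchanged; a
    # javascript expression would be evaluated with 'self' bound to 'primary'.
    return builder.do_eval(".bai", context=primary)
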
110 def search_schemadef(name, reqs):
111     for r in reqs:
112         if r["class"] == "SchemaDefRequirement":
113             for sd in r["types"]:
114                 if sd["name"] == name:
115                     return sd
116     return None
117
118 primitive_types_set = frozenset(("null", "boolean", "int", "long",
119                                  "float", "double", "string", "record",
120                                  "array", "enum"))
121
122 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
123     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
124         # union type, collect all possible secondaryFiles
125         for i in inputschema:
126             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
127         return
128
129     if isinstance(inputschema, basestring):
130         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
131         if sd:
132             inputschema = sd
133         else:
134             return
135
136     if "secondaryFiles" in inputschema:
137         # set secondaryFiles, may be inherited by compound types.
138         secondaryspec = inputschema["secondaryFiles"]
139
140     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
141         not isinstance(inputschema["type"], basestring)):
142         # compound type (union, array, record)
143         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
144
145     elif (inputschema["type"] == "record" and
146           isinstance(primary, Mapping)):
147         #
148         # record type, find secondary files associated with fields.
149         #
150         for f in inputschema["fields"]:
151             p = primary.get(shortname(f["name"]))
152             if p:
153                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
154
155     elif (inputschema["type"] == "array" and
156           isinstance(primary, Sequence)):
157         #
158         # array type, find secondary files of elements
159         #
160         for p in primary:
161             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
162
163     elif (inputschema["type"] == "File" and
164           secondaryspec and
165           isinstance(primary, Mapping) and
166           primary.get("class") == "File" and
167           "secondaryFiles" not in primary):
168         #
169         # Found a file, check for secondaryFiles
170         #
171         primary["secondaryFiles"] = []
172         for i, sf in enumerate(aslist(secondaryspec)):
173             pattern = builder.do_eval(sf["pattern"], context=primary)
174             if pattern is None:
175                 continue
176             sfpath = substitute(primary["location"], pattern)
177             required = builder.do_eval(sf.get("required"), context=primary)
178
179             if fsaccess.exists(sfpath):
180                 primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
181             elif required:
182                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
183                     "Required secondary file '%s' does not exist" % sfpath)
184
185         primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
186         if discovered is not None:
187             discovered[primary["location"]] = primary["secondaryFiles"]
188     elif inputschema["type"] not in primitive_types_set:
189         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
190
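# Worked example (editorial addition, hypothetical location): substitute() from
# cwltool.builder implements the secondaryFiles pattern rules used above; a
# plain suffix is appended to the primary location, while each leading "^"
# strips one extension before appending.
def _example_secondary_patterns():
    primary_loc = "keep:examplepdh+0/reads.bam"
    appended = substitute(primary_loc, ".bai")   # -> .../reads.bam.bai
    replaced = substitute(primary_loc, "^.bai")  # -> .../reads.bai
    return appended, replaced
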
191 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
192     for inputschema in inputs:
193         primary = job_order.get(shortname(inputschema["id"]))
194         if isinstance(primary, (Mapping, Sequence)):
195             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
196
197 def upload_dependencies(arvrunner, name, document_loader,
198                         workflowobj, uri, loadref_run,
199                         include_primary=True, discovered_secondaryfiles=None):
200     """Upload the dependencies of the workflowobj document to Keep.
201
202     Returns a pathmapper object mapping local paths to keep references.  Also
203     does an in-place update of references in "workflowobj".
204
205     Use scandeps to find $import, $include, $schemas, run, File and Directory
206     fields that represent external references.
207
208     If workflowobj has an "id" field, this will reload the document to ensure
209     it is scanning the raw document prior to preprocessing.
210     """
211
212     loaded = set()
213     def loadref(b, u):
214         joined = document_loader.fetcher.urljoin(b, u)
215         defrg, _ = urllib.parse.urldefrag(joined)
216         if defrg not in loaded:
217             loaded.add(defrg)
218             # Use fetch_text to get raw file (before preprocessing).
219             text = document_loader.fetch_text(defrg)
220             if isinstance(text, bytes):
221                 textIO = StringIO(text.decode('utf-8'))
222             else:
223                 textIO = StringIO(text)
224             return yaml.safe_load(textIO)
225         else:
226             return {}
227
228     if loadref_run:
229         loadref_fields = set(("$import", "run"))
230     else:
231         loadref_fields = set(("$import",))
232
233     scanobj = workflowobj
234     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
235         # Need raw file content (before preprocessing) to ensure
236         # that external references in $include and $mixin are captured.
237         scanobj = loadref("", workflowobj["id"])
238
239     sc_result = scandeps(uri, scanobj,
240                   loadref_fields,
241                   set(("$include", "$schemas", "location")),
242                   loadref, urljoin=document_loader.fetcher.urljoin)
243
244     sc = []
245     uuids = {}
246
247     def collect_uuids(obj):
248         loc = obj.get("location", "")
249         sp = loc.split(":")
250         if sp[0] == "keep":
251             # Collect collection uuids that need to be resolved to
252             # portable data hashes
253             gp = collection_uuid_pattern.match(loc)
254             if gp:
255                 uuids[gp.groups()[0]] = obj
256             if collectionUUID in obj:
257                 uuids[obj[collectionUUID]] = obj
258
259     def collect_uploads(obj):
260         loc = obj.get("location", "")
261         sp = loc.split(":")
262         if len(sp) < 1:
263             return
264         if sp[0] in ("file", "http", "https"):
265             # Record local files that need to be uploaded,
266             # don't include file literals, keep references, etc.
267             sc.append(obj)
268         collect_uuids(obj)
269
270     visit_class(workflowobj, ("File", "Directory"), collect_uuids)
271     visit_class(sc_result, ("File", "Directory"), collect_uploads)
272
273     # Resolve any collection uuids we found to portable data hashes
274     # and assign them to uuid_map
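    # For example (hypothetical identifiers), a location such as
    # "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/file.txt" carries a collection uuid and
    # is later rewritten by setloc() to "keep:<portable_data_hash>/file.txt"
    # using this uuid_map.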
275     uuid_map = {}
276     fetch_uuids = list(uuids.keys())
277     while fetch_uuids:
278         # For a large number of fetch_uuids, the API server may limit
279         # response size, so keep fetching until the API server has nothing
280         # more to give us.
281         lookups = arvrunner.api.collections().list(
282             filters=[["uuid", "in", fetch_uuids]],
283             count="none",
284             select=["uuid", "portable_data_hash"]).execute(
285                 num_retries=arvrunner.num_retries)
286
287         if not lookups["items"]:
288             break
289
290         for l in lookups["items"]:
291             uuid_map[l["uuid"]] = l["portable_data_hash"]
292
293         fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
294
295     normalizeFilesDirs(sc)
296
297     if include_primary and "id" in workflowobj:
298         sc.append({"class": "File", "location": workflowobj["id"]})
299
300     if "$schemas" in workflowobj:
301         for s in workflowobj["$schemas"]:
302             sc.append({"class": "File", "location": s})
303
304     def visit_default(obj):
305         remove = [False]
306         def ensure_default_location(f):
307             if "location" not in f and "path" in f:
308                 f["location"] = f["path"]
309                 del f["path"]
310             if "location" in f and not arvrunner.fs_access.exists(f["location"]):
311                 # Doesn't exist, remove from list of dependencies to upload
312                 sc[:] = [x for x in sc if x["location"] != f["location"]]
313                 # Delete "default" from workflowobj
314                 remove[0] = True
315         visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
316         if remove[0]:
317             del obj["default"]
318
319     find_defaults(workflowobj, visit_default)
320
321     discovered = {}
322     def discover_default_secondary_files(obj):
323         builder_job_order = {}
324         for t in obj["inputs"]:
325             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
326         # Need to create a builder object to evaluate expressions.
327         builder = make_builder(builder_job_order,
328                                obj.get("hints", []),
329                                obj.get("requirements", []),
330                                ArvRuntimeContext())
331         discover_secondary_files(arvrunner.fs_access,
332                                  builder,
333                                  obj["inputs"],
334                                  builder_job_order,
335                                  discovered)
336
337     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
338     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
339
340     for d in list(discovered):
341         # Only interested in discovered secondaryFiles which are local
342         # files that need to be uploaded.
343         if d.startswith("file:"):
344             sc.extend(discovered[d])
345         else:
346             del discovered[d]
347
348     mapper = ArvPathMapper(arvrunner, sc, "",
349                            "keep:%s",
350                            "keep:%s/%s",
351                            name=name,
352                            single_collection=True)
353
354     def setloc(p):
355         loc = p.get("location")
356         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
357             p["location"] = mapper.mapper(p["location"]).resolved
358             return
359
360         if not loc:
361             return
362
363         if collectionUUID in p:
364             uuid = p[collectionUUID]
365             if uuid not in uuid_map:
366                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
367                     "Collection uuid %s not found" % uuid)
368             gp = collection_pdh_pattern.match(loc)
369             if gp and uuid_map[uuid] != gp.groups()[0]:
370                 # This file entry has both collectionUUID and a PDH
371                 # location. If the PDH doesn't match the one returned by
372                 # the API server, raise an error.
373                 raise SourceLine(p, "location", validate.ValidationException).makeError(
374                     "Expected collection uuid %s to be %s but API server reported %s" % (
375                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
376
377         gp = collection_uuid_pattern.match(loc)
378         if not gp:
379             return
380         uuid = gp.groups()[0]
381         if uuid not in uuid_map:
382             raise SourceLine(p, "location", validate.ValidationException).makeError(
383                 "Collection uuid %s not found" % uuid)
384         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
385         p[collectionUUID] = uuid
386
387     visit_class(workflowobj, ("File", "Directory"), setloc)
388     visit_class(discovered, ("File", "Directory"), setloc)
389
390     if discovered_secondaryfiles is not None:
391         for d in discovered:
392             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
393
394     if "$schemas" in workflowobj:
395         sch = []
396         for s in workflowobj["$schemas"]:
397             sch.append(mapper.mapper(s).resolved)
398         workflowobj["$schemas"] = sch
399
400     return mapper
401
402
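# Illustrative sketch (editorial addition) mirroring how upload_workflow_deps()
# below calls upload_dependencies() for a single tool document; the name string
# is hypothetical.
def _example_upload_tool_deps(arvrunner, tool):
    return upload_dependencies(arvrunner,
                               "example dependencies",
                               tool.doc_loader,
                               tool.tool,
                               tool.tool["id"],
                               False,
                               include_primary=False)
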
403 def upload_docker(arvrunner, tool):
404     """Uploads Docker images used in CommandLineTool objects."""
405
406     if isinstance(tool, CommandLineTool):
407         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
408         if docker_req:
409             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
410                 # TODO: can be supported by containers API, but not jobs API.
411                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
412                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
413             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
414         else:
415             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
416     elif isinstance(tool, cwltool.workflow.Workflow):
417         for s in tool.steps:
418             upload_docker(arvrunner, s.embedded_tool)
419
420
421 def packed_workflow(arvrunner, tool, merged_map):
422     """Create a packed workflow.
423
424     A "packed" workflow is one where all the components have been combined into a single document."""
425
426     rewrites = {}
427     packed = pack(arvrunner.loadingContext, tool.tool["id"],
428                   rewrite_out=rewrites,
429                   loader=tool.doc_loader)
430
431     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
432
433     def visit(v, cur_id):
434         if isinstance(v, dict):
435             if v.get("class") in ("CommandLineTool", "Workflow"):
436                 if "id" not in v:
437                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
438                 cur_id = rewrite_to_orig.get(v["id"], v["id"])
439             if "location" in v and not v["location"].startswith("keep:"):
440                 v["location"] = merged_map[cur_id].resolved[v["location"]]
441             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
442                 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
443             if v.get("class") == "DockerRequirement":
444                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
445             for l in v:
446                 visit(v[l], cur_id)
447         if isinstance(v, list):
448             for l in v:
449                 visit(l, cur_id)
450     visit(packed, None)
451     return packed
452
453
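# Illustrative sketch (editorial addition, not called here): a packed workflow
# is typically serialized and stored in Keep with upload_workflow_collection(),
# defined further below; the collection name is hypothetical.
def _example_store_packed_workflow(arvrunner, tool, merged_map):
    packed = packed_workflow(arvrunner, tool, merged_map)
    return upload_workflow_collection(arvrunner, "example workflow", packed)
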
454 def tag_git_version(packed, tool):
455     if tool.tool["id"].startswith("file://"):
456         path = os.path.dirname(tool.tool["id"][7:])
457         try:
458             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip().decode("utf-8")
459         except (OSError, subprocess.CalledProcessError):
460             pass
461         else:
462             packed["http://schema.org/version"] = githash
463
464
465 def upload_job_order(arvrunner, name, tool, job_order):
466     """Upload local files referenced in the input object and return an updated
467     input object with 'location' rewritten to the proper Keep references.
468     """
469
470     # Make a copy of the job order and set defaults.
471     builder_job_order = copy.copy(job_order)
472
473     # fill_in_defaults throws an error if there are any missing
474     # required parameters; we don't want it to do that here, so
475     # make them all optional.
476     inputs_copy = copy.deepcopy(tool.tool["inputs"])
477     for i in inputs_copy:
478         if "null" not in i["type"]:
479             i["type"] = ["null"] + aslist(i["type"])
480
481     fill_in_defaults(inputs_copy,
482                      builder_job_order,
483                      arvrunner.fs_access)
484     # Need to create a builder object to evaluate expressions.
485     builder = make_builder(builder_job_order,
486                            tool.hints,
487                            tool.requirements,
488                            ArvRuntimeContext())
489     # Now update job_order with secondaryFiles
490     discover_secondary_files(arvrunner.fs_access,
491                              builder,
492                              tool.tool["inputs"],
493                              job_order)
494
495     jobmapper = upload_dependencies(arvrunner,
496                                     name,
497                                     tool.doc_loader,
498                                     job_order,
499                                     job_order.get("id", "#"),
500                                     False)
501
502     if "id" in job_order:
503         del job_order["id"]
504
505     # Need to filter this out, gets added by cwltool when providing
506     # parameters on the command line.
507     if "job_order" in job_order:
508         del job_order["job_order"]
509
510     return job_order
511
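# Illustrative sketch (editorial addition, hypothetical name): the submit path
# calls upload_job_order() once on the user-supplied input object before
# creating the runner process.
def _example_prepare_job_order(arvrunner, tool, job_order):
    return upload_job_order(arvrunner, "input object", tool, job_order)
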
512 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
513
514 def upload_workflow_deps(arvrunner, tool):
515     # Ensure that Docker images needed by this workflow are available
516
517     upload_docker(arvrunner, tool)
518
519     document_loader = tool.doc_loader
520
521     merged_map = {}
522
523     def upload_tool_deps(deptool):
524         if "id" in deptool:
525             discovered_secondaryfiles = {}
526             pm = upload_dependencies(arvrunner,
527                                      "%s dependencies" % (shortname(deptool["id"])),
528                                      document_loader,
529                                      deptool,
530                                      deptool["id"],
531                                      False,
532                                      include_primary=False,
533                                      discovered_secondaryfiles=discovered_secondaryfiles)
534             document_loader.idx[deptool["id"]] = deptool
535             toolmap = {}
536             for k,v in pm.items():
537                 toolmap[k] = v.resolved
538             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
539
540     tool.visit(upload_tool_deps)
541
542     return merged_map
543
544 def arvados_jobs_image(arvrunner, img):
545     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
546
547     try:
548         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
549     except Exception as e:
550         raise Exception("Docker image %s is not available\n%s" % (img, e))
551
552
553 def upload_workflow_collection(arvrunner, name, packed):
554     collection = arvados.collection.Collection(api_client=arvrunner.api,
555                                                keep_client=arvrunner.keep_client,
556                                                num_retries=arvrunner.num_retries)
557     with collection.open("workflow.cwl", "w") as f:
558         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
559
560     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
561                ["name", "like", name+"%"]]
562     if arvrunner.project_uuid:
563         filters.append(["owner_uuid", "=", arvrunner.project_uuid])
564     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
565
566     if exists["items"]:
567         logger.info("Using collection %s", exists["items"][0]["uuid"])
568     else:
569         collection.save_new(name=name,
570                             owner_uuid=arvrunner.project_uuid,
571                             ensure_unique_name=True,
572                             num_retries=arvrunner.num_retries)
573         logger.info("Uploaded to %s", collection.manifest_locator())
574
575     return collection.portable_data_hash()
576
577
578 class Runner(Process):
579     """Base class for runner processes, which submit an instance of
580     arvados-cwl-runner and wait for the final result."""
581
582     def __init__(self, runner, updated_tool,
583                  tool, loadingContext, enable_reuse,
584                  output_name, output_tags, submit_runner_ram=0,
585                  name=None, on_error=None, submit_runner_image=None,
586                  intermediate_output_ttl=0, merged_map=None,
587                  priority=None, secret_store=None,
588                  collection_cache_size=256,
589                  collection_cache_is_default=True):
590
591         loadingContext = loadingContext.copy()
592         loadingContext.metadata = updated_tool.metadata.copy()
593
594         super(Runner, self).__init__(updated_tool.tool, loadingContext)
595
596         self.arvrunner = runner
597         self.embedded_tool = tool
598         self.job_order = None
599         self.running = False
600         if enable_reuse:
601             # If reuse is permitted by command line arguments but
602             # disabled by the workflow itself, disable it.
603             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
604             if reuse_req:
605                 enable_reuse = reuse_req["enableReuse"]
606         self.enable_reuse = enable_reuse
607         self.uuid = None
608         self.final_output = None
609         self.output_name = output_name
610         self.output_tags = output_tags
611         self.name = name
612         self.on_error = on_error
613         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
614         self.intermediate_output_ttl = intermediate_output_ttl
615         self.priority = priority
616         self.secret_store = secret_store
617         self.enable_dev = loadingContext.enable_dev
618
619         self.submit_runner_cores = 1
620         self.submit_runner_ram = 1024  # default 1 GiB
621         self.collection_cache_size = collection_cache_size
622
623         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
624         if runner_resource_req:
625             if runner_resource_req.get("coresMin"):
626                 self.submit_runner_cores = runner_resource_req["coresMin"]
627             if runner_resource_req.get("ramMin"):
628                 self.submit_runner_ram = runner_resource_req["ramMin"]
629             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
630                 self.collection_cache_size = runner_resource_req["keep_cache"]
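            # Example (hypothetical values): a workflow hint such as
            #   {"class": "http://arvados.org/cwl#WorkflowRunnerResources",
            #    "coresMin": 2, "ramMin": 2048, "keep_cache": 512}
            # requests 2 cores, 2048 MiB of RAM and a 512 MiB collection
            # cache for the runner container.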
631
632         if submit_runner_ram:
633             # Command line / initializer overrides default and/or spec from workflow
634             self.submit_runner_ram = submit_runner_ram
635
636         if self.submit_runner_ram <= 0:
637             raise Exception("Value of submit-runner-ram must be greater than zero")
638
639         if self.submit_runner_cores <= 0:
640             raise Exception("Value of submit-runner-cores must be greater than zero")
641
642         self.merged_map = merged_map or {}
643
644     def job(self,
645             job_order,         # type: Mapping[Text, Text]
646             output_callbacks,  # type: Callable[[Any, Any], Any]
647             runtimeContext     # type: RuntimeContext
648            ):  # type: (...) -> Generator[Any, None, None]
649         self.job_order = job_order
650         self._init_job(job_order, runtimeContext)
651         yield self
652
653     def update_pipeline_component(self, record):
654         pass
655
656     def done(self, record):
657         """Base method for handling a completed runner."""
658
659         try:
660             if record["state"] == "Complete":
661                 if record.get("exit_code") is not None:
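                    # Exit code 33 is cwltool's conventional exit status for
                    # UnsupportedRequirement, so report it as such here.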
662                     if record["exit_code"] == 33:
663                         processStatus = "UnsupportedRequirement"
664                     elif record["exit_code"] == 0:
665                         processStatus = "success"
666                     else:
667                         processStatus = "permanentFail"
668                 else:
669                     processStatus = "success"
670             else:
671                 processStatus = "permanentFail"
672
673             outputs = {}
674
675             if processStatus == "permanentFail":
676                 logc = arvados.collection.CollectionReader(record["log"],
677                                                            api_client=self.arvrunner.api,
678                                                            keep_client=self.arvrunner.keep_client,
679                                                            num_retries=self.arvrunner.num_retries)
680                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
681
682             self.final_output = record["output"]
683             outc = arvados.collection.CollectionReader(self.final_output,
684                                                        api_client=self.arvrunner.api,
685                                                        keep_client=self.arvrunner.keep_client,
686                                                        num_retries=self.arvrunner.num_retries)
687             if "cwl.output.json" in outc:
688                 with outc.open("cwl.output.json", "rb") as f:
689                     if f.size() > 0:
690                         outputs = json.loads(f.read().decode())
691             def keepify(fileobj):
692                 path = fileobj["location"]
693                 if not path.startswith("keep:"):
694                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
695             adjustFileObjs(outputs, keepify)
696             adjustDirObjs(outputs, keepify)
697         except Exception:
698             logger.exception("[%s] While getting final output object", self.name)
699             self.arvrunner.output_callback({}, "permanentFail")
700         else:
701             self.arvrunner.output_callback(outputs, processStatus)