16169: Monkey patch load_tool.resolve_and_validate_document to fix bug
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import  viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.utils import aslist
36 from cwltool.builder import substitute
37 from cwltool.pack import pack
38 from cwltool.update import INTERNAL_VERSION
39 from cwltool.builder import Builder
40 import schema_salad.validate as validate
41
42 import arvados.collection
43 from .util import collectionUUID
44 import ruamel.yaml as yaml
45
46 import arvados_cwl.arvdocker
47 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
48 from ._version import __version__
49 from . import done
50 from . context import ArvRuntimeContext
51
52 logger = logging.getLogger('arvados.cwl-runner')
53
54 def trim_anonymous_location(obj):
55     """Remove 'location' field from File and Directory literals.
56
57     To make internal handling easier, literals are assigned a random id for
58     'location'.  However, when writing the record back out, this can break
59     reproducibility.  Since it is valid for literals not have a 'location'
60     field, remove it.
61
62     """
63
64     if obj.get("location", "").startswith("_:"):
65         del obj["location"]
66
67
68 def remove_redundant_fields(obj):
69     for field in ("path", "nameext", "nameroot", "dirname"):
70         if field in obj:
71             del obj[field]
72
73
74 def find_defaults(d, op):
75     if isinstance(d, list):
76         for i in d:
77             find_defaults(i, op)
78     elif isinstance(d, dict):
79         if "default" in d:
80             op(d)
81         else:
82             for i in viewvalues(d):
83                 find_defaults(i, op)
84
85 def make_builder(joborder, hints, requirements, runtimeContext):
86     return Builder(
87                  job=joborder,
88                  files=[],               # type: List[Dict[Text, Text]]
89                  bindings=[],            # type: List[Dict[Text, Any]]
90                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
91                  names=None,               # type: Names
92                  requirements=requirements,        # type: List[Dict[Text, Any]]
93                  hints=hints,               # type: List[Dict[Text, Any]]
94                  resources={},           # type: Dict[str, int]
95                  mutation_manager=None,    # type: Optional[MutationManager]
96                  formatgraph=None,         # type: Optional[Graph]
97                  make_fs_access=None,      # type: Type[StdFsAccess]
98                  fs_access=None,           # type: StdFsAccess
99                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
100                  timeout=runtimeContext.eval_timeout,             # type: float
101                  debug=runtimeContext.debug,               # type: bool
102                  js_console=runtimeContext.js_console,          # type: bool
103                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
104                  loadListing="",         # type: Text
105                  outdir="",              # type: Text
106                  tmpdir="",              # type: Text
107                  stagedir="",            # type: Text
108                 )
109
110 def search_schemadef(name, reqs):
111     for r in reqs:
112         if r["class"] == "SchemaDefRequirement":
113             for sd in r["types"]:
114                 if sd["name"] == name:
115                     return sd
116     return None
117
118 primitive_types_set = frozenset(("null", "boolean", "int", "long",
119                                  "float", "double", "string", "record",
120                                  "array", "enum"))
121
122 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
123     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
124         # union type, collect all possible secondaryFiles
125         for i in inputschema:
126             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
127         return
128
129     if isinstance(inputschema, basestring):
130         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
131         if sd:
132             inputschema = sd
133         else:
134             return
135
136     if "secondaryFiles" in inputschema:
137         # set secondaryFiles, may be inherited by compound types.
138         secondaryspec = inputschema["secondaryFiles"]
139
140     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
141         not isinstance(inputschema["type"], basestring)):
142         # compound type (union, array, record)
143         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
144
145     elif (inputschema["type"] == "record" and
146           isinstance(primary, Mapping)):
147         #
148         # record type, find secondary files associated with fields.
149         #
150         for f in inputschema["fields"]:
151             p = primary.get(shortname(f["name"]))
152             if p:
153                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
154
155     elif (inputschema["type"] == "array" and
156           isinstance(primary, Sequence)):
157         #
158         # array type, find secondary files of elements
159         #
160         for p in primary:
161             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
162
163     elif (inputschema["type"] == "File" and
164           secondaryspec and
165           isinstance(primary, Mapping) and
166           primary.get("class") == "File" and
167           "secondaryFiles" not in primary):
168         #
169         # Found a file, check for secondaryFiles
170         #
171         primary["secondaryFiles"] = []
172         for i, sf in enumerate(aslist(secondaryspec)):
173             pattern = builder.do_eval(sf["pattern"], context=primary)
174             if pattern is None:
175                 continue
176             sfpath = substitute(primary["location"], pattern)
177             required = builder.do_eval(sf.get("required"), context=primary)
178
179             if fsaccess.exists(sfpath):
180                 primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
181             elif required:
182                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
183                     "Required secondary file '%s' does not exist" % sfpath)
184
185         primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
186         if discovered is not None:
187             discovered[primary["location"]] = primary["secondaryFiles"]
188     elif inputschema["type"] not in primitive_types_set:
189         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
190
191 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
192     for inputschema in inputs:
193         primary = job_order.get(shortname(inputschema["id"]))
194         if isinstance(primary, (Mapping, Sequence)):
195             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
196
197 def upload_dependencies(arvrunner, name, document_loader,
198                         workflowobj, uri, loadref_run,
199                         include_primary=True, discovered_secondaryfiles=None):
200     """Upload the dependencies of the workflowobj document to Keep.
201
202     Returns a pathmapper object mapping local paths to keep references.  Also
203     does an in-place update of references in "workflowobj".
204
205     Use scandeps to find $import, $include, $schemas, run, File and Directory
206     fields that represent external references.
207
208     If workflowobj has an "id" field, this will reload the document to ensure
209     it is scanning the raw document prior to preprocessing.
210     """
211
212     loaded = set()
213     def loadref(b, u):
214         joined = document_loader.fetcher.urljoin(b, u)
215         defrg, _ = urllib.parse.urldefrag(joined)
216         if defrg not in loaded:
217             loaded.add(defrg)
218             # Use fetch_text to get raw file (before preprocessing).
219             text = document_loader.fetch_text(defrg)
220             if isinstance(text, bytes):
221                 textIO = StringIO(text.decode('utf-8'))
222             else:
223                 textIO = StringIO(text)
224             return yaml.safe_load(textIO)
225         else:
226             return {}
227
228     if loadref_run:
229         loadref_fields = set(("$import", "run"))
230     else:
231         loadref_fields = set(("$import",))
232
233     scanobj = workflowobj
234     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
235         # Need raw file content (before preprocessing) to ensure
236         # that external references in $include and $mixin are captured.
237         scanobj = loadref("", workflowobj["id"])
238
239     sc_result = scandeps(uri, scanobj,
240                   loadref_fields,
241                   set(("$include", "$schemas", "location")),
242                   loadref, urljoin=document_loader.fetcher.urljoin)
243
244     sc = []
245     uuids = {}
246
247     def collect_uuids(obj):
248         loc = obj.get("location", "")
249         sp = loc.split(":")
250         if sp[0] == "keep":
251             # Collect collection uuids that need to be resolved to
252             # portable data hashes
253             gp = collection_uuid_pattern.match(loc)
254             if gp:
255                 uuids[gp.groups()[0]] = obj
256             if collectionUUID in obj:
257                 uuids[obj[collectionUUID]] = obj
258
259     def collect_uploads(obj):
260         loc = obj.get("location", "")
261         sp = loc.split(":")
262         if len(sp) < 1:
263             return
264         if sp[0] in ("file", "http", "https"):
265             # Record local files than need to be uploaded,
266             # don't include file literals, keep references, etc.
267             sc.append(obj)
268         collect_uuids(obj)
269
270     visit_class(workflowobj, ("File", "Directory"), collect_uuids)
271     visit_class(sc_result, ("File", "Directory"), collect_uploads)
272
273     # Resolve any collection uuids we found to portable data hashes
274     # and assign them to uuid_map
275     uuid_map = {}
276     fetch_uuids = list(uuids.keys())
277     while fetch_uuids:
278         # For a large number of fetch_uuids, API server may limit
279         # response size, so keep fetching from API server has nothing
280         # more to give us.
281         lookups = arvrunner.api.collections().list(
282             filters=[["uuid", "in", fetch_uuids]],
283             count="none",
284             select=["uuid", "portable_data_hash"]).execute(
285                 num_retries=arvrunner.num_retries)
286
287         if not lookups["items"]:
288             break
289
290         for l in lookups["items"]:
291             uuid_map[l["uuid"]] = l["portable_data_hash"]
292
293         fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
294
295     normalizeFilesDirs(sc)
296
297     if include_primary and "id" in workflowobj:
298         sc.append({"class": "File", "location": workflowobj["id"]})
299
300     if "$schemas" in workflowobj:
301         for s in workflowobj["$schemas"]:
302             sc.append({"class": "File", "location": s})
303
304     def visit_default(obj):
305         remove = [False]
306         def ensure_default_location(f):
307             if "location" not in f and "path" in f:
308                 f["location"] = f["path"]
309                 del f["path"]
310             if "location" in f and not arvrunner.fs_access.exists(f["location"]):
311                 # Doesn't exist, remove from list of dependencies to upload
312                 sc[:] = [x for x in sc if x["location"] != f["location"]]
313                 # Delete "default" from workflowobj
314                 remove[0] = True
315         visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
316         if remove[0]:
317             del obj["default"]
318
319     find_defaults(workflowobj, visit_default)
320
321     discovered = {}
322     def discover_default_secondary_files(obj):
323         builder_job_order = {}
324         for t in obj["inputs"]:
325             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
326         # Need to create a builder object to evaluate expressions.
327         builder = make_builder(builder_job_order,
328                                obj.get("hints", []),
329                                obj.get("requirements", []),
330                                ArvRuntimeContext())
331         discover_secondary_files(arvrunner.fs_access,
332                                  builder,
333                                  obj["inputs"],
334                                  builder_job_order,
335                                  discovered)
336
337     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
338     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
339
340     for d in list(discovered):
341         # Only interested in discovered secondaryFiles which are local
342         # files that need to be uploaded.
343         if d.startswith("file:"):
344             sc.extend(discovered[d])
345         else:
346             del discovered[d]
347
348     mapper = ArvPathMapper(arvrunner, sc, "",
349                            "keep:%s",
350                            "keep:%s/%s",
351                            name=name,
352                            single_collection=True)
353
354     def setloc(p):
355         loc = p.get("location")
356         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
357             p["location"] = mapper.mapper(p["location"]).resolved
358             return
359
360         if not loc:
361             return
362
363         if collectionUUID in p:
364             uuid = p[collectionUUID]
365             if uuid not in uuid_map:
366                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
367                     "Collection uuid %s not found" % uuid)
368             gp = collection_pdh_pattern.match(loc)
369             if gp and uuid_map[uuid] != gp.groups()[0]:
370                 # This file entry has both collectionUUID and a PDH
371                 # location. If the PDH doesn't match the one returned
372                 # the API server, raise an error.
373                 raise SourceLine(p, "location", validate.ValidationException).makeError(
374                     "Expected collection uuid %s to be %s but API server reported %s" % (
375                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
376
377         gp = collection_uuid_pattern.match(loc)
378         if not gp:
379             return
380         uuid = gp.groups()[0]
381         if uuid not in uuid_map:
382             raise SourceLine(p, "location", validate.ValidationException).makeError(
383                 "Collection uuid %s not found" % uuid)
384         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
385         p[collectionUUID] = uuid
386
387     visit_class(workflowobj, ("File", "Directory"), setloc)
388     visit_class(discovered, ("File", "Directory"), setloc)
389
390     if discovered_secondaryfiles is not None:
391         for d in discovered:
392             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
393
394     if "$schemas" in workflowobj:
395         sch = []
396         for s in workflowobj["$schemas"]:
397             sch.append(mapper.mapper(s).resolved)
398         workflowobj["$schemas"] = sch
399
400     return mapper
401
402
403 def upload_docker(arvrunner, tool):
404     """Uploads Docker images used in CommandLineTool objects."""
405
406     if isinstance(tool, CommandLineTool):
407         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
408         if docker_req:
409             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
410                 # TODO: can be supported by containers API, but not jobs API.
411                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
412                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
413             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
414         else:
415             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
416     elif isinstance(tool, cwltool.workflow.Workflow):
417         for s in tool.steps:
418             upload_docker(arvrunner, s.embedded_tool)
419
420
421 def packed_workflow(arvrunner, tool, merged_map):
422     """Create a packed workflow.
423
424     A "packed" workflow is one where all the components have been combined into a single document."""
425
426     rewrites = {}
427     packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
428                   tool.tool["id"], tool.metadata, rewrite_out=rewrites)
429
430     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
431
432     def visit(v, cur_id):
433         if isinstance(v, dict):
434             if v.get("class") in ("CommandLineTool", "Workflow"):
435                 if "id" not in v:
436                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
437                 cur_id = rewrite_to_orig.get(v["id"], v["id"])
438             if "location" in v and not v["location"].startswith("keep:"):
439                 v["location"] = merged_map[cur_id].resolved[v["location"]]
440             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
441                 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
442             if v.get("class") == "DockerRequirement":
443                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
444             for l in v:
445                 visit(v[l], cur_id)
446         if isinstance(v, list):
447             for l in v:
448                 visit(l, cur_id)
449     visit(packed, None)
450     return packed
451
452
453 def tag_git_version(packed):
454     if tool.tool["id"].startswith("file://"):
455         path = os.path.dirname(tool.tool["id"][7:])
456         try:
457             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
458         except (OSError, subprocess.CalledProcessError):
459             pass
460         else:
461             packed["http://schema.org/version"] = githash
462
463
464 def upload_job_order(arvrunner, name, tool, job_order):
465     """Upload local files referenced in the input object and return updated input
466     object with 'location' updated to the proper keep references.
467     """
468
469     # Make a copy of the job order and set defaults.
470     builder_job_order = copy.copy(job_order)
471
472     # fill_in_defaults throws an error if there are any
473     # missing required parameters, we don't want it to do that
474     # so make them all optional.
475     inputs_copy = copy.deepcopy(tool.tool["inputs"])
476     for i in inputs_copy:
477         if "null" not in i["type"]:
478             i["type"] = ["null"] + aslist(i["type"])
479
480     fill_in_defaults(inputs_copy,
481                      builder_job_order,
482                      arvrunner.fs_access)
483     # Need to create a builder object to evaluate expressions.
484     builder = make_builder(builder_job_order,
485                            tool.hints,
486                            tool.requirements,
487                            ArvRuntimeContext())
488     # Now update job_order with secondaryFiles
489     discover_secondary_files(arvrunner.fs_access,
490                              builder,
491                              tool.tool["inputs"],
492                              job_order)
493
494     jobmapper = upload_dependencies(arvrunner,
495                                     name,
496                                     tool.doc_loader,
497                                     job_order,
498                                     job_order.get("id", "#"),
499                                     False)
500
501     if "id" in job_order:
502         del job_order["id"]
503
504     # Need to filter this out, gets added by cwltool when providing
505     # parameters on the command line.
506     if "job_order" in job_order:
507         del job_order["job_order"]
508
509     return job_order
510
511 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
512
513 def upload_workflow_deps(arvrunner, tool):
514     # Ensure that Docker images needed by this workflow are available
515
516     upload_docker(arvrunner, tool)
517
518     document_loader = tool.doc_loader
519
520     merged_map = {}
521
522     def upload_tool_deps(deptool):
523         if "id" in deptool:
524             discovered_secondaryfiles = {}
525             pm = upload_dependencies(arvrunner,
526                                      "%s dependencies" % (shortname(deptool["id"])),
527                                      document_loader,
528                                      deptool,
529                                      deptool["id"],
530                                      False,
531                                      include_primary=False,
532                                      discovered_secondaryfiles=discovered_secondaryfiles)
533             document_loader.idx[deptool["id"]] = deptool
534             toolmap = {}
535             for k,v in pm.items():
536                 toolmap[k] = v.resolved
537             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
538
539     tool.visit(upload_tool_deps)
540
541     return merged_map
542
543 def arvados_jobs_image(arvrunner, img):
544     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
545
546     try:
547         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
548     except Exception as e:
549         raise Exception("Docker image %s is not available\n%s" % (img, e) )
550
551
552 def upload_workflow_collection(arvrunner, name, packed):
553     collection = arvados.collection.Collection(api_client=arvrunner.api,
554                                                keep_client=arvrunner.keep_client,
555                                                num_retries=arvrunner.num_retries)
556     with collection.open("workflow.cwl", "w") as f:
557         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
558
559     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
560                ["name", "like", name+"%"]]
561     if arvrunner.project_uuid:
562         filters.append(["owner_uuid", "=", arvrunner.project_uuid])
563     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
564
565     if exists["items"]:
566         logger.info("Using collection %s", exists["items"][0]["uuid"])
567     else:
568         collection.save_new(name=name,
569                             owner_uuid=arvrunner.project_uuid,
570                             ensure_unique_name=True,
571                             num_retries=arvrunner.num_retries)
572         logger.info("Uploaded to %s", collection.manifest_locator())
573
574     return collection.portable_data_hash()
575
576
577 class Runner(Process):
578     """Base class for runner processes, which submit an instance of
579     arvados-cwl-runner and wait for the final result."""
580
581     def __init__(self, runner, updated_tool,
582                  tool, loadingContext, enable_reuse,
583                  output_name, output_tags, submit_runner_ram=0,
584                  name=None, on_error=None, submit_runner_image=None,
585                  intermediate_output_ttl=0, merged_map=None,
586                  priority=None, secret_store=None,
587                  collection_cache_size=256,
588                  collection_cache_is_default=True):
589
590         loadingContext = loadingContext.copy()
591         loadingContext.metadata = updated_tool.metadata.copy()
592
593         super(Runner, self).__init__(updated_tool.tool, loadingContext)
594
595         self.arvrunner = runner
596         self.embedded_tool = tool
597         self.job_order = None
598         self.running = False
599         if enable_reuse:
600             # If reuse is permitted by command line arguments but
601             # disabled by the workflow itself, disable it.
602             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
603             if reuse_req:
604                 enable_reuse = reuse_req["enableReuse"]
605         self.enable_reuse = enable_reuse
606         self.uuid = None
607         self.final_output = None
608         self.output_name = output_name
609         self.output_tags = output_tags
610         self.name = name
611         self.on_error = on_error
612         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
613         self.intermediate_output_ttl = intermediate_output_ttl
614         self.priority = priority
615         self.secret_store = secret_store
616         self.enable_dev = loadingContext.enable_dev
617
618         self.submit_runner_cores = 1
619         self.submit_runner_ram = 1024  # defaut 1 GiB
620         self.collection_cache_size = collection_cache_size
621
622         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
623         if runner_resource_req:
624             if runner_resource_req.get("coresMin"):
625                 self.submit_runner_cores = runner_resource_req["coresMin"]
626             if runner_resource_req.get("ramMin"):
627                 self.submit_runner_ram = runner_resource_req["ramMin"]
628             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
629                 self.collection_cache_size = runner_resource_req["keep_cache"]
630
631         if submit_runner_ram:
632             # Command line / initializer overrides default and/or spec from workflow
633             self.submit_runner_ram = submit_runner_ram
634
635         if self.submit_runner_ram <= 0:
636             raise Exception("Value of submit-runner-ram must be greater than zero")
637
638         if self.submit_runner_cores <= 0:
639             raise Exception("Value of submit-runner-cores must be greater than zero")
640
641         self.merged_map = merged_map or {}
642
643     def job(self,
644             job_order,         # type: Mapping[Text, Text]
645             output_callbacks,  # type: Callable[[Any, Any], Any]
646             runtimeContext     # type: RuntimeContext
647            ):  # type: (...) -> Generator[Any, None, None]
648         self.job_order = job_order
649         self._init_job(job_order, runtimeContext)
650         yield self
651
652     def update_pipeline_component(self, record):
653         pass
654
655     def done(self, record):
656         """Base method for handling a completed runner."""
657
658         try:
659             if record["state"] == "Complete":
660                 if record.get("exit_code") is not None:
661                     if record["exit_code"] == 33:
662                         processStatus = "UnsupportedRequirement"
663                     elif record["exit_code"] == 0:
664                         processStatus = "success"
665                     else:
666                         processStatus = "permanentFail"
667                 else:
668                     processStatus = "success"
669             else:
670                 processStatus = "permanentFail"
671
672             outputs = {}
673
674             if processStatus == "permanentFail":
675                 logc = arvados.collection.CollectionReader(record["log"],
676                                                            api_client=self.arvrunner.api,
677                                                            keep_client=self.arvrunner.keep_client,
678                                                            num_retries=self.arvrunner.num_retries)
679                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
680
681             self.final_output = record["output"]
682             outc = arvados.collection.CollectionReader(self.final_output,
683                                                        api_client=self.arvrunner.api,
684                                                        keep_client=self.arvrunner.keep_client,
685                                                        num_retries=self.arvrunner.num_retries)
686             if "cwl.output.json" in outc:
687                 with outc.open("cwl.output.json", "rb") as f:
688                     if f.size() > 0:
689                         outputs = json.loads(f.read().decode())
690             def keepify(fileobj):
691                 path = fileobj["location"]
692                 if not path.startswith("keep:"):
693                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
694             adjustFileObjs(outputs, keepify)
695             adjustDirObjs(outputs, keepify)
696         except Exception:
697             logger.exception("[%s] While getting final output object", self.name)
698             self.arvrunner.output_callback({}, "permanentFail")
699         else:
700             self.arvrunner.output_callback(outputs, processStatus)