1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.utils import aslist
36 from cwltool.builder import substitute
37 from cwltool.pack import pack
38 from cwltool.update import INTERNAL_VERSION
39 from cwltool.builder import Builder
40 import schema_salad.validate as validate
41
42 import arvados.collection
43 from .util import collectionUUID
44 import ruamel.yaml as yaml
45
46 import arvados_cwl.arvdocker
47 from .pathmapper import ArvPathMapper, trim_listing
48 from ._version import __version__
49 from . import done
50 from .context import ArvRuntimeContext
51
52 logger = logging.getLogger('arvados.cwl-runner')
53
54 def trim_anonymous_location(obj):
55     """Remove 'location' field from File and Directory literals.
56
57     To make internal handling easier, literals are assigned a random id for
58     'location'.  However, when writing the record back out, this can break
59     reproducibility.  Since it is valid for literals not have a 'location'
60     field, remove it.
61
62     """
63
64     if obj.get("location", "").startswith("_:"):
65         del obj["location"]
66
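# Illustrative example (not part of the original module): a File literal that
# was given an anonymous id keeps its contents but loses the synthetic
# location after trim_anonymous_location() runs, e.g.
#
#   f = {"class": "File", "location": "_:b0e1a2c4", "contents": "hello"}
#   trim_anonymous_location(f)
#   # f == {"class": "File", "contents": "hello"}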
67
68 def remove_redundant_fields(obj):
69     for field in ("path", "nameext", "nameroot", "dirname"):
70         if field in obj:
71             del obj[field]
72
73
74 def find_defaults(d, op):
75     if isinstance(d, list):
76         for i in d:
77             find_defaults(i, op)
78     elif isinstance(d, dict):
79         if "default" in d:
80             op(d)
81         else:
82             for i in viewvalues(d):
83                 find_defaults(i, op)
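
# Illustrative sketch (assumed example, not from the original source):
# find_defaults() walks a parsed CWL document and applies `op` to every
# mapping that carries a "default" key, e.g.
#
#   doc = {"inputs": [{"id": "x", "type": "File",
#                      "default": {"class": "File", "location": "file:///tmp/a.txt"}}]}
#   find_defaults(doc, lambda d: print(d["default"]["location"]))
#   # prints: file:///tmp/a.txt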
84
85 def make_builder(joborder, hints, requirements, runtimeContext):
86     return Builder(
87                  job=joborder,
88                  files=[],               # type: List[Dict[Text, Text]]
89                  bindings=[],            # type: List[Dict[Text, Any]]
90                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
91                  names=None,               # type: Names
92                  requirements=requirements,        # type: List[Dict[Text, Any]]
93                  hints=hints,               # type: List[Dict[Text, Any]]
94                  resources={},           # type: Dict[str, int]
95                  mutation_manager=None,    # type: Optional[MutationManager]
96                  formatgraph=None,         # type: Optional[Graph]
97                  make_fs_access=None,      # type: Type[StdFsAccess]
98                  fs_access=None,           # type: StdFsAccess
99                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
100                  timeout=runtimeContext.eval_timeout,             # type: float
101                  debug=runtimeContext.debug,               # type: bool
102                  js_console=runtimeContext.js_console,          # type: bool
103                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
104                  loadListing="",         # type: Text
105                  outdir="",              # type: Text
106                  tmpdir="",              # type: Text
107                  stagedir="",            # type: Text
108                 )
109
110 def search_schemadef(name, reqs):
111     for r in reqs:
112         if r["class"] == "SchemaDefRequirement":
113             for sd in r["types"]:
114                 if sd["name"] == name:
115                     return sd
116     return None
117
118 primitive_types_set = frozenset(("null", "boolean", "int", "long",
119                                  "float", "double", "string", "record",
120                                  "array", "enum"))
121
122 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
123     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
124         # union type, collect all possible secondaryFiles
125         for i in inputschema:
126             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
127         return
128
129     if isinstance(inputschema, basestring):
130         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
131         if sd:
132             inputschema = sd
133         else:
134             return
135
136     if "secondaryFiles" in inputschema:
137         # set secondaryFiles, may be inherited by compound types.
138         secondaryspec = inputschema["secondaryFiles"]
139
140     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
141         not isinstance(inputschema["type"], basestring)):
142         # compound type (union, array, record)
143         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
144
145     elif (inputschema["type"] == "record" and
146           isinstance(primary, Mapping)):
147         #
148         # record type, find secondary files associated with fields.
149         #
150         for f in inputschema["fields"]:
151             p = primary.get(shortname(f["name"]))
152             if p:
153                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
154
155     elif (inputschema["type"] == "array" and
156           isinstance(primary, Sequence)):
157         #
158         # array type, find secondary files of elements
159         #
160         for p in primary:
161             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
162
163     elif (inputschema["type"] == "File" and
164           secondaryspec and
165           isinstance(primary, Mapping) and
166           primary.get("class") == "File" and
167           "secondaryFiles" not in primary):
168         #
169         # Found a file, check for secondaryFiles
170         #
171         primary["secondaryFiles"] = []
172         for i, sf in enumerate(aslist(secondaryspec)):
173             pattern = builder.do_eval(sf["pattern"], context=primary)
174             if pattern is None:
175                 continue
176             sfpath = substitute(primary["location"], pattern)
177             required = builder.do_eval(sf.get("required"), context=primary)
178
179             if fsaccess.exists(sfpath):
180                 primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
181             elif required:
182                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
183                     "Required secondary file '%s' does not exist" % sfpath)
184
185         primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
186         if discovered is not None:
187             discovered[primary["location"]] = primary["secondaryFiles"]
188     elif inputschema["type"] not in primitive_types_set:
189         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
190
191 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
192     for inputschema in inputs:
193         primary = job_order.get(shortname(inputschema["id"]))
194         if isinstance(primary, (Mapping, Sequence)):
195             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
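
# Note on the secondaryFiles patterns handled above (assumed behaviour, based
# on the CWL pattern rules implemented by cwltool's substitute()): for a
# primary File at "keep:<pdh>/reads.bam", a pattern of ".bai" yields
# "keep:<pdh>/reads.bam.bai", while "^.fai" strips the ".bam" extension first
# and yields "keep:<pdh>/reads.fai".  set_secondary() only records candidates
# that fsaccess.exists() confirms, unless the pattern is marked required, in
# which case a missing file raises a ValidationException.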
196
197 collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
198 collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
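
# Illustrative examples (hypothetical identifiers) of what these patterns match:
#
#   collection_uuid_pattern:  "keep:zzzzz-4zz18-0123456789abcde/dir/file.txt"
#     (a Keep reference by collection UUID)
#   collection_pdh_pattern:   "keep:1f4b0bc7583c2a7f9102c395f4ffc5e3+45/foo.txt"
#     (a Keep reference by portable data hash)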
199
200 def upload_dependencies(arvrunner, name, document_loader,
201                         workflowobj, uri, loadref_run,
202                         include_primary=True, discovered_secondaryfiles=None):
203     """Upload the dependencies of the workflowobj document to Keep.
204
205     Returns a pathmapper object mapping local paths to keep references.  Also
206     does an in-place update of references in "workflowobj".
207
208     Use scandeps to find $import, $include, $schemas, run, File and Directory
209     fields that represent external references.
210
211     If workflowobj has an "id" field, this will reload the document to ensure
212     it is scanning the raw document prior to preprocessing.
213     """
214
215     loaded = set()
216     def loadref(b, u):
217         joined = document_loader.fetcher.urljoin(b, u)
218         defrg, _ = urllib.parse.urldefrag(joined)
219         if defrg not in loaded:
220             loaded.add(defrg)
221             # Use fetch_text to get raw file (before preprocessing).
222             text = document_loader.fetch_text(defrg)
223             if isinstance(text, bytes):
224                 textIO = StringIO(text.decode('utf-8'))
225             else:
226                 textIO = StringIO(text)
227             return yaml.safe_load(textIO)
228         else:
229             return {}
230
231     if loadref_run:
232         loadref_fields = set(("$import", "run"))
233     else:
234         loadref_fields = set(("$import",))
235
236     scanobj = workflowobj
237     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
238         # Need raw file content (before preprocessing) to ensure
239         # that external references in $include and $mixin are captured.
240         scanobj = loadref("", workflowobj["id"])
241
242     sc_result = scandeps(uri, scanobj,
243                   loadref_fields,
244                   set(("$include", "$schemas", "location")),
245                   loadref, urljoin=document_loader.fetcher.urljoin)
246
247     sc = []
248     uuids = {}
249
250     def collect_uuids(obj):
251         loc = obj.get("location", "")
252         sp = loc.split(":")
253         if sp[0] == "keep":
254             # Collect collection uuids that need to be resolved to
255             # portable data hashes
256             gp = collection_uuid_pattern.match(loc)
257             if gp:
258                 uuids[gp.groups()[0]] = obj
259             if collectionUUID in obj:
260                 uuids[obj[collectionUUID]] = obj
261
262     def collect_uploads(obj):
263         loc = obj.get("location", "")
264         sp = loc.split(":")
265         if len(sp) < 1:
266             return
267         if sp[0] in ("file", "http", "https"):
268             # Record local files that need to be uploaded,
269             # don't include file literals, keep references, etc.
270             sc.append(obj)
271         collect_uuids(obj)
272
273     visit_class(workflowobj, ("File", "Directory"), collect_uuids)
274     visit_class(sc_result, ("File", "Directory"), collect_uploads)
275
276     # Resolve any collection uuids we found to portable data hashes
277     # and assign them to uuid_map
278     uuid_map = {}
279     fetch_uuids = list(uuids.keys())
280     while fetch_uuids:
281         # For a large number of fetch_uuids, the API server may limit the
282         # response size, so keep fetching until the API server has nothing
283         # more to give us.
284         lookups = arvrunner.api.collections().list(
285             filters=[["uuid", "in", fetch_uuids]],
286             count="none",
287             select=["uuid", "portable_data_hash"]).execute(
288                 num_retries=arvrunner.num_retries)
289
290         if not lookups["items"]:
291             break
292
293         for l in lookups["items"]:
294             uuid_map[l["uuid"]] = l["portable_data_hash"]
295
296         fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
297
298     normalizeFilesDirs(sc)
299
300     if include_primary and "id" in workflowobj:
301         sc.append({"class": "File", "location": workflowobj["id"]})
302
303     if "$schemas" in workflowobj:
304         for s in workflowobj["$schemas"]:
305             sc.append({"class": "File", "location": s})
306
307     def visit_default(obj):
308         remove = [False]
309         def ensure_default_location(f):
310             if "location" not in f and "path" in f:
311                 f["location"] = f["path"]
312                 del f["path"]
313             if "location" in f and not arvrunner.fs_access.exists(f["location"]):
314                 # Doesn't exist, remove from list of dependencies to upload
315                 sc[:] = [x for x in sc if x["location"] != f["location"]]
316                 # Delete "default" from workflowobj
317                 remove[0] = True
318         visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
319         if remove[0]:
320             del obj["default"]
321
322     find_defaults(workflowobj, visit_default)
323
324     discovered = {}
325     def discover_default_secondary_files(obj):
326         builder_job_order = {}
327         for t in obj["inputs"]:
328             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
329         # Need to create a builder object to evaluate expressions.
330         builder = make_builder(builder_job_order,
331                                obj.get("hints", []),
332                                obj.get("requirements", []),
333                                ArvRuntimeContext())
334         discover_secondary_files(arvrunner.fs_access,
335                                  builder,
336                                  obj["inputs"],
337                                  builder_job_order,
338                                  discovered)
339
340     visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
341
342     for d in list(discovered):
343         # Only interested in discovered secondaryFiles which are local
344         # files that need to be uploaded.
345         if d.startswith("file:"):
346             sc.extend(discovered[d])
347         else:
348             del discovered[d]
349
350     mapper = ArvPathMapper(arvrunner, sc, "",
351                            "keep:%s",
352                            "keep:%s/%s",
353                            name=name,
354                            single_collection=True)
355
356     def setloc(p):
357         loc = p.get("location")
358         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
359             p["location"] = mapper.mapper(p["location"]).resolved
360             return
361
362         if not loc:
363             return
364
365         if collectionUUID in p:
366             uuid = p[collectionUUID]
367             if uuid not in uuid_map:
368                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
369                     "Collection uuid %s not found" % uuid)
370             gp = collection_pdh_pattern.match(loc)
371             if gp and uuid_map[uuid] != gp.groups()[0]:
372                 # This file entry has both collectionUUID and a PDH
373                 # location. If the PDH doesn't match the one returned by
374                 # the API server, raise an error.
375                 raise SourceLine(p, "location", validate.ValidationException).makeError(
376                     "Expected collection uuid %s to be %s but API server reported %s" % (
377                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
378
379         gp = collection_uuid_pattern.match(loc)
380         if not gp:
381             return
382         uuid = gp.groups()[0]
383         if uuid not in uuid_map:
384             raise SourceLine(p, "location", validate.ValidationException).makeError(
385                 "Collection uuid %s not found" % uuid)
386         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
387         p[collectionUUID] = uuid
388
389     visit_class(workflowobj, ("File", "Directory"), setloc)
390     visit_class(discovered, ("File", "Directory"), setloc)
391
392     if discovered_secondaryfiles is not None:
393         for d in discovered:
394             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
395
396     if "$schemas" in workflowobj:
397         sch = []
398         for s in workflowobj["$schemas"]:
399             sch.append(mapper.mapper(s).resolved)
400         workflowobj["$schemas"] = sch
401
402     return mapper
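
# Illustrative sketch of a call site (assumed, not from the original source):
#
#   mapper = upload_dependencies(arvrunner, "workflow dependencies",
#                                tool.doc_loader, tool.tool, tool.tool["id"],
#                                loadref_run=True)
#   # mapper.mapper("file:///home/user/input.txt").resolved
#   #   -> "keep:<portable data hash>/input.txt"
#
# Local File/Directory references in the document are rewritten in place as a
# side effect; the returned mapper can then be used to look up the new locations.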
403
404
405 def upload_docker(arvrunner, tool):
406     """Uploads Docker images used in CommandLineTool objects."""
407
408     if isinstance(tool, CommandLineTool):
409         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
410         if docker_req:
411             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
412                 # TODO: can be supported by containers API, but not jobs API.
413                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
414                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
415             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
416         else:
417             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
418     elif isinstance(tool, cwltool.workflow.Workflow):
419         for s in tool.steps:
420             upload_docker(arvrunner, s.embedded_tool)
421
422
423 def packed_workflow(arvrunner, tool, merged_map):
424     """Create a packed workflow.
425
426     A "packed" workflow is one where all the components have been combined into a single document."""
427
428     rewrites = {}
429     packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
430                   tool.tool["id"], tool.metadata, rewrite_out=rewrites)
431
432     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
433
434     def visit(v, cur_id):
435         if isinstance(v, dict):
436             if v.get("class") in ("CommandLineTool", "Workflow"):
437                 if "id" not in v:
438                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
439                 cur_id = rewrite_to_orig.get(v["id"], v["id"])
440             if "location" in v and not v["location"].startswith("keep:"):
441                 v["location"] = merged_map[cur_id].resolved[v["location"]]
442             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
443                 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
444             if v.get("class") == "DockerRequirement":
445                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
446             for l in v:
447                 visit(v[l], cur_id)
448         if isinstance(v, list):
449             for l in v:
450                 visit(l, cur_id)
451     visit(packed, None)
452     return packed
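
# Note (assumption, not from the original source): merged_map is keyed by the
# original (pre-pack) process ids, so visit() first maps a packed id back to
# its original id via rewrite_to_orig before rewriting locations, e.g.
#
#   merged_map["file:///home/user/wf.cwl"].resolved["file:///home/user/data.txt"]
#     -> "keep:<pdh>/data.txt"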
453
454
455 def tag_git_version(packed, tool):
456     if tool.tool["id"].startswith("file://"):
457         path = os.path.dirname(tool.tool["id"][7:])
458         try:
459             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
460         except (OSError, subprocess.CalledProcessError):
461             pass
462         else:
463             packed["http://schema.org/version"] = githash
464
465
466 def upload_job_order(arvrunner, name, tool, job_order):
467     """Upload local files referenced in the input object and return updated input
468     object with 'location' updated to the proper keep references.
469     """
470
471     # Make a copy of the job order and set defaults.
472     builder_job_order = copy.copy(job_order)
473
474     # fill_in_defaults throws an error if there are any missing required
475     # parameters; we don't want that here, so make every input type
476     # optional before calling it.
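    # For example (illustrative), a declared type of "File" becomes
    # ["null", "File"] here, so fill_in_defaults() will not raise on inputs
    # that the caller simply did not provide.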
477     inputs_copy = copy.deepcopy(tool.tool["inputs"])
478     for i in inputs_copy:
479         if "null" not in i["type"]:
480             i["type"] = ["null"] + aslist(i["type"])
481
482     fill_in_defaults(inputs_copy,
483                      builder_job_order,
484                      arvrunner.fs_access)
485     # Need to create a builder object to evaluate expressions.
486     builder = make_builder(builder_job_order,
487                            tool.hints,
488                            tool.requirements,
489                            ArvRuntimeContext())
490     # Now update job_order with secondaryFiles
491     discover_secondary_files(arvrunner.fs_access,
492                              builder,
493                              tool.tool["inputs"],
494                              job_order)
495
496     jobmapper = upload_dependencies(arvrunner,
497                                     name,
498                                     tool.doc_loader,
499                                     job_order,
500                                     job_order.get("id", "#"),
501                                     False)
502
503     if "id" in job_order:
504         del job_order["id"]
505
506     # Need to filter this out; it gets added by cwltool when providing
507     # parameters on the command line.
508     if "job_order" in job_order:
509         del job_order["job_order"]
510
511     return job_order
512
513 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
514
515 def upload_workflow_deps(arvrunner, tool):
516     # Ensure that Docker images needed by this workflow are available
517
518     upload_docker(arvrunner, tool)
519
520     document_loader = tool.doc_loader
521
522     merged_map = {}
523
524     def upload_tool_deps(deptool):
525         if "id" in deptool:
526             discovered_secondaryfiles = {}
527             pm = upload_dependencies(arvrunner,
528                                      "%s dependencies" % (shortname(deptool["id"])),
529                                      document_loader,
530                                      deptool,
531                                      deptool["id"],
532                                      False,
533                                      include_primary=False,
534                                      discovered_secondaryfiles=discovered_secondaryfiles)
535             document_loader.idx[deptool["id"]] = deptool
536             toolmap = {}
537             for k,v in pm.items():
538                 toolmap[k] = v.resolved
539             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
540
541     tool.visit(upload_tool_deps)
542
543     return merged_map
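
# Illustrative shape of the returned mapping (hypothetical paths):
#
#   {"file:///home/user/wf.cwl": FileUpdates(
#        resolved={"file:///home/user/script.py": "keep:<pdh>/script.py"},
#        secondaryFiles={})}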
544
545 def arvados_jobs_image(arvrunner, img):
546     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
547
548     try:
549         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
550     except Exception as e:
551         raise Exception("Docker image %s is not available\n%s" % (img, e) )
552
553
554 def upload_workflow_collection(arvrunner, name, packed):
555     collection = arvados.collection.Collection(api_client=arvrunner.api,
556                                                keep_client=arvrunner.keep_client,
557                                                num_retries=arvrunner.num_retries)
558     with collection.open("workflow.cwl", "w") as f:
559         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
560
561     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
562                ["name", "like", name+"%"]]
563     if arvrunner.project_uuid:
564         filters.append(["owner_uuid", "=", arvrunner.project_uuid])
565     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
566
567     if exists["items"]:
568         logger.info("Using collection %s", exists["items"][0]["uuid"])
569     else:
570         collection.save_new(name=name,
571                             owner_uuid=arvrunner.project_uuid,
572                             ensure_unique_name=True,
573                             num_retries=arvrunner.num_retries)
574         logger.info("Uploaded to %s", collection.manifest_locator())
575
576     return collection.portable_data_hash()
577
578
579 class Runner(Process):
580     """Base class for runner processes, which submit an instance of
581     arvados-cwl-runner and wait for the final result."""
582
583     def __init__(self, runner, tool, loadingContext, enable_reuse,
584                  output_name, output_tags, submit_runner_ram=0,
585                  name=None, on_error=None, submit_runner_image=None,
586                  intermediate_output_ttl=0, merged_map=None,
587                  priority=None, secret_store=None,
588                  collection_cache_size=256,
589                  collection_cache_is_default=True):
590
591         loadingContext = loadingContext.copy()
592         loadingContext.metadata = loadingContext.metadata.copy()
593         loadingContext.metadata["cwlVersion"] = INTERNAL_VERSION
594
595         super(Runner, self).__init__(tool.tool, loadingContext)
596
597         self.arvrunner = runner
598         self.embedded_tool = tool
599         self.job_order = None
600         self.running = False
601         if enable_reuse:
602             # If reuse is permitted by command line arguments but
603             # disabled by the workflow itself, disable it.
604             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
605             if reuse_req:
606                 enable_reuse = reuse_req["enableReuse"]
607         self.enable_reuse = enable_reuse
608         self.uuid = None
609         self.final_output = None
610         self.output_name = output_name
611         self.output_tags = output_tags
612         self.name = name
613         self.on_error = on_error
614         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
615         self.intermediate_output_ttl = intermediate_output_ttl
616         self.priority = priority
617         self.secret_store = secret_store
618         self.enable_dev = loadingContext.enable_dev
619
620         self.submit_runner_cores = 1
621         self.submit_runner_ram = 1024  # default 1 GiB
622         self.collection_cache_size = collection_cache_size
623
624         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
625         if runner_resource_req:
626             if runner_resource_req.get("coresMin"):
627                 self.submit_runner_cores = runner_resource_req["coresMin"]
628             if runner_resource_req.get("ramMin"):
629                 self.submit_runner_ram = runner_resource_req["ramMin"]
630             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
631                 self.collection_cache_size = runner_resource_req["keep_cache"]
632
633         if submit_runner_ram:
634             # Command line / initializer overrides default and/or spec from workflow
635             self.submit_runner_ram = submit_runner_ram
636
637         if self.submit_runner_ram <= 0:
638             raise Exception("Value of submit-runner-ram must be greater than zero")
639
640         if self.submit_runner_cores <= 0:
641             raise Exception("Value of submit-runner-cores must be greater than zero")
642
643         self.merged_map = merged_map or {}
644
645     def job(self,
646             job_order,         # type: Mapping[Text, Text]
647             output_callbacks,  # type: Callable[[Any, Any], Any]
648             runtimeContext     # type: RuntimeContext
649            ):  # type: (...) -> Generator[Any, None, None]
650         self.job_order = job_order
651         self._init_job(job_order, runtimeContext)
652         yield self
653
654     def update_pipeline_component(self, record):
655         pass
656
657     def done(self, record):
658         """Base method for handling a completed runner."""
659
660         try:
661             if record["state"] == "Complete":
662                 if record.get("exit_code") is not None:
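                    # By convention, the runner container (like cwltool) exits
                    # with code 33 when the workflow hit an unsupported
                    # requirement.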
663                     if record["exit_code"] == 33:
664                         processStatus = "UnsupportedRequirement"
665                     elif record["exit_code"] == 0:
666                         processStatus = "success"
667                     else:
668                         processStatus = "permanentFail"
669                 else:
670                     processStatus = "success"
671             else:
672                 processStatus = "permanentFail"
673
674             outputs = {}
675
676             if processStatus == "permanentFail":
677                 logc = arvados.collection.CollectionReader(record["log"],
678                                                            api_client=self.arvrunner.api,
679                                                            keep_client=self.arvrunner.keep_client,
680                                                            num_retries=self.arvrunner.num_retries)
681                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
682
683             self.final_output = record["output"]
684             outc = arvados.collection.CollectionReader(self.final_output,
685                                                        api_client=self.arvrunner.api,
686                                                        keep_client=self.arvrunner.keep_client,
687                                                        num_retries=self.arvrunner.num_retries)
688             if "cwl.output.json" in outc:
689                 with outc.open("cwl.output.json", "rb") as f:
690                     if f.size() > 0:
691                         outputs = json.loads(f.read().decode())
692             def keepify(fileobj):
693                 path = fileobj["location"]
694                 if not path.startswith("keep:"):
695                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
696             adjustFileObjs(outputs, keepify)
697             adjustDirObjs(outputs, keepify)
698         except Exception:
699             logger.exception("[%s] While getting final output object", self.name)
700             self.arvrunner.output_callback({}, "permanentFail")
701         else:
702             self.arvrunner.output_callback(outputs, processStatus)