# sdk/cwl/arvados_cwl/runner.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import re
import urllib.parse
from functools import partial
import logging
import json
import copy
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
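    """Recursively apply op() to every mapping in d that has a 'default' field."""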
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def make_builder(joborder, hints, requirements, runtimeContext):
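    """Construct a bare-bones cwltool Builder, with just enough context to
    evaluate expressions such as secondaryFiles patterns."""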
    return Builder(
                 job=joborder,
                 files=[],               # type: List[Dict[Text, Text]]
                 bindings=[],            # type: List[Dict[Text, Any]]
                 schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                 names=None,               # type: Names
                 requirements=requirements,        # type: List[Dict[Text, Any]]
                 hints=hints,               # type: List[Dict[Text, Any]]
                 resources={},           # type: Dict[str, int]
                 mutation_manager=None,    # type: Optional[MutationManager]
                 formatgraph=None,         # type: Optional[Graph]
                 make_fs_access=None,      # type: Type[StdFsAccess]
                 fs_access=None,           # type: StdFsAccess
                 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
                 timeout=runtimeContext.eval_timeout,             # type: float
                 debug=runtimeContext.debug,               # type: bool
                 js_console=runtimeContext.js_console,          # type: bool
                 force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
                 loadListing="",         # type: Text
                 outdir="",              # type: Text
                 tmpdir="",              # type: Text
                 stagedir="",            # type: Text
                )

def search_schemadef(name, reqs):
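    """Find a named type in any SchemaDefRequirement among reqs, or return None."""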
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
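    """Walk an input schema alongside its value ('primary') and fill in the
    'secondaryFiles' of File values, recording newly found files in 'discovered'."""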
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                specs.append({"pattern": pattern})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")
        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = sf["basename"]
                    # A File object returned by the expression is treated as required.
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            sfpath = substitute(primary["location"], pattern)
            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                found.append({"location": sfpath, "class": "File"})
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)
        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
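    """For each input parameter, look up its value in job_order and use
    set_secondary() to attach any secondaryFiles described by the schema."""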
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded;
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit the
        # response size, so keep fetching until the API server has nothing
        # more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext())
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper


def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed


def tag_git_version(packed, tool):
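    """Record the git commit hash of the tool's source checkout in the packed workflow."""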
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return an updated
    input object with 'location' fields pointing to the proper Keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any missing required
    # parameters; we don't want that, so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext())
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out; it gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

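# FileUpdates records, for a single tool document, the mapping from original
# file locations to resolved Keep locations, plus any discovered secondaryFiles.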
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
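    """Upload the dependencies of every tool in the workflow and return a map
    of tool id to FileUpdates, later used when packing the workflow."""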
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed):
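    """Save the packed workflow to Keep (reusing an existing collection with the
    same content when possible) and return its portable data hash."""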
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
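        """Set up the job order and yield self so the caller can submit it."""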
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
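        """No-op in the base class; subclasses may override."""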
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)