16535: Merge branch 'master'
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import  viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.utils import aslist
36 from cwltool.builder import substitute
37 from cwltool.pack import pack
38 from cwltool.update import INTERNAL_VERSION
39 from cwltool.builder import Builder
40 import schema_salad.validate as validate
41
42 import arvados.collection
43 from .util import collectionUUID
44 import ruamel.yaml as yaml
45 from ruamel.yaml.comments import CommentedMap, CommentedSeq
46
47 import arvados_cwl.arvdocker
48 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
49 from ._version import __version__
50 from . import done
51 from . context import ArvRuntimeContext
52
53 logger = logging.getLogger('arvados.cwl-runner')
54
55 def trim_anonymous_location(obj):
56     """Remove 'location' field from File and Directory literals.
57
58     To make internal handling easier, literals are assigned a random id for
59     'location'.  However, when writing the record back out, this can break
60     reproducibility.  Since it is valid for literals not have a 'location'
61     field, remove it.
62
63     """
64
65     if obj.get("location", "").startswith("_:"):
66         del obj["location"]
67
68
69 def remove_redundant_fields(obj):
70     for field in ("path", "nameext", "nameroot", "dirname"):
71         if field in obj:
72             del obj[field]
73
74
75 def find_defaults(d, op):
76     if isinstance(d, list):
77         for i in d:
78             find_defaults(i, op)
79     elif isinstance(d, dict):
80         if "default" in d:
81             op(d)
82         else:
83             for i in viewvalues(d):
84                 find_defaults(i, op)
85
86 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
87     return Builder(
88                  job=joborder,
89                  files=[],               # type: List[Dict[Text, Text]]
90                  bindings=[],            # type: List[Dict[Text, Any]]
91                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
92                  names=None,               # type: Names
93                  requirements=requirements,        # type: List[Dict[Text, Any]]
94                  hints=hints,               # type: List[Dict[Text, Any]]
95                  resources={},           # type: Dict[str, int]
96                  mutation_manager=None,    # type: Optional[MutationManager]
97                  formatgraph=None,         # type: Optional[Graph]
98                  make_fs_access=None,      # type: Type[StdFsAccess]
99                  fs_access=None,           # type: StdFsAccess
100                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
101                  timeout=runtimeContext.eval_timeout,             # type: float
102                  debug=runtimeContext.debug,               # type: bool
103                  js_console=runtimeContext.js_console,          # type: bool
104                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
105                  loadListing="",         # type: Text
106                  outdir="",              # type: Text
107                  tmpdir="",              # type: Text
108                  stagedir="",            # type: Text
109                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion")
110                 )
111
112 def search_schemadef(name, reqs):
113     for r in reqs:
114         if r["class"] == "SchemaDefRequirement":
115             for sd in r["types"]:
116                 if sd["name"] == name:
117                     return sd
118     return None
119
120 primitive_types_set = frozenset(("null", "boolean", "int", "long",
121                                  "float", "double", "string", "record",
122                                  "array", "enum"))
123
124 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
125     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
126         # union type, collect all possible secondaryFiles
127         for i in inputschema:
128             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
129         return
130
131     if isinstance(inputschema, basestring):
132         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
133         if sd:
134             inputschema = sd
135         else:
136             return
137
138     if "secondaryFiles" in inputschema:
139         # set secondaryFiles, may be inherited by compound types.
140         secondaryspec = inputschema["secondaryFiles"]
141
142     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
143         not isinstance(inputschema["type"], basestring)):
144         # compound type (union, array, record)
145         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
146
147     elif (inputschema["type"] == "record" and
148           isinstance(primary, Mapping)):
149         #
150         # record type, find secondary files associated with fields.
151         #
152         for f in inputschema["fields"]:
153             p = primary.get(shortname(f["name"]))
154             if p:
155                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
156
157     elif (inputschema["type"] == "array" and
158           isinstance(primary, Sequence)):
159         #
160         # array type, find secondary files of elements
161         #
162         for p in primary:
163             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
164
165     elif (inputschema["type"] == "File" and
166           secondaryspec and
167           isinstance(primary, Mapping) and
168           primary.get("class") == "File" and
169           "secondaryFiles" not in primary):
170         #
171         # Found a file, check for secondaryFiles
172         #
173         specs = []
174         primary["secondaryFiles"] = secondaryspec
175         for i, sf in enumerate(aslist(secondaryspec)):
176             if builder.cwlVersion == "v1.0":
177                 pattern = builder.do_eval(sf, context=primary)
178             else:
179                 pattern = builder.do_eval(sf["pattern"], context=primary)
180             if pattern is None:
181                 continue
182             if isinstance(pattern, list):
183                 specs.extend(pattern)
184             elif isinstance(pattern, dict):
185                 specs.append(pattern)
186             elif isinstance(pattern, str):
187                 specs.append({"pattern": pattern})
188             else:
189                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
190                     "Expression must return list, object, string or null")
191
192         found = []
193         for i, sf in enumerate(specs):
194             if isinstance(sf, dict):
195                 if sf.get("class") == "File":
196                     pattern = sf["basename"]
197                 else:
198                     pattern = sf["pattern"]
199                     required = sf.get("required")
200             elif isinstance(sf, str):
201                 pattern = sf
202                 required = True
203             else:
204                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
205                     "Expression must return list, object, string or null")
206
207             sfpath = substitute(primary["location"], pattern)
208             required = builder.do_eval(required, context=primary)
209
210             if fsaccess.exists(sfpath):
211                 found.append({"location": sfpath, "class": "File"})
212             elif required:
213                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
214                     "Required secondary file '%s' does not exist" % sfpath)
215
216         primary["secondaryFiles"] = cmap(found)
217         if discovered is not None:
218             discovered[primary["location"]] = primary["secondaryFiles"]
219     elif inputschema["type"] not in primitive_types_set:
220         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
221
222 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
223     for inputschema in inputs:
224         primary = job_order.get(shortname(inputschema["id"]))
225         if isinstance(primary, (Mapping, Sequence)):
226             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
227
228 def upload_dependencies(arvrunner, name, document_loader,
229                         workflowobj, uri, loadref_run,
230                         include_primary=True, discovered_secondaryfiles=None):
231     """Upload the dependencies of the workflowobj document to Keep.
232
233     Returns a pathmapper object mapping local paths to keep references.  Also
234     does an in-place update of references in "workflowobj".
235
236     Use scandeps to find $import, $include, $schemas, run, File and Directory
237     fields that represent external references.
238
239     If workflowobj has an "id" field, this will reload the document to ensure
240     it is scanning the raw document prior to preprocessing.
241     """
242
243     loaded = set()
244     def loadref(b, u):
245         joined = document_loader.fetcher.urljoin(b, u)
246         defrg, _ = urllib.parse.urldefrag(joined)
247         if defrg not in loaded:
248             loaded.add(defrg)
249             # Use fetch_text to get raw file (before preprocessing).
250             text = document_loader.fetch_text(defrg)
251             if isinstance(text, bytes):
252                 textIO = StringIO(text.decode('utf-8'))
253             else:
254                 textIO = StringIO(text)
255             return yaml.safe_load(textIO)
256         else:
257             return {}
258
259     if loadref_run:
260         loadref_fields = set(("$import", "run"))
261     else:
262         loadref_fields = set(("$import",))
263
264     scanobj = workflowobj
265     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
266         # Need raw file content (before preprocessing) to ensure
267         # that external references in $include and $mixin are captured.
268         scanobj = loadref("", workflowobj["id"])
269
270     metadata = scanobj
271
272     sc_result = scandeps(uri, scanobj,
273                   loadref_fields,
274                   set(("$include", "$schemas", "location")),
275                   loadref, urljoin=document_loader.fetcher.urljoin)
276
277     sc = []
278     uuids = {}
279
280     def collect_uuids(obj):
281         loc = obj.get("location", "")
282         sp = loc.split(":")
283         if sp[0] == "keep":
284             # Collect collection uuids that need to be resolved to
285             # portable data hashes
286             gp = collection_uuid_pattern.match(loc)
287             if gp:
288                 uuids[gp.groups()[0]] = obj
289             if collectionUUID in obj:
290                 uuids[obj[collectionUUID]] = obj
291
292     def collect_uploads(obj):
293         loc = obj.get("location", "")
294         sp = loc.split(":")
295         if len(sp) < 1:
296             return
297         if sp[0] in ("file", "http", "https"):
298             # Record local files than need to be uploaded,
299             # don't include file literals, keep references, etc.
300             sc.append(obj)
301         collect_uuids(obj)
302
303     visit_class(workflowobj, ("File", "Directory"), collect_uuids)
304     visit_class(sc_result, ("File", "Directory"), collect_uploads)
305
306     # Resolve any collection uuids we found to portable data hashes
307     # and assign them to uuid_map
308     uuid_map = {}
309     fetch_uuids = list(uuids.keys())
310     while fetch_uuids:
311         # For a large number of fetch_uuids, API server may limit
312         # response size, so keep fetching from API server has nothing
313         # more to give us.
314         lookups = arvrunner.api.collections().list(
315             filters=[["uuid", "in", fetch_uuids]],
316             count="none",
317             select=["uuid", "portable_data_hash"]).execute(
318                 num_retries=arvrunner.num_retries)
319
320         if not lookups["items"]:
321             break
322
323         for l in lookups["items"]:
324             uuid_map[l["uuid"]] = l["portable_data_hash"]
325
326         fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
327
328     normalizeFilesDirs(sc)
329
330     if include_primary and "id" in workflowobj:
331         sc.append({"class": "File", "location": workflowobj["id"]})
332
333     if "$schemas" in workflowobj:
334         for s in workflowobj["$schemas"]:
335             sc.append({"class": "File", "location": s})
336
337     def visit_default(obj):
338         remove = [False]
339         def ensure_default_location(f):
340             if "location" not in f and "path" in f:
341                 f["location"] = f["path"]
342                 del f["path"]
343             if "location" in f and not arvrunner.fs_access.exists(f["location"]):
344                 # Doesn't exist, remove from list of dependencies to upload
345                 sc[:] = [x for x in sc if x["location"] != f["location"]]
346                 # Delete "default" from workflowobj
347                 remove[0] = True
348         visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
349         if remove[0]:
350             del obj["default"]
351
352     find_defaults(workflowobj, visit_default)
353
354     discovered = {}
355     def discover_default_secondary_files(obj):
356         builder_job_order = {}
357         for t in obj["inputs"]:
358             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
359         # Need to create a builder object to evaluate expressions.
360         builder = make_builder(builder_job_order,
361                                obj.get("hints", []),
362                                obj.get("requirements", []),
363                                ArvRuntimeContext(),
364                                metadata)
365         discover_secondary_files(arvrunner.fs_access,
366                                  builder,
367                                  obj["inputs"],
368                                  builder_job_order,
369                                  discovered)
370
371     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
372     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
373
374     for d in list(discovered):
375         # Only interested in discovered secondaryFiles which are local
376         # files that need to be uploaded.
377         if d.startswith("file:"):
378             sc.extend(discovered[d])
379         else:
380             del discovered[d]
381
382     mapper = ArvPathMapper(arvrunner, sc, "",
383                            "keep:%s",
384                            "keep:%s/%s",
385                            name=name,
386                            single_collection=True)
387
388     def setloc(p):
389         loc = p.get("location")
390         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
391             p["location"] = mapper.mapper(p["location"]).resolved
392             return
393
394         if not loc:
395             return
396
397         if collectionUUID in p:
398             uuid = p[collectionUUID]
399             if uuid not in uuid_map:
400                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
401                     "Collection uuid %s not found" % uuid)
402             gp = collection_pdh_pattern.match(loc)
403             if gp and uuid_map[uuid] != gp.groups()[0]:
404                 # This file entry has both collectionUUID and a PDH
405                 # location. If the PDH doesn't match the one returned
406                 # the API server, raise an error.
407                 raise SourceLine(p, "location", validate.ValidationException).makeError(
408                     "Expected collection uuid %s to be %s but API server reported %s" % (
409                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
410
411         gp = collection_uuid_pattern.match(loc)
412         if not gp:
413             return
414         uuid = gp.groups()[0]
415         if uuid not in uuid_map:
416             raise SourceLine(p, "location", validate.ValidationException).makeError(
417                 "Collection uuid %s not found" % uuid)
418         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
419         p[collectionUUID] = uuid
420
421     visit_class(workflowobj, ("File", "Directory"), setloc)
422     visit_class(discovered, ("File", "Directory"), setloc)
423
424     if discovered_secondaryfiles is not None:
425         for d in discovered:
426             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
427
428     if "$schemas" in workflowobj:
429         sch = CommentedSeq()
430         for s in workflowobj["$schemas"]:
431             sch.append(mapper.mapper(s).resolved)
432         workflowobj["$schemas"] = sch
433
434     return mapper
435
436
437 def upload_docker(arvrunner, tool):
438     """Uploads Docker images used in CommandLineTool objects."""
439
440     if isinstance(tool, CommandLineTool):
441         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
442         if docker_req:
443             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
444                 # TODO: can be supported by containers API, but not jobs API.
445                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
446                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
447             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
448         else:
449             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
450     elif isinstance(tool, cwltool.workflow.Workflow):
451         for s in tool.steps:
452             upload_docker(arvrunner, s.embedded_tool)
453
454
455 def packed_workflow(arvrunner, tool, merged_map):
456     """Create a packed workflow.
457
458     A "packed" workflow is one where all the components have been combined into a single document."""
459
460     rewrites = {}
461     packed = pack(arvrunner.loadingContext, tool.tool["id"],
462                   rewrite_out=rewrites,
463                   loader=tool.doc_loader)
464
465     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
466
467     def visit(v, cur_id):
468         if isinstance(v, dict):
469             if v.get("class") in ("CommandLineTool", "Workflow"):
470                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
471                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
472                 if "id" in v:
473                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
474             if "path" in v and "location" not in v:
475                 v["location"] = v["path"]
476                 del v["path"]
477             if "location" in v and not v["location"].startswith("keep:"):
478                 v["location"] = merged_map[cur_id].resolved[v["location"]]
479             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
480                 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
481             if v.get("class") == "DockerRequirement":
482                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
483             for l in v:
484                 visit(v[l], cur_id)
485         if isinstance(v, list):
486             for l in v:
487                 visit(l, cur_id)
488     visit(packed, None)
489     return packed
490
491
492 def tag_git_version(packed):
493     if tool.tool["id"].startswith("file://"):
494         path = os.path.dirname(tool.tool["id"][7:])
495         try:
496             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
497         except (OSError, subprocess.CalledProcessError):
498             pass
499         else:
500             packed["http://schema.org/version"] = githash
501
502
503 def upload_job_order(arvrunner, name, tool, job_order):
504     """Upload local files referenced in the input object and return updated input
505     object with 'location' updated to the proper keep references.
506     """
507
508     # Make a copy of the job order and set defaults.
509     builder_job_order = copy.copy(job_order)
510
511     # fill_in_defaults throws an error if there are any
512     # missing required parameters, we don't want it to do that
513     # so make them all optional.
514     inputs_copy = copy.deepcopy(tool.tool["inputs"])
515     for i in inputs_copy:
516         if "null" not in i["type"]:
517             i["type"] = ["null"] + aslist(i["type"])
518
519     fill_in_defaults(inputs_copy,
520                      builder_job_order,
521                      arvrunner.fs_access)
522     # Need to create a builder object to evaluate expressions.
523     builder = make_builder(builder_job_order,
524                            tool.hints,
525                            tool.requirements,
526                            ArvRuntimeContext(),
527                            tool.metadata)
528     # Now update job_order with secondaryFiles
529     discover_secondary_files(arvrunner.fs_access,
530                              builder,
531                              tool.tool["inputs"],
532                              job_order)
533
534     jobmapper = upload_dependencies(arvrunner,
535                                     name,
536                                     tool.doc_loader,
537                                     job_order,
538                                     job_order.get("id", "#"),
539                                     False)
540
541     if "id" in job_order:
542         del job_order["id"]
543
544     # Need to filter this out, gets added by cwltool when providing
545     # parameters on the command line.
546     if "job_order" in job_order:
547         del job_order["job_order"]
548
549     return job_order
550
551 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
552
553 def upload_workflow_deps(arvrunner, tool):
554     # Ensure that Docker images needed by this workflow are available
555
556     upload_docker(arvrunner, tool)
557
558     document_loader = tool.doc_loader
559
560     merged_map = {}
561
562     def upload_tool_deps(deptool):
563         if "id" in deptool:
564             discovered_secondaryfiles = {}
565             pm = upload_dependencies(arvrunner,
566                                      "%s dependencies" % (shortname(deptool["id"])),
567                                      document_loader,
568                                      deptool,
569                                      deptool["id"],
570                                      False,
571                                      include_primary=False,
572                                      discovered_secondaryfiles=discovered_secondaryfiles)
573             document_loader.idx[deptool["id"]] = deptool
574             toolmap = {}
575             for k,v in pm.items():
576                 toolmap[k] = v.resolved
577             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
578
579     tool.visit(upload_tool_deps)
580
581     return merged_map
582
583 def arvados_jobs_image(arvrunner, img):
584     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
585
586     try:
587         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
588     except Exception as e:
589         raise Exception("Docker image %s is not available\n%s" % (img, e) )
590
591
592 def upload_workflow_collection(arvrunner, name, packed):
593     collection = arvados.collection.Collection(api_client=arvrunner.api,
594                                                keep_client=arvrunner.keep_client,
595                                                num_retries=arvrunner.num_retries)
596     with collection.open("workflow.cwl", "w") as f:
597         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
598
599     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
600                ["name", "like", name+"%"]]
601     if arvrunner.project_uuid:
602         filters.append(["owner_uuid", "=", arvrunner.project_uuid])
603     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
604
605     if exists["items"]:
606         logger.info("Using collection %s", exists["items"][0]["uuid"])
607     else:
608         collection.save_new(name=name,
609                             owner_uuid=arvrunner.project_uuid,
610                             ensure_unique_name=True,
611                             num_retries=arvrunner.num_retries)
612         logger.info("Uploaded to %s", collection.manifest_locator())
613
614     return collection.portable_data_hash()
615
616
617 class Runner(Process):
618     """Base class for runner processes, which submit an instance of
619     arvados-cwl-runner and wait for the final result."""
620
621     def __init__(self, runner, updated_tool,
622                  tool, loadingContext, enable_reuse,
623                  output_name, output_tags, submit_runner_ram=0,
624                  name=None, on_error=None, submit_runner_image=None,
625                  intermediate_output_ttl=0, merged_map=None,
626                  priority=None, secret_store=None,
627                  collection_cache_size=256,
628                  collection_cache_is_default=True):
629
630         loadingContext = loadingContext.copy()
631         loadingContext.metadata = updated_tool.metadata.copy()
632
633         super(Runner, self).__init__(updated_tool.tool, loadingContext)
634
635         self.arvrunner = runner
636         self.embedded_tool = tool
637         self.job_order = None
638         self.running = False
639         if enable_reuse:
640             # If reuse is permitted by command line arguments but
641             # disabled by the workflow itself, disable it.
642             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
643             if reuse_req:
644                 enable_reuse = reuse_req["enableReuse"]
645         self.enable_reuse = enable_reuse
646         self.uuid = None
647         self.final_output = None
648         self.output_name = output_name
649         self.output_tags = output_tags
650         self.name = name
651         self.on_error = on_error
652         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
653         self.intermediate_output_ttl = intermediate_output_ttl
654         self.priority = priority
655         self.secret_store = secret_store
656         self.enable_dev = loadingContext.enable_dev
657
658         self.submit_runner_cores = 1
659         self.submit_runner_ram = 1024  # defaut 1 GiB
660         self.collection_cache_size = collection_cache_size
661
662         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
663         if runner_resource_req:
664             if runner_resource_req.get("coresMin"):
665                 self.submit_runner_cores = runner_resource_req["coresMin"]
666             if runner_resource_req.get("ramMin"):
667                 self.submit_runner_ram = runner_resource_req["ramMin"]
668             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
669                 self.collection_cache_size = runner_resource_req["keep_cache"]
670
671         if submit_runner_ram:
672             # Command line / initializer overrides default and/or spec from workflow
673             self.submit_runner_ram = submit_runner_ram
674
675         if self.submit_runner_ram <= 0:
676             raise Exception("Value of submit-runner-ram must be greater than zero")
677
678         if self.submit_runner_cores <= 0:
679             raise Exception("Value of submit-runner-cores must be greater than zero")
680
681         self.merged_map = merged_map or {}
682
683     def job(self,
684             job_order,         # type: Mapping[Text, Text]
685             output_callbacks,  # type: Callable[[Any, Any], Any]
686             runtimeContext     # type: RuntimeContext
687            ):  # type: (...) -> Generator[Any, None, None]
688         self.job_order = job_order
689         self._init_job(job_order, runtimeContext)
690         yield self
691
692     def update_pipeline_component(self, record):
693         pass
694
695     def done(self, record):
696         """Base method for handling a completed runner."""
697
698         try:
699             if record["state"] == "Complete":
700                 if record.get("exit_code") is not None:
701                     if record["exit_code"] == 33:
702                         processStatus = "UnsupportedRequirement"
703                     elif record["exit_code"] == 0:
704                         processStatus = "success"
705                     else:
706                         processStatus = "permanentFail"
707                 else:
708                     processStatus = "success"
709             else:
710                 processStatus = "permanentFail"
711
712             outputs = {}
713
714             if processStatus == "permanentFail":
715                 logc = arvados.collection.CollectionReader(record["log"],
716                                                            api_client=self.arvrunner.api,
717                                                            keep_client=self.arvrunner.keep_client,
718                                                            num_retries=self.arvrunner.num_retries)
719                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
720
721             self.final_output = record["output"]
722             outc = arvados.collection.CollectionReader(self.final_output,
723                                                        api_client=self.arvrunner.api,
724                                                        keep_client=self.arvrunner.keep_client,
725                                                        num_retries=self.arvrunner.num_retries)
726             if "cwl.output.json" in outc:
727                 with outc.open("cwl.output.json", "rb") as f:
728                     if f.size() > 0:
729                         outputs = json.loads(f.read().decode())
730             def keepify(fileobj):
731                 path = fileobj["location"]
732                 if not path.startswith("keep:"):
733                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
734             adjustFileObjs(outputs, keepify)
735             adjustDirObjs(outputs, keepify)
736         except Exception:
737             logger.exception("[%s] While getting final output object", self.name)
738             self.arvrunner.output_callback({}, "permanentFail")
739         else:
740             self.arvrunner.output_callback(outputs, processStatus)