Merge branch '17756-dispatch-lsf' into main
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import  viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.builder import substitute
36 from cwltool.pack import pack
37 from cwltool.update import INTERNAL_VERSION
38 from cwltool.builder import Builder
39 import schema_salad.validate as validate
40
41 import arvados.collection
42 from .util import collectionUUID
43 import ruamel.yaml as yaml
44 from ruamel.yaml.comments import CommentedMap, CommentedSeq
45
46 import arvados_cwl.arvdocker
47 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
48 from ._version import __version__
49 from . import done
50 from . context import ArvRuntimeContext
51
# Module-level logger shared by the functions and classes in this file;
# messages are emitted under the 'arvados.cwl-runner' logger name.
logger = logging.getLogger('arvados.cwl-runner')
53
def trim_anonymous_location(obj):
    """Strip the placeholder 'location' from a File or Directory literal.

    Literals are given a random "_:"-prefixed id as their 'location' to
    simplify internal handling.  Writing that id back out can break
    reproducibility, and a literal is valid without a 'location' field,
    so an anonymous location is deleted from obj in place.  Any other
    location (or a missing one) is left untouched.

    """

    location = obj.get("location", "")
    if not location.startswith("_:"):
        return
    del obj["location"]
66
67
def remove_redundant_fields(obj):
    """Delete the 'path', 'nameext', 'nameroot' and 'dirname' keys from obj.

    Keys that are not present are ignored; obj is modified in place.
    """
    redundant = [key for key in ("path", "nameext", "nameroot", "dirname")
                 if key in obj]
    for key in redundant:
        del obj[key]
72
73
def find_defaults(d, op):
    """Call op on every dict inside d that has a "default" key.

    Lists are searched element by element.  A dict containing "default"
    is passed to op and not searched any further; a dict without it has
    each of its values searched.  Anything else is ignored.
    """
    if isinstance(d, list):
        for item in d:
            find_defaults(item, op)
        return

    if not isinstance(d, dict):
        return

    if "default" in d:
        op(d)
        return

    for value in viewvalues(d):
        find_defaults(value, op)
84
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    """Create a cwltool Builder for evaluating expressions.

    joborder, hints and requirements are passed straight through.  The
    script provider, timeout, debug, js_console and force_docker_pull
    settings come from runtimeContext.  The CWL version is taken from the
    "http://commonwl.org/cwltool#original_cwlVersion" entry of metadata
    when that is set, otherwise from its "cwlVersion" entry.  Every other
    Builder argument is given an empty or None placeholder.
    """
    builder_args = {
        "job": joborder,
        "files": [],                # List[Dict[Text, Text]]
        "bindings": [],             # List[Dict[Text, Any]]
        "schemaDefs": {},           # Dict[Text, Dict[Text, Any]]
        "names": None,              # Names
        "requirements": requirements,   # List[Dict[Text, Any]]
        "hints": hints,             # List[Dict[Text, Any]]
        "resources": {},            # Dict[str, int]
        "mutation_manager": None,   # Optional[MutationManager]
        "formatgraph": None,        # Optional[Graph]
        "make_fs_access": None,     # Type[StdFsAccess]
        "fs_access": None,          # StdFsAccess
        "job_script_provider": runtimeContext.job_script_provider,  # Optional[Any]
        "timeout": runtimeContext.eval_timeout,                     # float
        "debug": runtimeContext.debug,                              # bool
        "js_console": runtimeContext.js_console,                    # bool
        "force_docker_pull": runtimeContext.force_docker_pull,      # bool
        "loadListing": "",          # Text
        "outdir": "",               # Text
        "tmpdir": "",               # Text
        "stagedir": "",             # Text
        "cwlVersion": (metadata.get("http://commonwl.org/cwltool#original_cwlVersion")
                       or metadata.get("cwlVersion")),
    }
    return Builder(**builder_args)
110
def search_schemadef(name, reqs):
    """Return the first SchemaDefRequirement type called name, or None.

    reqs is an iterable of requirement/hint dicts.  Only entries whose
    "class" is "SchemaDefRequirement" are examined, in iteration order.
    """
    candidates = (
        typedef
        for req in reqs
        if req["class"] == "SchemaDefRequirement"
        for typedef in req["types"]
        if typedef["name"] == name
    )
    return next(candidates, None)
118
# Type names for which set_secondary() does not attempt a further
# schema lookup.
primitive_types_set = frozenset([
    "null",
    "boolean",
    "int",
    "long",
    "float",
    "double",
    "string",
    "record",
    "array",
    "enum",
])
122
def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    """Discover secondaryFiles for an input value, guided by its schema.

    Walks inputschema together with the matching value primary, recursing
    through union, record and array types.  When a File value that does
    not already have "secondaryFiles" is reached while a secondaryFiles
    spec is in effect, each spec entry is evaluated with the builder, the
    resulting paths are checked with fsaccess.exists(), and the files that
    exist are stored in primary["secondaryFiles"] (primary is modified in
    place).

    Arguments:
      fsaccess -- object providing exists(path).
      builder -- Builder used to evaluate expressions (do_eval), to read
        cwlVersion, and whose hints and requirements are searched for
        named schema definitions.
      inputschema -- a schema dict, a type name, or a list of alternatives
        (union).
      secondaryspec -- secondaryFiles spec inherited from an enclosing
        schema, or None; replaced by inputschema["secondaryFiles"] when
        that is present.
      primary -- the input value corresponding to inputschema.
      discovered -- optional dict; when not None, the secondaryFiles found
        for a File are also recorded under that File's location.

    Errors built with SourceLine(...).makeError() around
    validate.ValidationException are raised when an expression evaluates
    to an unsupported type or a required secondary file does not exist.
    """
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        # A bare type name: only continue if it names a SchemaDefRequirement
        # type.  Later entries (requirements after hints) are searched first.
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        # Temporarily attach the raw spec so SourceLine can point at the
        # offending entry if evaluation fails; replaced with the real
        # result below.
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                # In v1.0 the spec entry itself is the pattern/expression.
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                specs.append({"pattern": pattern})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = sf["basename"]
                    # An explicit File object has no "required" setting of
                    # its own.  Treat it as required rather than leaving
                    # the variable unbound or carrying over the value from
                    # the previous spec.
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            sfpath = substitute(primary["location"], pattern)
            # "required" may itself be an expression.
            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                found.append({"location": sfpath, "class": "File"})
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        # Possibly a named type; recurse so it can be looked up by name.
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
220
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    """Run set_secondary() for every input that has a compound value.

    For each schema in inputs, the value is looked up in job_order under
    the short name of the schema's "id".  Values that are not a Mapping or
    a Sequence (including missing ones) are skipped.
    """
    for schema in inputs:
        value = job_order.get(shortname(schema["id"]))
        if not isinstance(value, (Mapping, Sequence)):
            continue
        set_secondary(fsaccess, builder, schema, None, value, discovered)
226
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.

    Arguments:
      arvrunner -- runner object; its api, num_retries and fs_access
        attributes are used here and it is passed on to ArvPathMapper.
      name -- passed to ArvPathMapper as its name argument.
      document_loader -- loader used to join URLs, fetch raw document text
        and resolve the document.
      workflowobj -- the document to scan; File/Directory locations and
        "$schemas" in it are rewritten in place.
      uri -- base URI given to scandeps and resolve_all.
      loadref_run -- when true, "run" references are followed in addition
        to "$import".
      include_primary -- when true and workflowobj has an "id", the
        document itself is added to the list of files to upload.
      discovered_secondaryfiles -- optional dict; when not None it receives
        the secondaryFiles discovered for input defaults, keyed by the
        resolved location of the primary file.

    Errors built with SourceLine(...).makeError() around
    validate.ValidationException are raised when a referenced collection
    uuid is not returned by the API server, or when an entry's
    portable data hash does not match the one reported for its uuid.
    """

    # URLs (without fragment) already fetched by loadref, so each document
    # is loaded at most once.
    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            # Already loaded: return an empty document so it is not
            # scanned again.
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    # The scanned document doubles as the metadata source (cwlVersion)
    # for the builder created further down.
    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    # sc: File/Directory objects that need to be uploaded.
    # uuids: collection uuid -> an object that referenced it.
    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, API server may limit
        # response size, so keep fetching until API server has nothing
        # more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        # Only ask again for the uuids that have not been resolved yet.
        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        # One-element list so the nested function can set the flag
        # (works without "nonlocal").
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    # Primary file location -> secondaryFiles found for input defaults.
    discovered = {}
    def discover_default_secondary_files(obj):
        # Build a job order made only of the declared defaults (None when
        # an input has no default).
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    # Work on a resolved deep copy so that discovery does not modify
    # workflowobj itself.
    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    # Iterate over a snapshot of the keys because entries are deleted
    # inside the loop.
    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            # Neither a literal nor a keep reference: replace with the
            # location assigned by the path mapper.
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        # Rewrite the uuid-based location to the portable data hash, keeping
        # any trailing part matched by the pattern, and remember the uuid.
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        # Replace "$schemas" with the resolved locations of the entries
        # known to the mapper; entries not in the mapper are dropped.
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
435
436
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects.

    For a CommandLineTool the image named by its DockerRequirement is
    fetched, or "arvados/jobs:<version>" when it has none.  For a Workflow
    the embedded tool of each step is processed recursively.  Raises an
    UnsupportedRequirement error if 'dockerOutputDirectory' is used while
    arvrunner.work_api is not "containers".
    """

    if isinstance(tool, CommandLineTool):
        docker_req, _ = tool.get_requirement("DockerRequirement")
        if not docker_req:
            # No DockerRequirement: fall back to the default jobs image.
            docker_req = {"dockerPull": "arvados/jobs:"+__version__}
        elif docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
            # TODO: can be supported by containers API, but not jobs API.
            raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
        arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req,
                                                   True, arvrunner.project_uuid,
                                                   arvrunner.runtimeContext.force_docker_pull,
                                                   arvrunner.runtimeContext.tmp_outdir_prefix)
        return

    if isinstance(tool, cwltool.workflow.Workflow):
        for step in tool.steps:
            upload_docker(arvrunner, step.embedded_tool)
458
459
def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined
    into a single document.  After packing, the document is walked to apply
    the location and secondaryFiles updates recorded in merged_map (keyed
    by the original id of the enclosing process) and to annotate every
    DockerRequirement with the value returned by arv_docker_get_image.
    """

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    # Inverse of the rewrite table: packed id -> original id.
    rewrite_to_orig = {new_id: old_id for old_id, new_id in viewitems(rewrites)}
    process_classes = ("CommandLineTool", "Workflow", "ExpressionTool")

    def visit(node, cur_id):
        if isinstance(node, list):
            for item in node:
                visit(item, cur_id)
            return
        if not isinstance(node, dict):
            return

        if node.get("class") in process_classes:
            if tool.metadata["cwlVersion"] == "v1.0" and "id" not in node:
                raise SourceLine(node, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
            if "id" in node:
                # Entering a process: later lookups use its original id.
                cur_id = rewrite_to_orig.get(node["id"], node["id"])

        if "path" in node and "location" not in node:
            node["location"] = node["path"]
            del node["path"]

        if "location" in node and cur_id in merged_map:
            updates = merged_map[cur_id]
            if node["location"] in updates.resolved:
                node["location"] = updates.resolved[node["location"]]
            # Looked up with the (possibly just rewritten) location.
            if node["location"] in updates.secondaryFiles:
                node["secondaryFiles"] = updates.secondaryFiles[node["location"]]

        if node.get("class") == "DockerRequirement":
            node["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(
                arvrunner.api, node, True,
                arvrunner.project_uuid,
                arvrunner.runtimeContext.force_docker_pull,
                arvrunner.runtimeContext.tmp_outdir_prefix)

        for key in node:
            visit(node[key], cur_id)

    visit(packed, None)
    return packed
499
500
def tag_git_version(packed, tool=None):
    """Record the git commit of the workflow's source directory in packed.

    When tool.tool["id"] is a "file://" URL, "git log" is run in the
    directory containing that file and the hash of the most recent
    first-parent commit is stored in packed["http://schema.org/version"].

    Arguments:
      packed -- packed workflow document (dict), modified in place.
      tool -- object whose tool["id"] gives the workflow location.  The
        original body read an undefined name "tool", so every call failed
        with NameError.  It is now an optional parameter; when it is
        omitted nothing is recorded.

    This is best effort: if git cannot be run or exits with an error,
    packed is left unchanged.
    """
    if tool is None:
        return

    tool_id = tool.tool["id"]
    if not tool_id.startswith("file://"):
        return

    # Strip the "file://" prefix (7 characters) to get a filesystem path.
    path = os.path.dirname(tool_id[7:])
    try:
        githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
    except (OSError, subprocess.CalledProcessError):
        return

    # check_output returns bytes on Python 3, which json.dumps cannot
    # serialize when the packed document is written out.
    if isinstance(githash, bytes):
        githash = githash.decode("utf-8")
    packed["http://schema.org/version"] = githash
510
511
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.

    job_order is modified in place: secondaryFiles are filled in, file
    references are rewritten by upload_dependencies, and the "id" and
    "job_order" keys are removed before it is returned.
    """

    # fill_in_defaults throws an error if there are any missing required
    # parameters.  We don't want that here, so give it a copy of the
    # input schemas in which every type also accepts null.
    optional_inputs = copy.deepcopy(tool.tool["inputs"])
    for param in optional_inputs:
        if "null" not in param["type"]:
            param["type"] = ["null"] + aslist(param["type"])

    # Shallow copy of the job order with defaults applied; only used to
    # give the builder something to evaluate expressions against.
    builder_job_order = copy.copy(job_order)
    fill_in_defaults(optional_inputs, builder_job_order, arvrunner.fs_access)

    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)

    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access, builder,
                             tool.tool["inputs"], job_order)

    # Called for its side effect of rewriting locations in job_order.
    upload_dependencies(arvrunner, name, tool.doc_loader, job_order,
                        job_order.get("id", "#"), False)

    # "job_order" gets added by cwltool when providing parameters on the
    # command line, so it is filtered out together with "id".
    for key in ("id", "job_order"):
        if key in job_order:
            del job_order[key]

    return job_order
559
# Per-tool record built by upload_workflow_deps(): "resolved" holds the
# path mapper's resolved locations and "secondaryFiles" the secondaryFiles
# discovered for that tool.
FileUpdates = namedtuple("FileUpdates", "resolved secondaryFiles")
561
def upload_workflow_deps(arvrunner, tool):
    """Upload the dependencies of every process in tool.

    Docker images are handled first via upload_docker().  Then each
    document visited by tool.visit() that has an "id" has its dependencies
    uploaded and is stored in the document loader's index.  Returns a dict
    mapping each such id to a FileUpdates record.
    """
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader
    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" not in deptool:
            return

        secondary = {}
        pathmap = upload_dependencies(
            arvrunner,
            "%s dependencies" % (shortname(deptool["id"])),
            document_loader,
            deptool,
            deptool["id"],
            False,
            include_primary=False,
            discovered_secondaryfiles=secondary)
        document_loader.idx[deptool["id"]] = deptool
        resolved = {src: entry.resolved for src, entry in pathmap.items()}
        merged_map[deptool["id"]] = FileUpdates(resolved, secondary)

    tool.visit(upload_tool_deps)

    return merged_map
591
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it.

    Returns whatever arv_docker_get_image returns for img.  Any exception
    raised while doing so is replaced by a plain Exception naming the image.
    """

    try:
        image = arvados_cwl.arvdocker.arv_docker_get_image(
            arvrunner.api,
            {"dockerPull": img},
            True,
            arvrunner.project_uuid,
            arvrunner.runtimeContext.force_docker_pull,
            arvrunner.runtimeContext.tmp_outdir_prefix)
    except Exception as err:
        message = "Docker image %s is not available\n%s" % (img, err)
        raise Exception(message)
    return image
601
602
def upload_workflow_collection(arvrunner, name, packed):
    """Store the packed workflow as "workflow.cwl" in a collection.

    The document is serialized as indented JSON with sorted keys.  If a
    collection with the same portable data hash and a name starting with
    name already exists (in arvrunner.project_uuid, when that is set), it
    is reused; otherwise a new collection is saved.  Returns the portable
    data hash of the collection.
    """
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    pdh_filter = ["portable_data_hash", "=", collection.portable_data_hash()]
    name_filter = ["name", "like", name+"%"]
    filters = [pdh_filter, name_filter]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])

    existing = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
    matches = existing["items"]

    if not matches:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())
    else:
        logger.info("Using collection %s", matches[0]["uuid"])

    return collection.portable_data_hash()
626
627
class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):
        """Set up the runner process.

        The Process base class is initialized from updated_tool, using a
        copy of loadingContext that carries updated_tool's metadata.
        Reuse and runner resource settings may be adjusted by the
        ReuseRequirement and WorkflowRunnerResources requirements of tool.
        Raises Exception if the resulting RAM or core count is not
        greater than zero.
        """

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False

        # If reuse is permitted by command line arguments but
        # disabled by the workflow itself, disable it.
        reuse_req = None
        if enable_reuse:
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
        self.enable_reuse = reuse_req["enableReuse"] if reuse_req else enable_reuse

        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        # Defaults, possibly overridden by the workflow's
        # WorkflowRunnerResources requirement below.
        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        resources, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if resources:
            if resources.get("coresMin"):
                self.submit_runner_cores = resources["coresMin"]
            if resources.get("ramMin"):
                self.submit_runner_ram = resources["ramMin"]
            # Only take the cache size from the workflow when the caller
            # says the passed-in value is just the default.
            if resources.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = resources["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        """Remember job_order, initialize the job, and yield this runner."""
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        """Do nothing in the base class."""
        pass

    def done(self, record):
        """Base method for handling a completed runner.

        Derives the process status from record["state"] and its exit code,
        logs the tail of the log collection on failure, loads
        "cwl.output.json" from the output collection and reports the
        outputs through arvrunner.output_callback.  If anything goes wrong
        the error is logged and "permanentFail" is reported with empty
        outputs.
        """

        try:
            if record["state"] != "Complete":
                processStatus = "permanentFail"
            else:
                exit_code = record.get("exit_code")
                if exit_code is None or exit_code == 0:
                    processStatus = "success"
                elif exit_code == 33:
                    processStatus = "UnsupportedRequirement"
                else:
                    processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                log_header = "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"])
                done.logtail(logc, logger.error, log_header, maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    # An empty file leaves outputs as {}.
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())

            def keepify(fileobj):
                # Turn a path relative to the output collection into a
                # keep: reference.
                path = fileobj["location"]
                if path.startswith("keep:"):
                    return
                fileobj["location"] = "keep:%s/%s" % (record["output"], path)

            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)