[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.builder import substitute
36 from cwltool.pack import pack
37 from cwltool.update import INTERNAL_VERSION
38 from cwltool.builder import Builder
39 import schema_salad.validate as validate
40
41 import arvados.collection
42 from .util import collectionUUID
43 import ruamel.yaml as yaml
44 from ruamel.yaml.comments import CommentedMap, CommentedSeq
45
46 import arvados_cwl.arvdocker
47 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
48 from ._version import __version__
49 from . import done
50 from .context import ArvRuntimeContext
51
52 logger = logging.getLogger('arvados.cwl-runner')
53
54 def trim_anonymous_location(obj):
55     """Remove 'location' field from File and Directory literals.
56
57     To make internal handling easier, literals are assigned a random id for
58     'location'.  However, when writing the record back out, this can break
59     reproducibility.  Since it is valid for literals not have a 'location'
60     field, remove it.
61
62     """
63
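    # Illustrative example: a literal such as
    #   {"class": "File", "location": "_:0c52...", "basename": "x.txt", "contents": "hi"}
    # has its "_:" location removed, leaving the other fields intact.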
64     if obj.get("location", "").startswith("_:"):
65         del obj["location"]
66
67
68 def remove_redundant_fields(obj):
69     for field in ("path", "nameext", "nameroot", "dirname"):
70         if field in obj:
71             del obj[field]
72
73
74 def find_defaults(d, op):
75     if isinstance(d, list):
76         for i in d:
77             find_defaults(i, op)
78     elif isinstance(d, dict):
79         if "default" in d:
80             op(d)
81         else:
82             for i in viewvalues(d):
83                 find_defaults(i, op)
84
85 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
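    """Create a cwltool Builder with just enough state to evaluate expressions.

    Most fields are stubs; the builder is used to evaluate CWL expressions
    (such as secondaryFiles patterns), not to run a job.
    """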
86     return Builder(
87                  job=joborder,
88                  files=[],               # type: List[Dict[Text, Text]]
89                  bindings=[],            # type: List[Dict[Text, Any]]
90                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
91                  names=None,               # type: Names
92                  requirements=requirements,        # type: List[Dict[Text, Any]]
93                  hints=hints,               # type: List[Dict[Text, Any]]
94                  resources={},           # type: Dict[str, int]
95                  mutation_manager=None,    # type: Optional[MutationManager]
96                  formatgraph=None,         # type: Optional[Graph]
97                  make_fs_access=None,      # type: Type[StdFsAccess]
98                  fs_access=None,           # type: StdFsAccess
99                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
100                  timeout=runtimeContext.eval_timeout,             # type: float
101                  debug=runtimeContext.debug,               # type: bool
102                  js_console=runtimeContext.js_console,          # type: bool
103                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
104                  loadListing="",         # type: Text
105                  outdir="",              # type: Text
106                  tmpdir="",              # type: Text
107                  stagedir="",            # type: Text
108                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion")
109                 )
110
111 def search_schemadef(name, reqs):
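    """Return the type named 'name' from any SchemaDefRequirement in reqs, or None."""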
112     for r in reqs:
113         if r["class"] == "SchemaDefRequirement":
114             for sd in r["types"]:
115                 if sd["name"] == name:
116                     return sd
117     return None
118
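# Type names that set_secondary() treats as terminal; any other string type is
# assumed to be a named schema definition and is resolved recursively.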
119 primitive_types_set = frozenset(("null", "boolean", "int", "long",
120                                  "float", "double", "string", "record",
121                                  "array", "enum"))
122
123 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
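    """Walk an input schema and its corresponding value in parallel, evaluating
    secondaryFiles patterns and attaching matching files to each primary File.

    When a 'discovered' dict is provided, newly found secondaryFiles are also
    recorded there, keyed by the primary file's location.
    """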
124     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
125         # union type, collect all possible secondaryFiles
126         for i in inputschema:
127             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
128         return
129
130     if isinstance(inputschema, basestring):
131         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
132         if sd:
133             inputschema = sd
134         else:
135             return
136
137     if "secondaryFiles" in inputschema:
138         # set secondaryFiles, may be inherited by compound types.
139         secondaryspec = inputschema["secondaryFiles"]
140
141     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
142         not isinstance(inputschema["type"], basestring)):
143         # compound type (union, array, record)
144         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
145
146     elif (inputschema["type"] == "record" and
147           isinstance(primary, Mapping)):
148         #
149         # record type, find secondary files associated with fields.
150         #
151         for f in inputschema["fields"]:
152             p = primary.get(shortname(f["name"]))
153             if p:
154                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
155
156     elif (inputschema["type"] == "array" and
157           isinstance(primary, Sequence)):
158         #
159         # array type, find secondary files of elements
160         #
161         for p in primary:
162             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
163
164     elif (inputschema["type"] == "File" and
165           secondaryspec and
166           isinstance(primary, Mapping) and
167           primary.get("class") == "File" and
168           "secondaryFiles" not in primary):
169         #
170         # Found a file, check for secondaryFiles
171         #
172         specs = []
173         primary["secondaryFiles"] = secondaryspec
174         for i, sf in enumerate(aslist(secondaryspec)):
175             if builder.cwlVersion == "v1.0":
176                 pattern = builder.do_eval(sf, context=primary)
177             else:
178                 pattern = builder.do_eval(sf["pattern"], context=primary)
179             if pattern is None:
180                 continue
181             if isinstance(pattern, list):
182                 specs.extend(pattern)
183             elif isinstance(pattern, dict):
184                 specs.append(pattern)
185             elif isinstance(pattern, str):
186                 specs.append({"pattern": pattern})
187             else:
188                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
189                     "Expression must return list, object, string or null")
190
191         found = []
192         for i, sf in enumerate(specs):
193             if isinstance(sf, dict):
194                 if sf.get("class") == "File":
195                     pattern = sf["basename"]
                    required = True  # a File object returned directly by the expression has no "required" field; default to True
196                 else:
197                     pattern = sf["pattern"]
198                     required = sf.get("required")
199             elif isinstance(sf, str):
200                 pattern = sf
201                 required = True
202             else:
203                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
204                     "Expression must return list, object, string or null")
205
206             sfpath = substitute(primary["location"], pattern)
207             required = builder.do_eval(required, context=primary)
208
209             if fsaccess.exists(sfpath):
210                 found.append({"location": sfpath, "class": "File"})
211             elif required:
212                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
213                     "Required secondary file '%s' does not exist" % sfpath)
214
215         primary["secondaryFiles"] = cmap(found)
216         if discovered is not None:
217             discovered[primary["location"]] = primary["secondaryFiles"]
218     elif inputschema["type"] not in primitive_types_set:
219         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
220
221 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
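    """Run set_secondary() for each input parameter that has a File or compound value in the job order."""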
222     for inputschema in inputs:
223         primary = job_order.get(shortname(inputschema["id"]))
224         if isinstance(primary, (Mapping, Sequence)):
225             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
226
227 def upload_dependencies(arvrunner, name, document_loader,
228                         workflowobj, uri, loadref_run,
229                         include_primary=True, discovered_secondaryfiles=None):
230     """Upload the dependencies of the workflowobj document to Keep.
231
232     Returns a pathmapper object mapping local paths to keep references.  Also
233     does an in-place update of references in "workflowobj".
234
235     Use scandeps to find $import, $include, $schemas, run, File and Directory
236     fields that represent external references.
237
238     If workflowobj has an "id" field, this will reload the document to ensure
239     it is scanning the raw document prior to preprocessing.
240     """
241
242     loaded = set()
243     def loadref(b, u):
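        """Fetch and parse the raw (unpreprocessed) document at a reference, once per URL."""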
244         joined = document_loader.fetcher.urljoin(b, u)
245         defrg, _ = urllib.parse.urldefrag(joined)
246         if defrg not in loaded:
247             loaded.add(defrg)
248             # Use fetch_text to get raw file (before preprocessing).
249             text = document_loader.fetch_text(defrg)
250             if isinstance(text, bytes):
251                 textIO = StringIO(text.decode('utf-8'))
252             else:
253                 textIO = StringIO(text)
254             return yaml.safe_load(textIO)
255         else:
256             return {}
257
258     if loadref_run:
259         loadref_fields = set(("$import", "run"))
260     else:
261         loadref_fields = set(("$import",))
262
263     scanobj = workflowobj
264     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
265         # Need raw file content (before preprocessing) to ensure
266         # that external references in $include and $mixin are captured.
267         scanobj = loadref("", workflowobj["id"])
268
269     metadata = scanobj
270
271     sc_result = scandeps(uri, scanobj,
272                   loadref_fields,
273                   set(("$include", "$schemas", "location")),
274                   loadref, urljoin=document_loader.fetcher.urljoin)
275
276     sc = []
277     uuids = {}
278
279     def collect_uuids(obj):
280         loc = obj.get("location", "")
281         sp = loc.split(":")
282         if sp[0] == "keep":
283             # Collect collection uuids that need to be resolved to
284             # portable data hashes
285             gp = collection_uuid_pattern.match(loc)
286             if gp:
287                 uuids[gp.groups()[0]] = obj
288             if collectionUUID in obj:
289                 uuids[obj[collectionUUID]] = obj
290
291     def collect_uploads(obj):
292         loc = obj.get("location", "")
293         sp = loc.split(":")
294         if len(sp) < 1:
295             return
296         if sp[0] in ("file", "http", "https"):
297             # Record local files that need to be uploaded,
298             # don't include file literals, keep references, etc.
299             sc.append(obj)
300         collect_uuids(obj)
301
302     visit_class(workflowobj, ("File", "Directory"), collect_uuids)
303     visit_class(sc_result, ("File", "Directory"), collect_uploads)
304
305     # Resolve any collection uuids we found to portable data hashes
306     # and assign them to uuid_map
307     uuid_map = {}
308     fetch_uuids = list(uuids.keys())
309     while fetch_uuids:
310         # For a large number of fetch_uuids, API server may limit
311         # response size, so keep fetching until the API server has nothing
312         # more to give us.
313         lookups = arvrunner.api.collections().list(
314             filters=[["uuid", "in", fetch_uuids]],
315             count="none",
316             select=["uuid", "portable_data_hash"]).execute(
317                 num_retries=arvrunner.num_retries)
318
319         if not lookups["items"]:
320             break
321
322         for l in lookups["items"]:
323             uuid_map[l["uuid"]] = l["portable_data_hash"]
324
325         fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
326
327     normalizeFilesDirs(sc)
328
329     if include_primary and "id" in workflowobj:
330         sc.append({"class": "File", "location": workflowobj["id"]})
331
332     if "$schemas" in workflowobj:
333         for s in workflowobj["$schemas"]:
334             sc.append({"class": "File", "location": s})
335
336     def visit_default(obj):
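        """Give File/Directory defaults a 'location', and drop a default entirely if it points at a nonexistent path."""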
337         remove = [False]
338         def ensure_default_location(f):
339             if "location" not in f and "path" in f:
340                 f["location"] = f["path"]
341                 del f["path"]
342             if "location" in f and not arvrunner.fs_access.exists(f["location"]):
343                 # Doesn't exist, remove from list of dependencies to upload
344                 sc[:] = [x for x in sc if x["location"] != f["location"]]
345                 # Delete "default" from workflowobj
346                 remove[0] = True
347         visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
348         if remove[0]:
349             del obj["default"]
350
351     find_defaults(workflowobj, visit_default)
352
353     discovered = {}
354     def discover_default_secondary_files(obj):
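        """Build a job order from the tool's default values and run secondaryFiles discovery over it."""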
355         builder_job_order = {}
356         for t in obj["inputs"]:
357             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
358         # Need to create a builder object to evaluate expressions.
359         builder = make_builder(builder_job_order,
360                                obj.get("hints", []),
361                                obj.get("requirements", []),
362                                ArvRuntimeContext(),
363                                metadata)
364         discover_secondary_files(arvrunner.fs_access,
365                                  builder,
366                                  obj["inputs"],
367                                  builder_job_order,
368                                  discovered)
369
370     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
371     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
372
373     for d in list(discovered):
374         # Only interested in discovered secondaryFiles which are local
375         # files that need to be uploaded.
376         if d.startswith("file:"):
377             sc.extend(discovered[d])
378         else:
379             del discovered[d]
380
381     mapper = ArvPathMapper(arvrunner, sc, "",
382                            "keep:%s",
383                            "keep:%s/%s",
384                            name=name,
385                            single_collection=True)
386
387     def setloc(p):
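        """Rewrite 'location' to a keep: reference via the pathmapper, resolving
        collection UUIDs to portable data hashes and checking that any PDH
        already present matches what the API server reports."""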
388         loc = p.get("location")
389         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
390             p["location"] = mapper.mapper(p["location"]).resolved
391             return
392
393         if not loc:
394             return
395
396         if collectionUUID in p:
397             uuid = p[collectionUUID]
398             if uuid not in uuid_map:
399                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
400                     "Collection uuid %s not found" % uuid)
401             gp = collection_pdh_pattern.match(loc)
402             if gp and uuid_map[uuid] != gp.groups()[0]:
403                 # This file entry has both collectionUUID and a PDH
404                 # location. If the PDH doesn't match the one returned
405                 # by the API server, raise an error.
406                 raise SourceLine(p, "location", validate.ValidationException).makeError(
407                     "Expected collection uuid %s to be %s but API server reported %s" % (
408                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
409
410         gp = collection_uuid_pattern.match(loc)
411         if not gp:
412             return
413         uuid = gp.groups()[0]
414         if uuid not in uuid_map:
415             raise SourceLine(p, "location", validate.ValidationException).makeError(
416                 "Collection uuid %s not found" % uuid)
417         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
418         p[collectionUUID] = uuid
419
420     visit_class(workflowobj, ("File", "Directory"), setloc)
421     visit_class(discovered, ("File", "Directory"), setloc)
422
423     if discovered_secondaryfiles is not None:
424         for d in discovered:
425             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
426
427     if "$schemas" in workflowobj:
428         sch = CommentedSeq()
429         for s in workflowobj["$schemas"]:
430             sch.append(mapper.mapper(s).resolved)
431         workflowobj["$schemas"] = sch
432
433     return mapper
434
435
436 def upload_docker(arvrunner, tool):
437     """Uploads Docker images used in CommandLineTool objects."""
438
439     if isinstance(tool, CommandLineTool):
440         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
441         if docker_req:
442             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
443                 # TODO: can be supported by containers API, but not jobs API.
444                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
445                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
446             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
447                                                        arvrunner.runtimeContext.force_docker_pull,
448                                                        arvrunner.runtimeContext.tmp_outdir_prefix)
449         else:
450             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
451                                                        True, arvrunner.project_uuid,
452                                                        arvrunner.runtimeContext.force_docker_pull,
453                                                        arvrunner.runtimeContext.tmp_outdir_prefix)
454     elif isinstance(tool, cwltool.workflow.Workflow):
455         for s in tool.steps:
456             upload_docker(arvrunner, s.embedded_tool)
457
458
459 def packed_workflow(arvrunner, tool, merged_map):
460     """Create a packed workflow.
461
462     A "packed" workflow is one where all the components have been combined into a single document."""
463
464     rewrites = {}
465     packed = pack(arvrunner.loadingContext, tool.tool["id"],
466                   rewrite_out=rewrites,
467                   loader=tool.doc_loader)
468
469     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
470
471     def visit(v, cur_id):
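        """Rewrite 'location' fields to keep: references using merged_map, attach
        previously discovered secondaryFiles, and pin DockerRequirement entries
        to a Docker image collection portable data hash."""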
472         if isinstance(v, dict):
473             if v.get("class") in ("CommandLineTool", "Workflow"):
474                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
475                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or update to cwlVersion: v1.1")
476                 if "id" in v:
477                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
478             if "path" in v and "location" not in v:
479                 v["location"] = v["path"]
480                 del v["path"]
481             if "location" in v and not v["location"].startswith("keep:"):
482                 v["location"] = merged_map[cur_id].resolved[v["location"]]
483             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
484                 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
485             if v.get("class") == "DockerRequirement":
486                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
487                                                                                                              arvrunner.project_uuid,
488                                                                                                              arvrunner.runtimeContext.force_docker_pull,
489                                                                                                              arvrunner.runtimeContext.tmp_outdir_prefix)
490             for l in v:
491                 visit(v[l], cur_id)
492         if isinstance(v, list):
493             for l in v:
494                 visit(l, cur_id)
495     visit(packed, None)
496     return packed
497
498
499 def tag_git_version(packed, tool):
500     if tool.tool["id"].startswith("file://"):
501         path = os.path.dirname(tool.tool["id"][7:])
502         try:
503             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
504         except (OSError, subprocess.CalledProcessError):
505             pass
506         else:
507             packed["http://schema.org/version"] = githash
508
509
510 def upload_job_order(arvrunner, name, tool, job_order):
511     """Upload local files referenced in the input object and return updated input
512     object with 'location' updated to the proper keep references.
513     """
514
515     # Make a copy of the job order and set defaults.
516     builder_job_order = copy.copy(job_order)
517
518     # fill_in_defaults throws an error if there are any
519     # missing required parameters; we don't want it to do that,
520     # so make them all optional.
521     inputs_copy = copy.deepcopy(tool.tool["inputs"])
522     for i in inputs_copy:
523         if "null" not in i["type"]:
524             i["type"] = ["null"] + aslist(i["type"])
525
526     fill_in_defaults(inputs_copy,
527                      builder_job_order,
528                      arvrunner.fs_access)
529     # Need to create a builder object to evaluate expressions.
530     builder = make_builder(builder_job_order,
531                            tool.hints,
532                            tool.requirements,
533                            ArvRuntimeContext(),
534                            tool.metadata)
535     # Now update job_order with secondaryFiles
536     discover_secondary_files(arvrunner.fs_access,
537                              builder,
538                              tool.tool["inputs"],
539                              job_order)
540
541     jobmapper = upload_dependencies(arvrunner,
542                                     name,
543                                     tool.doc_loader,
544                                     job_order,
545                                     job_order.get("id", "#"),
546                                     False)
547
548     if "id" in job_order:
549         del job_order["id"]
550
551     # Need to filter this out, gets added by cwltool when providing
552     # parameters on the command line.
553     if "job_order" in job_order:
554         del job_order["job_order"]
555
556     return job_order
557
558 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
559
560 def upload_workflow_deps(arvrunner, tool):
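    """Upload Docker images and file dependencies for each tool in the workflow.

    Returns a map of tool document id to FileUpdates (resolved file locations
    and discovered secondaryFiles).
    """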
561     # Ensure that Docker images needed by this workflow are available
562
563     upload_docker(arvrunner, tool)
564
565     document_loader = tool.doc_loader
566
567     merged_map = {}
568
569     def upload_tool_deps(deptool):
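        """Upload the dependencies of a single embedded tool and record its path mappings in merged_map."""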
570         if "id" in deptool:
571             discovered_secondaryfiles = {}
572             pm = upload_dependencies(arvrunner,
573                                      "%s dependencies" % (shortname(deptool["id"])),
574                                      document_loader,
575                                      deptool,
576                                      deptool["id"],
577                                      False,
578                                      include_primary=False,
579                                      discovered_secondaryfiles=discovered_secondaryfiles)
580             document_loader.idx[deptool["id"]] = deptool
581             toolmap = {}
582             for k,v in pm.items():
583                 toolmap[k] = v.resolved
584             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
585
586     tool.visit(upload_tool_deps)
587
588     return merged_map
589
590 def arvados_jobs_image(arvrunner, img):
591     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
592
593     try:
594         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
595                                                           arvrunner.runtimeContext.force_docker_pull,
596                                                           arvrunner.runtimeContext.tmp_outdir_prefix)
597     except Exception as e:
598         raise Exception("Docker image %s is not available\n%s" % (img, e) )
599
600
601 def upload_workflow_collection(arvrunner, name, packed):
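    """Save the packed workflow as 'workflow.cwl' in a Keep collection.

    Reuses an existing collection with the same content and name if one is
    found; returns the collection's portable data hash.
    """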
602     collection = arvados.collection.Collection(api_client=arvrunner.api,
603                                                keep_client=arvrunner.keep_client,
604                                                num_retries=arvrunner.num_retries)
605     with collection.open("workflow.cwl", "w") as f:
606         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
607
608     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
609                ["name", "like", name+"%"]]
610     if arvrunner.project_uuid:
611         filters.append(["owner_uuid", "=", arvrunner.project_uuid])
612     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
613
614     if exists["items"]:
615         logger.info("Using collection %s", exists["items"][0]["uuid"])
616     else:
617         collection.save_new(name=name,
618                             owner_uuid=arvrunner.project_uuid,
619                             ensure_unique_name=True,
620                             num_retries=arvrunner.num_retries)
621         logger.info("Uploaded to %s", collection.manifest_locator())
622
623     return collection.portable_data_hash()
624
625
626 class Runner(Process):
627     """Base class for runner processes, which submit an instance of
628     arvados-cwl-runner and wait for the final result."""
629
630     def __init__(self, runner, updated_tool,
631                  tool, loadingContext, enable_reuse,
632                  output_name, output_tags, submit_runner_ram=0,
633                  name=None, on_error=None, submit_runner_image=None,
634                  intermediate_output_ttl=0, merged_map=None,
635                  priority=None, secret_store=None,
636                  collection_cache_size=256,
637                  collection_cache_is_default=True):
638
639         loadingContext = loadingContext.copy()
640         loadingContext.metadata = updated_tool.metadata.copy()
641
642         super(Runner, self).__init__(updated_tool.tool, loadingContext)
643
644         self.arvrunner = runner
645         self.embedded_tool = tool
646         self.job_order = None
647         self.running = False
648         if enable_reuse:
649             # If reuse is permitted by command line arguments but
650             # disabled by the workflow itself, disable it.
651             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
652             if reuse_req:
653                 enable_reuse = reuse_req["enableReuse"]
654         self.enable_reuse = enable_reuse
655         self.uuid = None
656         self.final_output = None
657         self.output_name = output_name
658         self.output_tags = output_tags
659         self.name = name
660         self.on_error = on_error
661         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
662         self.intermediate_output_ttl = intermediate_output_ttl
663         self.priority = priority
664         self.secret_store = secret_store
665         self.enable_dev = loadingContext.enable_dev
666
667         self.submit_runner_cores = 1
668         self.submit_runner_ram = 1024  # default 1 GiB
669         self.collection_cache_size = collection_cache_size
670
671         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
672         if runner_resource_req:
673             if runner_resource_req.get("coresMin"):
674                 self.submit_runner_cores = runner_resource_req["coresMin"]
675             if runner_resource_req.get("ramMin"):
676                 self.submit_runner_ram = runner_resource_req["ramMin"]
677             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
678                 self.collection_cache_size = runner_resource_req["keep_cache"]
679
680         if submit_runner_ram:
681             # Command line / initializer overrides default and/or spec from workflow
682             self.submit_runner_ram = submit_runner_ram
683
684         if self.submit_runner_ram <= 0:
685             raise Exception("Value of submit-runner-ram must be greater than zero")
686
687         if self.submit_runner_cores <= 0:
688             raise Exception("Value of submit-runner-cores must be greater than zero")
689
690         self.merged_map = merged_map or {}
691
692     def job(self,
693             job_order,         # type: Mapping[Text, Text]
694             output_callbacks,  # type: Callable[[Any, Any], Any]
695             runtimeContext     # type: RuntimeContext
696            ):  # type: (...) -> Generator[Any, None, None]
697         self.job_order = job_order
698         self._init_job(job_order, runtimeContext)
699         yield self
700
701     def update_pipeline_component(self, record):
702         pass
703
704     def done(self, record):
705         """Base method for handling a completed runner."""
706
707         try:
708             if record["state"] == "Complete":
709                 if record.get("exit_code") is not None:
710                     if record["exit_code"] == 33:
711                         processStatus = "UnsupportedRequirement"
712                     elif record["exit_code"] == 0:
713                         processStatus = "success"
714                     else:
715                         processStatus = "permanentFail"
716                 else:
717                     processStatus = "success"
718             else:
719                 processStatus = "permanentFail"
720
721             outputs = {}
722
723             if processStatus == "permanentFail":
724                 logc = arvados.collection.CollectionReader(record["log"],
725                                                            api_client=self.arvrunner.api,
726                                                            keep_client=self.arvrunner.keep_client,
727                                                            num_retries=self.arvrunner.num_retries)
728                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
729
730             self.final_output = record["output"]
731             outc = arvados.collection.CollectionReader(self.final_output,
732                                                        api_client=self.arvrunner.api,
733                                                        keep_client=self.arvrunner.keep_client,
734                                                        num_retries=self.arvrunner.num_retries)
735             if "cwl.output.json" in outc:
736                 with outc.open("cwl.output.json", "rb") as f:
737                     if f.size() > 0:
738                         outputs = json.loads(f.read().decode())
739             def keepify(fileobj):
740                 path = fileobj["location"]
741                 if not path.startswith("keep:"):
742                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
743             adjustFileObjs(outputs, keepify)
744             adjustDirObjs(outputs, keepify)
745         except Exception:
746             logger.exception("[%s] While getting final output object", self.name)
747             self.arvrunner.output_callback({}, "permanentFail")
748         else:
749             self.arvrunner.output_callback(outputs, processStatus)