[arvados.git] sdk/cwl/arvados_cwl/runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.builder import substitute
36 from cwltool.pack import pack
37 from cwltool.update import INTERNAL_VERSION
38 from cwltool.builder import Builder
39 import schema_salad.validate as validate
40
41 import arvados.collection
42 from .util import collectionUUID
43 import ruamel.yaml as yaml
44 from ruamel.yaml.comments import CommentedMap, CommentedSeq
45
46 import arvados_cwl.arvdocker
47 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
48 from ._version import __version__
49 from . import done
50 from .context import ArvRuntimeContext
51
52 logger = logging.getLogger('arvados.cwl-runner')
53
54 def trim_anonymous_location(obj):
55     """Remove 'location' field from File and Directory literals.
56
57     To make internal handling easier, literals are assigned a random id for
58     'location'.  However, when writing the record back out, this can break
59     reproducibility.  Since it is valid for literals not to have a 'location'
60     field, remove it.
61
62     """
63
64     if obj.get("location", "").startswith("_:"):
65         del obj["location"]
66
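# A minimal illustration with made-up values: a File literal that was given a
# synthetic "_:"-prefixed location keeps its other fields, but the synthetic
# id is dropped so the serialized record stays reproducible.
#
#   literal = {"class": "File", "location": "_:0c32...", "basename": "msg.txt", "contents": "hello"}
#   trim_anonymous_location(literal)
#   # literal == {"class": "File", "basename": "msg.txt", "contents": "hello"}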
67
68 def remove_redundant_fields(obj):
69     for field in ("path", "nameext", "nameroot", "dirname"):
70         if field in obj:
71             del obj[field]
72
73
74 def find_defaults(d, op):
75     if isinstance(d, list):
76         for i in d:
77             find_defaults(i, op)
78     elif isinstance(d, dict):
79         if "default" in d:
80             op(d)
81         else:
82             for i in viewvalues(d):
83                 find_defaults(i, op)
84
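# Illustrative sketch (hypothetical document): find_defaults() walks nested
# lists and dicts and applies `op` to every mapping that carries a "default"
# key, without descending further into that mapping.
#
#   doc = {"inputs": [{"id": "x", "default": {"class": "File", "path": "a.txt"}}]}
#   find_defaults(doc, lambda d: print(d["id"]))   # prints "x"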
85 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
86     return Builder(
87                  job=joborder,
88                  files=[],               # type: List[Dict[Text, Text]]
89                  bindings=[],            # type: List[Dict[Text, Any]]
90                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
91                  names=None,               # type: Names
92                  requirements=requirements,        # type: List[Dict[Text, Any]]
93                  hints=hints,               # type: List[Dict[Text, Any]]
94                  resources={},           # type: Dict[str, int]
95                  mutation_manager=None,    # type: Optional[MutationManager]
96                  formatgraph=None,         # type: Optional[Graph]
97                  make_fs_access=None,      # type: Type[StdFsAccess]
98                  fs_access=None,           # type: StdFsAccess
99                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
100                  timeout=runtimeContext.eval_timeout,             # type: float
101                  debug=runtimeContext.debug,               # type: bool
102                  js_console=runtimeContext.js_console,          # type: bool
103                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
104                  loadListing="",         # type: Text
105                  outdir="",              # type: Text
106                  tmpdir="",              # type: Text
107                  stagedir="",            # type: Text
108                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion")
109                 )
110
111 def search_schemadef(name, reqs):
112     for r in reqs:
113         if r["class"] == "SchemaDefRequirement":
114             for sd in r["types"]:
115                 if sd["name"] == name:
116                     return sd
117     return None
118
119 primitive_types_set = frozenset(("null", "boolean", "int", "long",
120                                  "float", "double", "string", "record",
121                                  "array", "enum"))
122
123 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
124     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
125         # union type, collect all possible secondaryFiles
126         for i in inputschema:
127             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
128         return
129
130     if isinstance(inputschema, basestring):
131         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
132         if sd:
133             inputschema = sd
134         else:
135             return
136
137     if "secondaryFiles" in inputschema:
138         # Set secondaryFiles; it may be inherited by compound types.
139         secondaryspec = inputschema["secondaryFiles"]
140
141     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
142         not isinstance(inputschema["type"], basestring)):
143         # compound type (union, array, record)
144         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
145
146     elif (inputschema["type"] == "record" and
147           isinstance(primary, Mapping)):
148         #
149         # record type, find secondary files associated with fields.
150         #
151         for f in inputschema["fields"]:
152             p = primary.get(shortname(f["name"]))
153             if p:
154                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
155
156     elif (inputschema["type"] == "array" and
157           isinstance(primary, Sequence)):
158         #
159         # array type, find secondary files of elements
160         #
161         for p in primary:
162             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
163
164     elif (inputschema["type"] == "File" and
165           secondaryspec and
166           isinstance(primary, Mapping) and
167           primary.get("class") == "File" and
168           "secondaryFiles" not in primary):
169         #
170         # Found a file, check for secondaryFiles
171         #
172         specs = []
173         primary["secondaryFiles"] = secondaryspec
174         for i, sf in enumerate(aslist(secondaryspec)):
175             if builder.cwlVersion == "v1.0":
176                 pattern = builder.do_eval(sf, context=primary)
177             else:
178                 pattern = builder.do_eval(sf["pattern"], context=primary)
179             if pattern is None:
180                 continue
181             if isinstance(pattern, list):
182                 specs.extend(pattern)
183             elif isinstance(pattern, dict):
184                 specs.append(pattern)
185             elif isinstance(pattern, str):
186                 specs.append({"pattern": pattern})
187             else:
188                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
189                     "Expression must return list, object, string or null")
190
191         found = []
192         for i, sf in enumerate(specs):
193             if isinstance(sf, dict):
194                 if sf.get("class") == "File":
195                     pattern = sf["basename"]
196                 else:
197                     pattern = sf["pattern"]
198                     required = sf.get("required")
199             elif isinstance(sf, str):
200                 pattern = sf
201                 required = True
202             else:
203                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
204                     "Expression must return list, object, string or null")
205
206             sfpath = substitute(primary["location"], pattern)
207             required = builder.do_eval(required, context=primary)
208
209             if fsaccess.exists(sfpath):
210                 found.append({"location": sfpath, "class": "File"})
211             elif required:
212                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
213                     "Required secondary file '%s' does not exist" % sfpath)
214
215         primary["secondaryFiles"] = cmap(found)
216         if discovered is not None:
217             discovered[primary["location"]] = primary["secondaryFiles"]
218     elif inputschema["type"] not in primitive_types_set:
219         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
220
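# Illustrative sketch of the pattern expansion above (paths are hypothetical):
# each secondaryFiles pattern is applied to the primary file's location with
# cwltool's substitute(), where a leading "^" strips one file extension first.
#
#   substitute("keep:<pdh>/reads.bam", ".bai")    # -> "keep:<pdh>/reads.bam.bai"
#   substitute("keep:<pdh>/reads.bam", "^.bai")   # -> "keep:<pdh>/reads.bai"
#
# Each candidate path is then checked with fsaccess.exists(); a missing but
# required secondary file raises a ValidationException.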
221 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
222     for inputschema in inputs:
223         primary = job_order.get(shortname(inputschema["id"]))
224         if isinstance(primary, (Mapping, Sequence)):
225             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
226
227 def upload_dependencies(arvrunner, name, document_loader,
228                         workflowobj, uri, loadref_run,
229                         include_primary=True, discovered_secondaryfiles=None):
230     """Upload the dependencies of the workflowobj document to Keep.
231
232     Returns a pathmapper object mapping local paths to keep references.  Also
233     does an in-place update of references in "workflowobj".
234
235     Use scandeps to find $import, $include, $schemas, run, File and Directory
236     fields that represent external references.
237
238     If workflowobj has an "id" field, this will reload the document to ensure
239     it is scanning the raw document prior to preprocessing.
240     """
241
242     loaded = set()
243     def loadref(b, u):
244         joined = document_loader.fetcher.urljoin(b, u)
245         defrg, _ = urllib.parse.urldefrag(joined)
246         if defrg not in loaded:
247             loaded.add(defrg)
248             # Use fetch_text to get raw file (before preprocessing).
249             text = document_loader.fetch_text(defrg)
250             if isinstance(text, bytes):
251                 textIO = StringIO(text.decode('utf-8'))
252             else:
253                 textIO = StringIO(text)
254             return yaml.safe_load(textIO)
255         else:
256             return {}
257
258     if loadref_run:
259         loadref_fields = set(("$import", "run"))
260     else:
261         loadref_fields = set(("$import",))
262
263     scanobj = workflowobj
264     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
265         # Need raw file content (before preprocessing) to ensure
266         # that external references in $include and $mixin are captured.
267         scanobj = loadref("", workflowobj["id"])
268
269     metadata = scanobj
270
271     sc_result = scandeps(uri, scanobj,
272                   loadref_fields,
273                   set(("$include", "$schemas", "location")),
274                   loadref, urljoin=document_loader.fetcher.urljoin)
275
276     sc = []
277     uuids = {}
278
279     def collect_uuids(obj):
280         loc = obj.get("location", "")
281         sp = loc.split(":")
282         if sp[0] == "keep":
283             # Collect collection uuids that need to be resolved to
284             # portable data hashes
285             gp = collection_uuid_pattern.match(loc)
286             if gp:
287                 uuids[gp.groups()[0]] = obj
288             if collectionUUID in obj:
289                 uuids[obj[collectionUUID]] = obj
290
291     def collect_uploads(obj):
292         loc = obj.get("location", "")
293         sp = loc.split(":")
294         if len(sp) < 1:
295             return
296         if sp[0] in ("file", "http", "https"):
297             # Record local files that need to be uploaded;
298             # don't include file literals, keep references, etc.
299             sc.append(obj)
300         collect_uuids(obj)
301
302     visit_class(workflowobj, ("File", "Directory"), collect_uuids)
303     visit_class(sc_result, ("File", "Directory"), collect_uploads)
304
305     # Resolve any collection uuids we found to portable data hashes
306     # and assign them to uuid_map
307     uuid_map = {}
308     fetch_uuids = list(uuids.keys())
309     while fetch_uuids:
310         # For a large number of fetch_uuids, the API server may limit
311         # the response size, so keep fetching until the API server has
312         # nothing more to give us.
313         lookups = arvrunner.api.collections().list(
314             filters=[["uuid", "in", fetch_uuids]],
315             count="none",
316             select=["uuid", "portable_data_hash"]).execute(
317                 num_retries=arvrunner.num_retries)
318
319         if not lookups["items"]:
320             break
321
322         for l in lookups["items"]:
323             uuid_map[l["uuid"]] = l["portable_data_hash"]
324
325         fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
326
327     normalizeFilesDirs(sc)
328
329     if include_primary and "id" in workflowobj:
330         sc.append({"class": "File", "location": workflowobj["id"]})
331
332     if "$schemas" in workflowobj:
333         for s in workflowobj["$schemas"]:
334             sc.append({"class": "File", "location": s})
335
336     def visit_default(obj):
337         remove = [False]
338         def ensure_default_location(f):
339             if "location" not in f and "path" in f:
340                 f["location"] = f["path"]
341                 del f["path"]
342             if "location" in f and not arvrunner.fs_access.exists(f["location"]):
343                 # Doesn't exist, remove from list of dependencies to upload
344                 sc[:] = [x for x in sc if x["location"] != f["location"]]
345                 # Delete "default" from workflowobj
346                 remove[0] = True
347         visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
348         if remove[0]:
349             del obj["default"]
350
351     find_defaults(workflowobj, visit_default)
352
353     discovered = {}
354     def discover_default_secondary_files(obj):
355         builder_job_order = {}
356         for t in obj["inputs"]:
357             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
358         # Need to create a builder object to evaluate expressions.
359         builder = make_builder(builder_job_order,
360                                obj.get("hints", []),
361                                obj.get("requirements", []),
362                                ArvRuntimeContext(),
363                                metadata)
364         discover_secondary_files(arvrunner.fs_access,
365                                  builder,
366                                  obj["inputs"],
367                                  builder_job_order,
368                                  discovered)
369
370     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
371     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
372
373     for d in list(discovered):
374         # Only interested in discovered secondaryFiles which are local
375         # files that need to be uploaded.
376         if d.startswith("file:"):
377             sc.extend(discovered[d])
378         else:
379             del discovered[d]
380
381     mapper = ArvPathMapper(arvrunner, sc, "",
382                            "keep:%s",
383                            "keep:%s/%s",
384                            name=name,
385                            single_collection=True)
386
387     def setloc(p):
388         loc = p.get("location")
389         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
390             p["location"] = mapper.mapper(p["location"]).resolved
391             return
392
393         if not loc:
394             return
395
396         if collectionUUID in p:
397             uuid = p[collectionUUID]
398             if uuid not in uuid_map:
399                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
400                     "Collection uuid %s not found" % uuid)
401             gp = collection_pdh_pattern.match(loc)
402             if gp and uuid_map[uuid] != gp.groups()[0]:
403                 # This file entry has both collectionUUID and a PDH
404                 # location. If the PDH doesn't match the one returned by
405                 # the API server, raise an error.
406                 raise SourceLine(p, "location", validate.ValidationException).makeError(
407                     "Expected collection uuid %s to be %s but API server reported %s" % (
408                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
409
410         gp = collection_uuid_pattern.match(loc)
411         if not gp:
412             return
413         uuid = gp.groups()[0]
414         if uuid not in uuid_map:
415             raise SourceLine(p, "location", validate.ValidationException).makeError(
416                 "Collection uuid %s not found" % uuid)
417         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
418         p[collectionUUID] = uuid
419
420     visit_class(workflowobj, ("File", "Directory"), setloc)
421     visit_class(discovered, ("File", "Directory"), setloc)
422
423     if discovered_secondaryfiles is not None:
424         for d in discovered:
425             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
426
427     if "$schemas" in workflowobj:
428         sch = CommentedSeq()
429         for s in workflowobj["$schemas"]:
430             if s in mapper:
431                 sch.append(mapper.mapper(s).resolved)
432         workflowobj["$schemas"] = sch
433
434     return mapper
435
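# Minimal usage sketch (variable names hypothetical): the returned
# ArvPathMapper translates any local reference that was uploaded into its
# Keep location.
#
#   mapper = upload_dependencies(arvrunner, "tool dependencies", tool.doc_loader,
#                                tool.tool, tool.tool["id"], False)
#   mapper.mapper("file:///home/me/input.fastq").resolved
#   # -> something like "keep:<portable data hash>/input.fastq"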
436
437 def upload_docker(arvrunner, tool):
438     """Uploads Docker images used in CommandLineTool objects."""
439
440     if isinstance(tool, CommandLineTool):
441         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
442         if docker_req:
443             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
444                 # TODO: can be supported by containers API, but not jobs API.
445                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
446                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
447             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
448                                                        arvrunner.runtimeContext.force_docker_pull,
449                                                        arvrunner.runtimeContext.tmp_outdir_prefix)
450         else:
451             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
452                                                        True, arvrunner.project_uuid,
453                                                        arvrunner.runtimeContext.force_docker_pull,
454                                                        arvrunner.runtimeContext.tmp_outdir_prefix)
455     elif isinstance(tool, cwltool.workflow.Workflow):
456         for s in tool.steps:
457             upload_docker(arvrunner, s.embedded_tool)
458
459
460 def packed_workflow(arvrunner, tool, merged_map):
461     """Create a packed workflow.
462
463     A "packed" workflow is one where all the components have been combined into a single document."""
464
465     rewrites = {}
466     packed = pack(arvrunner.loadingContext, tool.tool["id"],
467                   rewrite_out=rewrites,
468                   loader=tool.doc_loader)
469
470     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
471
472     def visit(v, cur_id):
473         if isinstance(v, dict):
474             if v.get("class") in ("CommandLineTool", "Workflow"):
475                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
476                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field; add an 'id' or upgrade to cwlVersion: v1.1")
477                 if "id" in v:
478                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
479             if "path" in v and "location" not in v:
480                 v["location"] = v["path"]
481                 del v["path"]
482             if "location" in v and not v["location"].startswith("keep:"):
483                 v["location"] = merged_map[cur_id].resolved[v["location"]]
484             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
485                 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
486             if v.get("class") == "DockerRequirement":
487                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
488                                                                                                              arvrunner.project_uuid,
489                                                                                                              arvrunner.runtimeContext.force_docker_pull,
490                                                                                                              arvrunner.runtimeContext.tmp_outdir_prefix)
491             for l in v:
492                 visit(v[l], cur_id)
493         if isinstance(v, list):
494             for l in v:
495                 visit(l, cur_id)
496     visit(packed, None)
497     return packed
498
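# Rough, simplified shape of a packed workflow (ids illustrative): every
# component process is inlined into one document under "$graph", with the
# top-level workflow addressable as "#main".
#
#   {
#     "cwlVersion": "v1.1",
#     "$graph": [
#       {"class": "Workflow", "id": "#main", "steps": [...]},
#       {"class": "CommandLineTool", "id": "#my_tool.cwl", ...}
#     ]
#   }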
499
500 def tag_git_version(packed, tool):
501     if tool.tool["id"].startswith("file://"):
502         path = os.path.dirname(tool.tool["id"][7:])
503         try:
504             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
505         except (OSError, subprocess.CalledProcessError):
506             pass
507         else:
508             packed["http://schema.org/version"] = githash
509
510
511 def upload_job_order(arvrunner, name, tool, job_order):
512     """Upload local files referenced in the input object and return updated input
513     object with 'location' updated to the proper keep references.
514     """
515
516     # Make a copy of the job order and set defaults.
517     builder_job_order = copy.copy(job_order)
518
519     # fill_in_defaults throws an error if there are any
520     # missing required parameters; we don't want that here,
521     # so make them all optional.
522     inputs_copy = copy.deepcopy(tool.tool["inputs"])
523     for i in inputs_copy:
524         if "null" not in i["type"]:
525             i["type"] = ["null"] + aslist(i["type"])
526
527     fill_in_defaults(inputs_copy,
528                      builder_job_order,
529                      arvrunner.fs_access)
530     # Need to create a builder object to evaluate expressions.
531     builder = make_builder(builder_job_order,
532                            tool.hints,
533                            tool.requirements,
534                            ArvRuntimeContext(),
535                            tool.metadata)
536     # Now update job_order with secondaryFiles
537     discover_secondary_files(arvrunner.fs_access,
538                              builder,
539                              tool.tool["inputs"],
540                              job_order)
541
542     jobmapper = upload_dependencies(arvrunner,
543                                     name,
544                                     tool.doc_loader,
545                                     job_order,
546                                     job_order.get("id", "#"),
547                                     False)
548
549     if "id" in job_order:
550         del job_order["id"]
551
552     # Need to filter this out; it gets added by cwltool when parameters
553     # are provided on the command line.
554     if "job_order" in job_order:
555         del job_order["job_order"]
556
557     return job_order
558
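# Illustrative before/after for upload_job_order (paths hypothetical): local
# file references in the input object are rewritten to Keep references, and
# any discovered secondaryFiles are filled in next to the primary file.
#
#   {"reads": {"class": "File", "location": "file:///home/me/reads.bam"}}
#   # becomes, after upload:
#   {"reads": {"class": "File", "location": "keep:<pdh>/reads.bam",
#              "secondaryFiles": [{"class": "File", "location": "keep:<pdh>/reads.bam.bai"}]}}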
559 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
560
561 def upload_workflow_deps(arvrunner, tool):
562     # Ensure that Docker images needed by this workflow are available
563
564     upload_docker(arvrunner, tool)
565
566     document_loader = tool.doc_loader
567
568     merged_map = {}
569
570     def upload_tool_deps(deptool):
571         if "id" in deptool:
572             discovered_secondaryfiles = {}
573             pm = upload_dependencies(arvrunner,
574                                      "%s dependencies" % (shortname(deptool["id"])),
575                                      document_loader,
576                                      deptool,
577                                      deptool["id"],
578                                      False,
579                                      include_primary=False,
580                                      discovered_secondaryfiles=discovered_secondaryfiles)
581             document_loader.idx[deptool["id"]] = deptool
582             toolmap = {}
583             for k,v in pm.items():
584                 toolmap[k] = v.resolved
585             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
586
587     tool.visit(upload_tool_deps)
588
589     return merged_map
590
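# Sketch of the returned structure (keys hypothetical): merged_map maps each
# tool or workflow document id to a FileUpdates tuple; .resolved maps local
# references to Keep references and .secondaryFiles records the secondaryFiles
# discovered for default values.
#
#   merged_map["file:///home/me/wf.cwl"].resolved["file:///home/me/script.py"]
#   # -> "keep:<pdh>/script.py"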
591 def arvados_jobs_image(arvrunner, img):
592     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
593
594     try:
595         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
596                                                           arvrunner.runtimeContext.force_docker_pull,
597                                                           arvrunner.runtimeContext.tmp_outdir_prefix)
598     except Exception as e:
599         raise Exception("Docker image %s is not available\n%s" % (img, e) )
600
601
602 def upload_workflow_collection(arvrunner, name, packed):
603     collection = arvados.collection.Collection(api_client=arvrunner.api,
604                                                keep_client=arvrunner.keep_client,
605                                                num_retries=arvrunner.num_retries)
606     with collection.open("workflow.cwl", "w") as f:
607         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
608
609     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
610                ["name", "like", name+"%"]]
611     if arvrunner.project_uuid:
612         filters.append(["owner_uuid", "=", arvrunner.project_uuid])
613     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
614
615     if exists["items"]:
616         logger.info("Using collection %s", exists["items"][0]["uuid"])
617     else:
618         collection.save_new(name=name,
619                             owner_uuid=arvrunner.project_uuid,
620                             ensure_unique_name=True,
621                             num_retries=arvrunner.num_retries)
622         logger.info("Uploaded to %s", collection.manifest_locator())
623
624     return collection.portable_data_hash()
625
626
627 class Runner(Process):
628     """Base class for runner processes, which submit an instance of
629     arvados-cwl-runner and wait for the final result."""
630
631     def __init__(self, runner, updated_tool,
632                  tool, loadingContext, enable_reuse,
633                  output_name, output_tags, submit_runner_ram=0,
634                  name=None, on_error=None, submit_runner_image=None,
635                  intermediate_output_ttl=0, merged_map=None,
636                  priority=None, secret_store=None,
637                  collection_cache_size=256,
638                  collection_cache_is_default=True):
639
640         loadingContext = loadingContext.copy()
641         loadingContext.metadata = updated_tool.metadata.copy()
642
643         super(Runner, self).__init__(updated_tool.tool, loadingContext)
644
645         self.arvrunner = runner
646         self.embedded_tool = tool
647         self.job_order = None
648         self.running = False
649         if enable_reuse:
650             # If reuse is permitted by command line arguments but
651             # disabled by the workflow itself, disable it.
652             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
653             if reuse_req:
654                 enable_reuse = reuse_req["enableReuse"]
655         self.enable_reuse = enable_reuse
656         self.uuid = None
657         self.final_output = None
658         self.output_name = output_name
659         self.output_tags = output_tags
660         self.name = name
661         self.on_error = on_error
662         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
663         self.intermediate_output_ttl = intermediate_output_ttl
664         self.priority = priority
665         self.secret_store = secret_store
666         self.enable_dev = loadingContext.enable_dev
667
668         self.submit_runner_cores = 1
669         self.submit_runner_ram = 1024  # default 1 GiB
670         self.collection_cache_size = collection_cache_size
671
672         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
673         if runner_resource_req:
674             if runner_resource_req.get("coresMin"):
675                 self.submit_runner_cores = runner_resource_req["coresMin"]
676             if runner_resource_req.get("ramMin"):
677                 self.submit_runner_ram = runner_resource_req["ramMin"]
678             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
679                 self.collection_cache_size = runner_resource_req["keep_cache"]
680
681         if submit_runner_ram:
682             # Command line / initializer overrides default and/or spec from workflow
683             self.submit_runner_ram = submit_runner_ram
684
685         if self.submit_runner_ram <= 0:
686             raise Exception("Value of submit-runner-ram must be greater than zero")
687
688         if self.submit_runner_cores <= 0:
689             raise Exception("Value of submit-runner-cores must be greater than zero")
690
691         self.merged_map = merged_map or {}
692
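    # Hedged example of the workflow-level hint handled above (values are
    # illustrative): a workflow can ask for a larger runner container, and an
    # explicit submit_runner_ram argument still overrides the hint.
    #
    #   hints:
    #     - class: "http://arvados.org/cwl#WorkflowRunnerResources"
    #       coresMin: 2
    #       ramMin: 2048
    #       keep_cache: 512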
693     def job(self,
694             job_order,         # type: Mapping[Text, Text]
695             output_callbacks,  # type: Callable[[Any, Any], Any]
696             runtimeContext     # type: RuntimeContext
697            ):  # type: (...) -> Generator[Any, None, None]
698         self.job_order = job_order
699         self._init_job(job_order, runtimeContext)
700         yield self
701
702     def update_pipeline_component(self, record):
703         pass
704
705     def done(self, record):
706         """Base method for handling a completed runner."""
707
708         try:
709             if record["state"] == "Complete":
710                 if record.get("exit_code") is not None:
711                     if record["exit_code"] == 33:
712                         processStatus = "UnsupportedRequirement"
713                     elif record["exit_code"] == 0:
714                         processStatus = "success"
715                     else:
716                         processStatus = "permanentFail"
717                 else:
718                     processStatus = "success"
719             else:
720                 processStatus = "permanentFail"
721
722             outputs = {}
723
724             if processStatus == "permanentFail":
725                 logc = arvados.collection.CollectionReader(record["log"],
726                                                            api_client=self.arvrunner.api,
727                                                            keep_client=self.arvrunner.keep_client,
728                                                            num_retries=self.arvrunner.num_retries)
729                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
730
731             self.final_output = record["output"]
732             outc = arvados.collection.CollectionReader(self.final_output,
733                                                        api_client=self.arvrunner.api,
734                                                        keep_client=self.arvrunner.keep_client,
735                                                        num_retries=self.arvrunner.num_retries)
736             if "cwl.output.json" in outc:
737                 with outc.open("cwl.output.json", "rb") as f:
738                     if f.size() > 0:
739                         outputs = json.loads(f.read().decode())
740             def keepify(fileobj):
741                 path = fileobj["location"]
742                 if not path.startswith("keep:"):
743                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
744             adjustFileObjs(outputs, keepify)
745             adjustDirObjs(outputs, keepify)
746         except Exception:
747             logger.exception("[%s] While getting final output object", self.name)
748             self.arvrunner.output_callback({}, "permanentFail")
749         else:
750             self.arvrunner.output_callback(outputs, processStatus)