1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.builder import substitute
36 from cwltool.pack import pack
37 from cwltool.update import INTERNAL_VERSION
38 from cwltool.builder import Builder
39 import schema_salad.validate as validate
40
41 import arvados.collection
42 from .util import collectionUUID
43 import ruamel.yaml as yaml
44 from ruamel.yaml.comments import CommentedMap, CommentedSeq
45
46 import arvados_cwl.arvdocker
47 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
48 from ._version import __version__
49 from . import done
50 from .context import ArvRuntimeContext
51
52 logger = logging.getLogger('arvados.cwl-runner')
53
54 def trim_anonymous_location(obj):
55     """Remove 'location' field from File and Directory literals.
56
57     To make internal handling easier, literals are assigned a random id for
58     'location'.  However, when writing the record back out, this can break
59     reproducibility.  Since it is valid for literals not to have a 'location'
60     field, remove it.
61
62     """
63
64     if obj.get("location", "").startswith("_:"):
65         del obj["location"]
66
67
68 def remove_redundant_fields(obj):
69     for field in ("path", "nameext", "nameroot", "dirname"):
70         if field in obj:
71             del obj[field]
72
73
74 def find_defaults(d, op):
75     if isinstance(d, list):
76         for i in d:
77             find_defaults(i, op)
78     elif isinstance(d, dict):
79         if "default" in d:
80             op(d)
81         else:
82             for i in viewvalues(d):
83                 find_defaults(i, op)
84
85 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
86     return Builder(
87                  job=joborder,
88                  files=[],               # type: List[Dict[Text, Text]]
89                  bindings=[],            # type: List[Dict[Text, Any]]
90                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
91                  names=None,               # type: Names
92                  requirements=requirements,        # type: List[Dict[Text, Any]]
93                  hints=hints,               # type: List[Dict[Text, Any]]
94                  resources={},           # type: Dict[str, int]
95                  mutation_manager=None,    # type: Optional[MutationManager]
96                  formatgraph=None,         # type: Optional[Graph]
97                  make_fs_access=None,      # type: Type[StdFsAccess]
98                  fs_access=None,           # type: StdFsAccess
99                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
100                  timeout=runtimeContext.eval_timeout,             # type: float
101                  debug=runtimeContext.debug,               # type: bool
102                  js_console=runtimeContext.js_console,          # type: bool
103                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
104                  loadListing="",         # type: Text
105                  outdir="",              # type: Text
106                  tmpdir="",              # type: Text
107                  stagedir="",            # type: Text
108                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
109                  container_engine="docker"
110                 )
111
112 def search_schemadef(name, reqs):
113     for r in reqs:
114         if r["class"] == "SchemaDefRequirement":
115             for sd in r["types"]:
116                 if sd["name"] == name:
117                     return sd
118     return None
119
120 primitive_types_set = frozenset(("null", "boolean", "int", "long",
121                                  "float", "double", "string", "record",
122                                  "array", "enum"))
123
124 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
125     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
126         # union type, collect all possible secondaryFiles
127         for i in inputschema:
128             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
129         return
130
131     if isinstance(inputschema, basestring):
132         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
133         if sd:
134             inputschema = sd
135         else:
136             return
137
138     if "secondaryFiles" in inputschema:
139         # set secondaryFiles, may be inherited by compound types.
140         secondaryspec = inputschema["secondaryFiles"]
141
142     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
143         not isinstance(inputschema["type"], basestring)):
144         # compound type (union, array, record)
145         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
146
147     elif (inputschema["type"] == "record" and
148           isinstance(primary, Mapping)):
149         #
150         # record type, find secondary files associated with fields.
151         #
152         for f in inputschema["fields"]:
153             p = primary.get(shortname(f["name"]))
154             if p:
155                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
156
157     elif (inputschema["type"] == "array" and
158           isinstance(primary, Sequence)):
159         #
160         # array type, find secondary files of elements
161         #
162         for p in primary:
163             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
164
165     elif (inputschema["type"] == "File" and
166           secondaryspec and
167           isinstance(primary, Mapping) and
168           primary.get("class") == "File" and
169           "secondaryFiles" not in primary):
170         #
171         # Found a file, check for secondaryFiles
172         #
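        # Example (illustrative): under CWL v1.1+ a secondaryFiles entry is a
        # map such as {"pattern": ".bai", "required": True}, whereas CWL v1.0
        # uses a bare string or expression such as ".bai"; do_eval() below
        # resolves either form against the primary File.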
173         specs = []
174         primary["secondaryFiles"] = secondaryspec
175         for i, sf in enumerate(aslist(secondaryspec)):
176             if builder.cwlVersion == "v1.0":
177                 pattern = builder.do_eval(sf, context=primary)
178             else:
179                 pattern = builder.do_eval(sf["pattern"], context=primary)
180             if pattern is None:
181                 continue
182             if isinstance(pattern, list):
183                 specs.extend(pattern)
184             elif isinstance(pattern, dict):
185                 specs.append(pattern)
186             elif isinstance(pattern, str):
187                 specs.append({"pattern": pattern, "required": sf.get("required")})
188             else:
189                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
190                     "Expression must return list, object, string or null")
191
192         found = []
193         for i, sf in enumerate(specs):
194             if isinstance(sf, dict):
195                 if sf.get("class") == "File":
196                     pattern = None
197                     sfpath = sf["location"]
198                     required = True
199                 else:
200                     pattern = sf["pattern"]
201                     required = sf.get("required")
202             elif isinstance(sf, str):
203                 pattern = sf
204                 required = True
205             else:
206                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
207                     "Expression must return list, object, string or null")
208
209             if pattern is not None:
210                 sfpath = substitute(primary["location"], pattern)
211
212             required = builder.do_eval(required, context=primary)
213
214             if fsaccess.exists(sfpath):
215                 if pattern is not None:
216                     found.append({"location": sfpath, "class": "File"})
217                 else:
218                     found.append(sf)
219             elif required:
220                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
221                     "Required secondary file '%s' does not exist" % sfpath)
222
223         primary["secondaryFiles"] = cmap(found)
224         if discovered is not None:
225             discovered[primary["location"]] = primary["secondaryFiles"]
226     elif inputschema["type"] not in primitive_types_set:
227         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
228
229 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
230     for inputschema in inputs:
231         primary = job_order.get(shortname(inputschema["id"]))
232         if isinstance(primary, (Mapping, Sequence)):
233             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
234
235 def upload_dependencies(arvrunner, name, document_loader,
236                         workflowobj, uri, loadref_run,
237                         include_primary=True, discovered_secondaryfiles=None):
238     """Upload the dependencies of the workflowobj document to Keep.
239
240     Returns a pathmapper object mapping local paths to keep references.  Also
241     does an in-place update of references in "workflowobj".
242
243     Use scandeps to find $import, $include, $schemas, run, File and Directory
244     fields that represent external references.
245
246     If workflowobj has an "id" field, this will reload the document to ensure
247     it is scanning the raw document prior to preprocessing.
248     """
249
250     loaded = set()
251     def loadref(b, u):
252         joined = document_loader.fetcher.urljoin(b, u)
253         defrg, _ = urllib.parse.urldefrag(joined)
254         if defrg not in loaded:
255             loaded.add(defrg)
256             # Use fetch_text to get raw file (before preprocessing).
257             text = document_loader.fetch_text(defrg)
258             if isinstance(text, bytes):
259                 textIO = StringIO(text.decode('utf-8'))
260             else:
261                 textIO = StringIO(text)
262             return yaml.safe_load(textIO)
263         else:
264             return {}
265
266     if loadref_run:
267         loadref_fields = set(("$import", "run"))
268     else:
269         loadref_fields = set(("$import",))
270
271     scanobj = workflowobj
272     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
273         # Need raw file content (before preprocessing) to ensure
274         # that external references in $include and $mixin are captured.
275         scanobj = loadref("", workflowobj["id"])
276
277     metadata = scanobj
278
279     sc_result = scandeps(uri, scanobj,
280                   loadref_fields,
281                   set(("$include", "$schemas", "location")),
282                   loadref, urljoin=document_loader.fetcher.urljoin)
283
284     sc = []
285     uuids = {}
286
287     def collect_uuids(obj):
288         loc = obj.get("location", "")
289         sp = loc.split(":")
290         if sp[0] == "keep":
291             # Collect collection uuids that need to be resolved to
292             # portable data hashes
293             gp = collection_uuid_pattern.match(loc)
294             if gp:
295                 uuids[gp.groups()[0]] = obj
296             if collectionUUID in obj:
297                 uuids[obj[collectionUUID]] = obj
298
299     def collect_uploads(obj):
300         loc = obj.get("location", "")
301         sp = loc.split(":")
302         if len(sp) < 1:
303             return
304         if sp[0] in ("file", "http", "https"):
305             # Record local files that need to be uploaded,
306             # don't include file literals, keep references, etc.
307             sc.append(obj)
308         collect_uuids(obj)
309
310     visit_class(workflowobj, ("File", "Directory"), collect_uuids)
311     visit_class(sc_result, ("File", "Directory"), collect_uploads)
312
313     # Resolve any collection uuids we found to portable data hashes
314     # and assign them to uuid_map
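    # Illustrative example (hypothetical uuid): a location such as
    #   keep:zzzzz-4zz18-0123456789abcde/input.txt
    # refers to a collection by uuid; uuid_map lets setloc() below rewrite it
    # to keep:<portable_data_hash>/input.txt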
315     uuid_map = {}
316     fetch_uuids = list(uuids.keys())
317     while fetch_uuids:
318         # For a large number of fetch_uuids, the API server may limit the
319         # response size, so keep fetching until the API server has nothing
320         # more to give us.
321         lookups = arvrunner.api.collections().list(
322             filters=[["uuid", "in", fetch_uuids]],
323             count="none",
324             select=["uuid", "portable_data_hash"]).execute(
325                 num_retries=arvrunner.num_retries)
326
327         if not lookups["items"]:
328             break
329
330         for l in lookups["items"]:
331             uuid_map[l["uuid"]] = l["portable_data_hash"]
332
333         fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
334
335     normalizeFilesDirs(sc)
336
337     if include_primary and "id" in workflowobj:
338         sc.append({"class": "File", "location": workflowobj["id"]})
339
340     if "$schemas" in workflowobj:
341         for s in workflowobj["$schemas"]:
342             sc.append({"class": "File", "location": s})
343
344     def visit_default(obj):
345         remove = [False]
346         def ensure_default_location(f):
347             if "location" not in f and "path" in f:
348                 f["location"] = f["path"]
349                 del f["path"]
350             if "location" in f and not arvrunner.fs_access.exists(f["location"]):
351                 # Doesn't exist, remove from list of dependencies to upload
352                 sc[:] = [x for x in sc if x["location"] != f["location"]]
353                 # Delete "default" from workflowobj
354                 remove[0] = True
355         visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
356         if remove[0]:
357             del obj["default"]
358
359     find_defaults(workflowobj, visit_default)
360
361     discovered = {}
362     def discover_default_secondary_files(obj):
363         builder_job_order = {}
364         for t in obj["inputs"]:
365             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
366         # Need to create a builder object to evaluate expressions.
367         builder = make_builder(builder_job_order,
368                                obj.get("hints", []),
369                                obj.get("requirements", []),
370                                ArvRuntimeContext(),
371                                metadata)
372         discover_secondary_files(arvrunner.fs_access,
373                                  builder,
374                                  obj["inputs"],
375                                  builder_job_order,
376                                  discovered)
377
378     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
379     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
380
381     for d in list(discovered):
382         # Only interested in discovered secondaryFiles which are local
383         # files that need to be uploaded.
384         if d.startswith("file:"):
385             sc.extend(discovered[d])
386         else:
387             del discovered[d]
388
389     mapper = ArvPathMapper(arvrunner, sc, "",
390                            "keep:%s",
391                            "keep:%s/%s",
392                            name=name,
393                            single_collection=True)
394
395     def setloc(p):
396         loc = p.get("location")
397         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
398             p["location"] = mapper.mapper(p["location"]).resolved
399             return
400
401         if not loc:
402             return
403
404         if collectionUUID in p:
405             uuid = p[collectionUUID]
406             if uuid not in uuid_map:
407                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
408                     "Collection uuid %s not found" % uuid)
409             gp = collection_pdh_pattern.match(loc)
410             if gp and uuid_map[uuid] != gp.groups()[0]:
411                 # This file entry has both collectionUUID and a PDH
412                 # location. If the PDH doesn't match the one returned by
413                 # the API server, raise an error.
414                 raise SourceLine(p, "location", validate.ValidationException).makeError(
415                     "Expected collection uuid %s to be %s but API server reported %s" % (
416                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
417
418         gp = collection_uuid_pattern.match(loc)
419         if not gp:
420             return
421         uuid = gp.groups()[0]
422         if uuid not in uuid_map:
423             raise SourceLine(p, "location", validate.ValidationException).makeError(
424                 "Collection uuid %s not found" % uuid)
425         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
426         p[collectionUUID] = uuid
427
428     visit_class(workflowobj, ("File", "Directory"), setloc)
429     visit_class(discovered, ("File", "Directory"), setloc)
430
431     if discovered_secondaryfiles is not None:
432         for d in discovered:
433             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
434
435     if "$schemas" in workflowobj:
436         sch = CommentedSeq()
437         for s in workflowobj["$schemas"]:
438             if s in mapper:
439                 sch.append(mapper.mapper(s).resolved)
440         workflowobj["$schemas"] = sch
441
442     return mapper
443
444
445 def upload_docker(arvrunner, tool):
446     """Uploads Docker images used in CommandLineTool objects."""
447
448     if isinstance(tool, CommandLineTool):
449         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
450         if docker_req:
451             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
452                 # TODO: can be supported by containers API, but not jobs API.
453                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
454                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
455             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
456                                                        arvrunner.runtimeContext.force_docker_pull,
457                                                        arvrunner.runtimeContext.tmp_outdir_prefix)
458         else:
459             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
460                                                        True, arvrunner.project_uuid,
461                                                        arvrunner.runtimeContext.force_docker_pull,
462                                                        arvrunner.runtimeContext.tmp_outdir_prefix)
463     elif isinstance(tool, cwltool.workflow.Workflow):
464         for s in tool.steps:
465             upload_docker(arvrunner, s.embedded_tool)
466
467
468 def packed_workflow(arvrunner, tool, merged_map):
469     """Create a packed workflow.
470
471     A "packed" workflow is one where all the components have been combined into a single document."""
472
473     rewrites = {}
474     packed = pack(arvrunner.loadingContext, tool.tool["id"],
475                   rewrite_out=rewrites,
476                   loader=tool.doc_loader)
477
478     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
479
480     def visit(v, cur_id):
481         if isinstance(v, dict):
482             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
483                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
484                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or update to cwlVersion: v1.1")
485                 if "id" in v:
486                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
487             if "path" in v and "location" not in v:
488                 v["location"] = v["path"]
489                 del v["path"]
490             if "location" in v and cur_id in merged_map:
491                 if v["location"] in merged_map[cur_id].resolved:
492                     v["location"] = merged_map[cur_id].resolved[v["location"]]
493                 if v["location"] in merged_map[cur_id].secondaryFiles:
494                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
495             if v.get("class") == "DockerRequirement":
496                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
497                                                                                                              arvrunner.project_uuid,
498                                                                                                              arvrunner.runtimeContext.force_docker_pull,
499                                                                                                              arvrunner.runtimeContext.tmp_outdir_prefix)
500             for l in v:
501                 visit(v[l], cur_id)
502         if isinstance(v, list):
503             for l in v:
504                 visit(l, cur_id)
505     visit(packed, None)
506     return packed
507
508
509 def tag_git_version(packed, tool):
510     if tool.tool["id"].startswith("file://"):
511         path = os.path.dirname(tool.tool["id"][7:])
512         try:
513             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
514         except (OSError, subprocess.CalledProcessError):
515             pass
516         else:
517             packed["http://schema.org/version"] = githash
518
519
520 def upload_job_order(arvrunner, name, tool, job_order):
521     """Upload local files referenced in the input object and return updated input
522     object with 'location' updated to the proper keep references.
523     """
524
525     # Make a copy of the job order and set defaults.
526     builder_job_order = copy.copy(job_order)
527
528     # fill_in_defaults throws an error if there are any missing required
529     # parameters; we don't want it to do that, so make them all
530     # optional.
531     inputs_copy = copy.deepcopy(tool.tool["inputs"])
532     for i in inputs_copy:
533         if "null" not in i["type"]:
534             i["type"] = ["null"] + aslist(i["type"])
535
536     fill_in_defaults(inputs_copy,
537                      builder_job_order,
538                      arvrunner.fs_access)
539     # Need to create a builder object to evaluate expressions.
540     builder = make_builder(builder_job_order,
541                            tool.hints,
542                            tool.requirements,
543                            ArvRuntimeContext(),
544                            tool.metadata)
545     # Now update job_order with secondaryFiles
546     discover_secondary_files(arvrunner.fs_access,
547                              builder,
548                              tool.tool["inputs"],
549                              job_order)
550
551     jobmapper = upload_dependencies(arvrunner,
552                                     name,
553                                     tool.doc_loader,
554                                     job_order,
555                                     job_order.get("id", "#"),
556                                     False)
557
558     if "id" in job_order:
559         del job_order["id"]
560
561     # Need to filter this out; it gets added by cwltool when providing
562     # parameters on the command line.
563     if "job_order" in job_order:
564         del job_order["job_order"]
565
566     return job_order
567
568 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
569
570 def upload_workflow_deps(arvrunner, tool):
571     # Ensure that Docker images needed by this workflow are available
572
573     upload_docker(arvrunner, tool)
574
575     document_loader = tool.doc_loader
576
577     merged_map = {}
578
579     def upload_tool_deps(deptool):
580         if "id" in deptool:
581             discovered_secondaryfiles = {}
582             pm = upload_dependencies(arvrunner,
583                                      "%s dependencies" % (shortname(deptool["id"])),
584                                      document_loader,
585                                      deptool,
586                                      deptool["id"],
587                                      False,
588                                      include_primary=False,
589                                      discovered_secondaryfiles=discovered_secondaryfiles)
590             document_loader.idx[deptool["id"]] = deptool
591             toolmap = {}
592             for k,v in pm.items():
593                 toolmap[k] = v.resolved
594             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
595
596     tool.visit(upload_tool_deps)
597
598     return merged_map
599
600 def arvados_jobs_image(arvrunner, img):
601     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
602
603     try:
604         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
605                                                           arvrunner.runtimeContext.force_docker_pull,
606                                                           arvrunner.runtimeContext.tmp_outdir_prefix)
607     except Exception as e:
608         raise Exception("Docker image %s is not available\n%s" % (img, e) )
609
610
611 def upload_workflow_collection(arvrunner, name, packed):
612     collection = arvados.collection.Collection(api_client=arvrunner.api,
613                                                keep_client=arvrunner.keep_client,
614                                                num_retries=arvrunner.num_retries)
615     with collection.open("workflow.cwl", "w") as f:
616         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
617
618     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
619                ["name", "like", name+"%"]]
620     if arvrunner.project_uuid:
621         filters.append(["owner_uuid", "=", arvrunner.project_uuid])
622     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
623
624     if exists["items"]:
625         logger.info("Using collection %s", exists["items"][0]["uuid"])
626     else:
627         collection.save_new(name=name,
628                             owner_uuid=arvrunner.project_uuid,
629                             ensure_unique_name=True,
630                             num_retries=arvrunner.num_retries)
631         logger.info("Uploaded to %s", collection.manifest_locator())
632
633     return collection.portable_data_hash()
634
635
636 class Runner(Process):
637     """Base class for runner processes, which submit an instance of
638     arvados-cwl-runner and wait for the final result."""
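    # Lifecycle sketch (as implemented below): job() yields self after
    # _init_job(); subclasses are expected to submit the actual runner
    # process, and done() translates the finished record's exit code and
    # "cwl.output.json" into a CWL output object passed to output_callback.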
639
640     def __init__(self, runner, updated_tool,
641                  tool, loadingContext, enable_reuse,
642                  output_name, output_tags, submit_runner_ram=0,
643                  name=None, on_error=None, submit_runner_image=None,
644                  intermediate_output_ttl=0, merged_map=None,
645                  priority=None, secret_store=None,
646                  collection_cache_size=256,
647                  collection_cache_is_default=True):
648
649         loadingContext = loadingContext.copy()
650         loadingContext.metadata = updated_tool.metadata.copy()
651
652         super(Runner, self).__init__(updated_tool.tool, loadingContext)
653
654         self.arvrunner = runner
655         self.embedded_tool = tool
656         self.job_order = None
657         self.running = False
658         if enable_reuse:
659             # If reuse is permitted by command line arguments but
660             # disabled by the workflow itself, disable it.
661             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
662             if reuse_req:
663                 enable_reuse = reuse_req["enableReuse"]
664         self.enable_reuse = enable_reuse
665         self.uuid = None
666         self.final_output = None
667         self.output_name = output_name
668         self.output_tags = output_tags
669         self.name = name
670         self.on_error = on_error
671         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
672         self.intermediate_output_ttl = intermediate_output_ttl
673         self.priority = priority
674         self.secret_store = secret_store
675         self.enable_dev = loadingContext.enable_dev
676
677         self.submit_runner_cores = 1
678         self.submit_runner_ram = 1024  # default 1 GiB
679         self.collection_cache_size = collection_cache_size
680
681         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
682         if runner_resource_req:
683             if runner_resource_req.get("coresMin"):
684                 self.submit_runner_cores = runner_resource_req["coresMin"]
685             if runner_resource_req.get("ramMin"):
686                 self.submit_runner_ram = runner_resource_req["ramMin"]
687             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
688                 self.collection_cache_size = runner_resource_req["keep_cache"]
689
690         if submit_runner_ram:
691             # Command line / initializer overrides default and/or spec from workflow
692             self.submit_runner_ram = submit_runner_ram
693
694         if self.submit_runner_ram <= 0:
695             raise Exception("Value of submit-runner-ram must be greater than zero")
696
697         if self.submit_runner_cores <= 0:
698             raise Exception("Value of submit-runner-cores must be greater than zero")
699
700         self.merged_map = merged_map or {}
701
702     def job(self,
703             job_order,         # type: Mapping[Text, Text]
704             output_callbacks,  # type: Callable[[Any, Any], Any]
705             runtimeContext     # type: RuntimeContext
706            ):  # type: (...) -> Generator[Any, None, None]
707         self.job_order = job_order
708         self._init_job(job_order, runtimeContext)
709         yield self
710
711     def update_pipeline_component(self, record):
712         pass
713
714     def done(self, record):
715         """Base method for handling a completed runner."""
716
717         try:
718             if record["state"] == "Complete":
719                 if record.get("exit_code") is not None:
720                     if record["exit_code"] == 33:
721                         processStatus = "UnsupportedRequirement"
722                     elif record["exit_code"] == 0:
723                         processStatus = "success"
724                     else:
725                         processStatus = "permanentFail"
726                 else:
727                     processStatus = "success"
728             else:
729                 processStatus = "permanentFail"
730
731             outputs = {}
732
733             if processStatus == "permanentFail":
734                 logc = arvados.collection.CollectionReader(record["log"],
735                                                            api_client=self.arvrunner.api,
736                                                            keep_client=self.arvrunner.keep_client,
737                                                            num_retries=self.arvrunner.num_retries)
738                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
739
740             self.final_output = record["output"]
741             outc = arvados.collection.CollectionReader(self.final_output,
742                                                        api_client=self.arvrunner.api,
743                                                        keep_client=self.arvrunner.keep_client,
744                                                        num_retries=self.arvrunner.num_retries)
745             if "cwl.output.json" in outc:
746                 with outc.open("cwl.output.json", "rb") as f:
747                     if f.size() > 0:
748                         outputs = json.loads(f.read().decode())
749             def keepify(fileobj):
750                 path = fileobj["location"]
751                 if not path.startswith("keep:"):
752                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
753             adjustFileObjs(outputs, keepify)
754             adjustDirObjs(outputs, keepify)
755         except Exception:
756             logger.exception("[%s] While getting final output object", self.name)
757             self.arvrunner.output_callback({}, "permanentFail")
758         else:
759             self.arvrunner.output_callback(outputs, processStatus)