# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import re
import urllib.parse
from functools import partial
import logging
import json
import copy
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
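    """Recursively walk a parsed CWL document and apply op() to every
    dict that contains a "default" field."""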
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def make_builder(joborder, hints, requirements, runtimeContext):
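    """Return a cwltool Builder that can evaluate parameter references and
    expressions (for example in secondaryFiles patterns) outside of a
    running step, using the given job order, hints and requirements."""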
    return Builder(
                 job=joborder,
                 files=[],               # type: List[Dict[Text, Text]]
                 bindings=[],            # type: List[Dict[Text, Any]]
                 schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                 names=None,               # type: Names
                 requirements=requirements,        # type: List[Dict[Text, Any]]
                 hints=hints,               # type: List[Dict[Text, Any]]
                 resources={},           # type: Dict[str, int]
                 mutation_manager=None,    # type: Optional[MutationManager]
                 formatgraph=None,         # type: Optional[Graph]
                 make_fs_access=None,      # type: Type[StdFsAccess]
                 fs_access=None,           # type: StdFsAccess
                 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
                 timeout=runtimeContext.eval_timeout,             # type: float
                 debug=runtimeContext.debug,               # type: bool
                 js_console=runtimeContext.js_console,          # type: bool
                 force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
                 loadListing="",         # type: Text
                 outdir="",              # type: Text
                 tmpdir="",              # type: Text
                 stagedir="",            # type: Text
                )

def search_schemadef(name, reqs):
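    """Return the type named 'name' from any SchemaDefRequirement in reqs,
    or None if it is not defined."""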
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
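    """Walk the input schema and fill in secondaryFiles for 'primary'.

    Recurses through union, array, record and named (SchemaDefRequirement)
    types, evaluates each secondaryFiles pattern expression against the
    primary File value, and records the files it finds both on the primary
    object and in 'discovered' (keyed by the primary location).  An
    illustrative pattern (not taken from this codebase) would be
    "$(self.nameroot).bai" to pair a BAM file with its index.
    """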
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                specs.append({"pattern": pattern})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            # Default 'required' to True so it is never left unbound when the
            # expression returns a bare File object.
            required = True
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = sf["basename"]
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            sfpath = substitute(primary["location"], pattern)
            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                found.append({"location": sfpath, "class": "File"})
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
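    """Apply set_secondary() to each input parameter that has a File or
    compound value in job_order."""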
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded;
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, the API server may limit
        # response size, so keep fetching until the API server has nothing
        # more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
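        # Make sure every File/Directory in this "default" has a 'location';
        # if the location does not actually exist, drop it from the upload
        # list and delete the whole default value.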
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
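        # Evaluate secondaryFiles expressions against each input's default
        # value so that default secondary files are uploaded as well.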
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext())
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper


def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed


def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return the updated
    input object with 'location' fields rewritten to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters; we don't want that here,
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext())
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
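# FileUpdates.resolved maps original file locations to their uploaded keep:
# locations; FileUpdates.secondaryFiles maps a resolved location to the
# secondaryFiles discovered for it (see upload_tool_deps and packed_workflow).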

def upload_workflow_deps(arvrunner, tool):
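    """Upload Docker images and file dependencies for every tool in the
    workflow and return a map of tool id to a FileUpdates describing how
    that tool's references were rewritten."""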
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed):
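    """Save the packed workflow document to Keep as "workflow.cwl" and return
    its portable data hash, reusing an existing collection with the same
    content and name prefix when one is found."""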
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)