# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import urllib.parse
import logging
import json
import copy

from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
import arvados.util

from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'. However, when writing the record back out, this can break
    reproducibility. Since it is valid for literals not to have a 'location'
    field, just remove it.

    """
    if obj.get("location", "").startswith("_:"):
        del obj["location"]

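
# Illustrative sketch (added in this edit, never called): the effect of
# trim_anonymous_location on a hypothetical File literal, whose anonymous
# "_:" location was assigned internally by cwltool.
def _example_trim_anonymous_location():
    literal = {"class": "File", "basename": "greeting.txt",
               "contents": "hello", "location": "_:0d2557d2"}  # hypothetical
    trim_anonymous_location(literal)
    assert "location" not in literal
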
def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    """Call op() on every dict in the document that has a 'default' field."""
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

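
# Illustrative sketch (added in this edit, never called): find_defaults
# invokes op() on every dict in a parsed CWL document that carries a
# "default" key. The document fragment below is hypothetical.
def _example_find_defaults():
    doc = {"inputs": [{"id": "threads", "type": "int", "default": 4},
                      {"id": "sample", "type": "File"}]}
    with_defaults = []
    find_defaults(doc, with_defaults.append)
    assert with_defaults == [{"id": "threads", "type": "int", "default": 4}]
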
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(job=joborder,
                   files=[],                    # type: List[Dict[Text, Text]]
                   bindings=[],                 # type: List[Dict[Text, Any]]
                   schemaDefs={},               # type: Dict[Text, Dict[Text, Any]]
                   names=None,                  # type: Names
                   requirements=requirements,   # type: List[Dict[Text, Any]]
                   hints=hints,                 # type: List[Dict[Text, Any]]
                   resources={},                # type: Dict[str, int]
                   mutation_manager=None,       # type: Optional[MutationManager]
                   formatgraph=None,            # type: Optional[Graph]
                   make_fs_access=None,         # type: Type[StdFsAccess]
                   fs_access=None,              # type: StdFsAccess
                   job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                   timeout=runtimeContext.eval_timeout,  # type: float
                   debug=runtimeContext.debug,  # type: bool
                   js_console=runtimeContext.js_console,  # type: bool
                   force_docker_pull=runtimeContext.force_docker_pull,  # type: bool
                   loadListing="",              # type: Text
                   outdir="",                   # type: Text
                   tmpdir="",                   # type: Text
                   stagedir="",                 # type: Text
                   cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                   container_engine="docker"
                  )

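
# Hedged usage sketch (added in this edit, never called): make_builder takes
# a job order, the hints/requirements lists of a process, a runtime context,
# and the process metadata, and returns a Builder suitable for evaluating
# CWL expressions with do_eval(). The empty job order and v1.2 metadata
# below are placeholder assumptions.
def _example_make_builder():
    rc = ArvRuntimeContext()
    return make_builder({}, [], [], rc, {"cwlVersion": "v1.2"})
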
def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if inputschema == "File":
        inputschema = {"type": "File"}

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          isinstance(primary, Mapping) and
          primary.get("class") == "File"):

        if "secondaryFiles" in primary or not secondaryspec:
            # Nothing to do.
            return

        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = sf
            else:
                pattern = sf["pattern"]
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                if "${" in pattern or "$(" in pattern:
                    sfname = builder.do_eval(pattern, context=primary)
                else:
                    sfname = substitute(primary["basename"], pattern)

                if sfname is None:
                    continue

                p_location = primary["location"]
                if "/" in p_location:
                    sfpath = (
                        p_location[0 : p_location.rindex("/") + 1]
                        + sfname
                    )

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

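
# Illustrative sketch (added in this edit, never called): how the
# secondaryFiles patterns handled in set_secondary() expand. A plain suffix
# is appended to the primary basename, while each leading "^" strips one
# extension first; this is cwltool's substitute(), imported above.
def _example_secondary_patterns():
    assert substitute("reads.bam", ".bai") == "reads.bam.bai"
    assert substitute("reads.bam", "^.bai") == "reads.bai"
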
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, runtimeContext,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            return yamlloader.load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin,
                         nestdirs=False)

    optional_deps = scandeps(uri, scanobj,
                             loadref_fields,
                             set(("$schemas",)),
                             loadref, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)

    sc_result.extend(optional_deps)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, API server may limit
        # response size, so keep fetching until the API server has
        # nothing more to return.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               runtimeContext,
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True,
                           optional_deps=optional_deps)

    keeprefs = set()
    def addkeepref(k):
        if k.startswith("keep:"):
            keeprefs.add(collection_pdh_pattern.match(k).group(1))

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            addkeepref(p["location"])
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            # Not a uuid pattern (must be a pdh pattern)
            addkeepref(p["location"])
            return

        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if runtimeContext.copy_deps:
        # Find referenced collections and copy them into the
        # destination project, for easy sharing.
        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
                                     filters=[["portable_data_hash", "in", list(keeprefs)],
                                              ["owner_uuid", "=", runtimeContext.project_uuid]],
                                     select=["uuid", "portable_data_hash", "created_at"]))

        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
        for kr in keeprefs:
            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
                                                   order="created_at desc",
                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
                                                   limit=1).execute()
            if len(col["items"]) == 0:
                logger.warning("Cannot find collection with portable data hash %s", kr)
                continue
            col = col["items"][0]
            try:
                arvrunner.api.collections().create(body={"collection": {
                    "owner_uuid": runtimeContext.project_uuid,
                    "name": col["name"],
                    "description": col["description"],
                    "properties": col["properties"],
                    "portable_data_hash": col["portable_data_hash"],
                    "manifest_text": col["manifest_text"],
                    "storage_classes_desired": col["storage_classes_desired"],
                    "trash_at": col["trash_at"]
                }}, ensure_unique_name=True).execute()
            except Exception as e:
                logger.warning("Unable to copy collection to destination: %s", e)

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

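
# Hedged usage sketch (added in this edit, never called; argument names are
# placeholders): given the executor ("arvrunner"), a loaded tool, and a
# runtime context, upload_dependencies rewrites references in place and
# returns the pathmapper it used, mapping each source path to its keep
# reference.
def _example_upload_dependencies(arvrunner, tool, runtimeContext):
    mapper = upload_dependencies(arvrunner, "example dependencies",
                                 tool.doc_loader, tool.tool, tool.tool["id"],
                                 False, runtimeContext)
    for src, target in mapper.items():
        logger.debug("%s mapped to %s", src, target.resolved)
    return mapper
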
def upload_docker(arvrunner, tool, runtimeContext):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")

            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
                                                       runtimeContext.project_uuid,
                                                       runtimeContext.force_docker_pull,
                                                       runtimeContext.tmp_outdir_prefix,
                                                       runtimeContext.match_local_docker,
                                                       runtimeContext.copy_deps)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True,
                                                       runtimeContext.project_uuid,
                                                       runtimeContext.force_docker_pull,
                                                       runtimeContext.tmp_outdir_prefix,
                                                       runtimeContext.match_local_docker,
                                                       runtimeContext.copy_deps)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool, runtimeContext)

def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                             runtimeContext.project_uuid,
                                                                                                             runtimeContext.force_docker_pull,
                                                                                                             runtimeContext.tmp_outdir_prefix,
                                                                                                             runtimeContext.match_local_docker,
                                                                                                             runtimeContext.copy_deps)
            for l in viewvalues(v):
                visit(l, cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed

def tag_git_version(packed, tool):
    # Tag the packed workflow with the git commit hash of the source
    # checkout, if the tool came from a local file inside a git repository.
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip().decode()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

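
# Hedged sketch of the typical call sequence (added in this edit, never
# called): dependencies are uploaded first, and the resulting merged_map is
# fed to packed_workflow() so the rewritten keep: locations can be applied
# to the packed document.
def _example_pack(arvrunner, tool, runtimeContext):
    merged_map = upload_workflow_deps(arvrunner, tool, runtimeContext)
    return packed_workflow(arvrunner, tool, merged_map, runtimeContext)
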
def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if any required parameters are
    # missing, which we don't want here, so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     runtimeContext.make_fs_access(""))
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           runtimeContext,
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False,
                                    runtimeContext)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

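
# Illustrative sketch (added in this edit, never called) of the
# optional-type trick used above: "null" is unioned onto the declared type
# so fill_in_defaults() will not raise on missing inputs.
def _example_optionalize_type():
    i = {"id": "ref", "type": "File"}  # hypothetical input declaration
    if "null" not in i["type"]:
        i["type"] = ["null"] + aslist(i["type"])
    assert i["type"] == ["null", "File"]
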
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool, runtimeContext):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool, runtimeContext)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     runtimeContext,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img, runtimeContext):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
                                                          True,
                                                          runtimeContext.project_uuid,
                                                          runtimeContext.force_docker_pull,
                                                          runtimeContext.tmp_outdir_prefix,
                                                          runtimeContext.match_local_docker,
                                                          runtimeContext.copy_deps)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if runtimeContext.project_uuid:
        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=runtimeContext.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

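
# Hedged usage sketch (added in this edit, never called; names are
# placeholders): store a packed workflow document as a one-file collection,
# reusing an existing collection with identical content when possible, and
# get back its portable data hash.
def _example_upload_workflow_collection(arvrunner, packed, runtimeContext):
    pdh = upload_workflow_collection(arvrunner, "example workflow", packed, runtimeContext)
    logger.info("packed workflow is available as keep:%s/workflow.cwl", pdh)
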
class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""
        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
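

# Illustrative sketch (added in this edit, never called) of the location
# rewrite performed by keepify() in Runner.done(): relative output paths are
# anchored to the runner's output collection. The portable data hash below
# is a placeholder.
def _example_keepify():
    record_output = "99999999999999999999999999999999+99"  # placeholder PDH
    fileobj = {"class": "File", "location": "foo/bar.txt"}
    if not fileobj["location"].startswith("keep:"):
        fileobj["location"] = "keep:%s/%s" % (record_output, fileobj["location"])
    assert fileobj["location"] == "keep:99999999999999999999999999999999+99/foo/bar.txt"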