# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import urllib.parse
from functools import partial
import logging
import json
import copy
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute, Builder
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
import schema_salad.validate as validate

import arvados.collection
import arvados.util

from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext

logger = logging.getLogger('arvados.cwl-runner')

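# For example, a File literal such as (hypothetical anonymous id)
#   {"class": "File", "location": "_:0c4d7d2e-...", "contents": "hello"}
# is written back out by trim_anonymous_location() without 'location'.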
def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

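# make_builder() supplies just enough Builder state to evaluate parameter
# references and secondaryFiles expressions outside of a running step; most
# fields are deliberately left empty.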
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(
                 job=joborder,
                 files=[],               # type: List[Dict[Text, Text]]
                 bindings=[],            # type: List[Dict[Text, Any]]
                 schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                 names=None,             # type: Names
                 requirements=requirements,  # type: List[Dict[Text, Any]]
                 hints=hints,            # type: List[Dict[Text, Any]]
                 resources={},           # type: Dict[str, int]
                 mutation_manager=None,  # type: Optional[MutationManager]
                 formatgraph=None,       # type: Optional[Graph]
                 make_fs_access=None,    # type: Type[StdFsAccess]
                 fs_access=None,         # type: StdFsAccess
                 job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                 timeout=runtimeContext.eval_timeout,  # type: float
                 debug=runtimeContext.debug,  # type: bool
                 js_console=runtimeContext.js_console,  # type: bool
                 force_docker_pull=runtimeContext.force_docker_pull,  # type: bool
                 loadListing="",         # type: Text
                 outdir="",              # type: Text
                 tmpdir="",              # type: Text
                 stagedir="",            # type: Text
                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                 container_engine="docker"
                )

def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if inputschema == "File":
        inputschema = {"type": "File"}

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

142 if "secondaryFiles" in inputschema:
143 # set secondaryFiles, may be inherited by compound types.
144 secondaryspec = inputschema["secondaryFiles"]
146 if (isinstance(inputschema["type"], (Mapping, Sequence)) and
147 not isinstance(inputschema["type"], basestring)):
148 # compound type (union, array, record)
149 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
151 elif (inputschema["type"] == "record" and
152 isinstance(primary, Mapping)):
154 # record type, find secondary files associated with fields.
156 for f in inputschema["fields"]:
157 p = primary.get(shortname(f["name"]))
159 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
161 elif (inputschema["type"] == "array" and
162 isinstance(primary, Sequence)):
164 # array type, find secondary files of elements
167 set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
169 elif (inputschema["type"] == "File" and
170 isinstance(primary, Mapping) and
171 primary.get("class") == "File"):
173 if "secondaryFiles" in primary or not secondaryspec:
        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = sf["pattern"]
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                if "${" in pattern or "$(" in pattern:
                    sfname = builder.do_eval(pattern, context=primary)
                else:
                    sfname = substitute(primary["basename"], pattern)

                if sfname is None:
                    continue

                p_location = primary["location"]
                if "/" in p_location:
                    sfpath = (
                        p_location[0 : p_location.rindex("/") + 1]
                        + sfname
                    )

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

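# discover_secondary_files() applies set_secondary() to each tool input,
# recording anything it finds in the optional 'discovered' dict.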
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, runtimeContext,
                        include_primary=True, discovered_secondaryfiles=None,
                        cache=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """
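    # For example (hypothetical paths), a reference such as
    #   file:///home/user/wf/input.txt
    # ends up rewritten to a Keep reference along the lines of
    #   keep:99999999999999999999999999999999+118/input.txt
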
    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            if cache is not None and defrg in cache:
                return cache[defrg]
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            result = yamlloader.load(textIO)
            if cache is not None:
                cache[defrg] = result
            return result
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin,
                         nestdirs=False)

    optional_deps = scandeps(uri, scanobj,
                             loadref_fields,
                             set(("$schemas",)),
                             loadref, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)

    sc_result.extend(optional_deps)
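    # $schemas references are collected separately as optional: a schema
    # document that cannot be retrieved should not fail the whole upload.
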
    sc = []
    uuids = {}
    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, API server may limit
        # response size, so keep fetching until the API server has
        # nothing more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
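    # At this point uuid_map maps every collection UUID seen in the input to
    # its portable data hash, e.g. (hypothetical values)
    #   {"zzzzz-4zz18-zzzzzzzzzzzzzzz": "99999999999999999999999999999999+118"}
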
    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True,
                           optional_deps=optional_deps)
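    # ArvPathMapper does the actual work: everything in 'sc' that is a local
    # file is uploaded into a single new Keep collection, and each entry is
    # mapped to its resolved "keep:..." location.
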
430 if k.startswith("keep:"):
431 keeprefs.add(collection_pdh_pattern.match(k).group(1))
434 loc = p.get("location")
435 if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
436 p["location"] = mapper.mapper(p["location"]).resolved
437 addkeepref(p["location"])
        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location.  If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            # Not a uuid pattern (must be a pdh pattern)
            addkeepref(p["location"])
            return

        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if runtimeContext.copy_deps:
        # Find referenced collections and copy them into the
        # destination project, for easy sharing.
        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
                                     filters=[["portable_data_hash", "in", list(keeprefs)],
                                              ["owner_uuid", "=", runtimeContext.project_uuid]],
                                     select=["uuid", "portable_data_hash", "created_at"]))

        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
        for kr in keeprefs:
            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
                                                   order="created_at desc",
                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
                                                   limit=1).execute()
            if len(col["items"]) == 0:
                logger.warning("Cannot find collection with portable data hash %s", kr)
                continue
            col = col["items"][0]
            try:
                arvrunner.api.collections().create(body={"collection": {
                    "owner_uuid": runtimeContext.project_uuid,
                    "name": col["name"],
                    "description": col["description"],
                    "properties": col["properties"],
                    "portable_data_hash": col["portable_data_hash"],
                    "manifest_text": col["manifest_text"],
                    "storage_classes_desired": col["storage_classes_desired"],
                    "trash_at": col["trash_at"]
                }}, ensure_unique_name=True).execute()
            except Exception as e:
                logger.warning("Unable to copy collection to destination: %s", e)

509 if "$schemas" in workflowobj:
511 for s in workflowobj["$schemas"]:
513 sch.append(mapper.mapper(s).resolved)
514 workflowobj["$schemas"] = sch
def upload_docker(arvrunner, tool, runtimeContext):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")

            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
                                                       runtimeContext.project_uuid,
                                                       runtimeContext.force_docker_pull,
                                                       runtimeContext.tmp_outdir_prefix,
                                                       runtimeContext.match_local_docker,
                                                       runtimeContext.copy_deps)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True,
                                                       runtimeContext.project_uuid,
                                                       runtimeContext.force_docker_pull,
                                                       runtimeContext.tmp_outdir_prefix,
                                                       runtimeContext.match_local_docker,
                                                       runtimeContext.copy_deps)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool, runtimeContext)

def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
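    # pack() fills 'rewrites' with {original id: rewritten id}; invert it so
    # rewritten ids can be traced back to the entries in merged_map, which is
    # keyed by the original document ids.
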
    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                             runtimeContext.project_uuid,
                                                                                                             runtimeContext.force_docker_pull,
                                                                                                             runtimeContext.tmp_outdir_prefix,
                                                                                                             runtimeContext.match_local_docker,
                                                                                                             runtimeContext.copy_deps)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed

def tag_git_version(packed):
    # Note: 'tool' is not a parameter here; this relies on a global with that
    # name being defined at call time.
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any missing required
    # parameters; we don't want that here, so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False,
                                    runtimeContext)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool, runtimeContext):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool, runtimeContext)

    document_loader = tool.doc_loader

    merged_map = {}
    tool_dep_cache = {}
    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     runtimeContext,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles,
                                     cache=tool_dep_cache)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img, runtimeContext):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
                                                          True,
                                                          runtimeContext.project_uuid,
                                                          runtimeContext.force_docker_pull,
                                                          runtimeContext.tmp_outdir_prefix,
                                                          runtimeContext.match_local_docker,
                                                          runtimeContext.copy_deps)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

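    # Serializing with sorted keys and fixed separators makes the file, and
    # therefore the collection's portable data hash, deterministic, so the
    # lookup below can find an identical workflow that was already uploaded.
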
    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if runtimeContext.project_uuid:
        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=runtimeContext.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

822 if processStatus == "permanentFail":
823 logc = arvados.collection.CollectionReader(record["log"],
824 api_client=self.arvrunner.api,
825 keep_client=self.arvrunner.keep_client,
826 num_retries=self.arvrunner.num_retries)
827 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
829 self.final_output = record["output"]
830 outc = arvados.collection.CollectionReader(self.final_output,
831 api_client=self.arvrunner.api,
832 keep_client=self.arvrunner.keep_client,
833 num_retries=self.arvrunner.num_retries)
834 if "cwl.output.json" in outc:
835 with outc.open("cwl.output.json", "rb") as f:
837 outputs = json.loads(f.read().decode())
838 def keepify(fileobj):
839 path = fileobj["location"]
840 if not path.startswith("keep:"):
841 fileobj["location"] = "keep:%s/%s" % (record["output"], path)
842 adjustFileObjs(outputs, keepify)
843 adjustDirObjs(outputs, keepify)
845 logger.exception("[%s] While getting final output object", self.name)
846 self.arvrunner.output_callback({}, "permanentFail")
848 self.arvrunner.output_callback(outputs, processStatus)