# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import urllib.parse
import logging
import json
import copy

from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute, Builder
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
import schema_salad.validate as validate

import arvados.collection
import arvados.util

from .util import collectionUUID
from . import done
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from .context import ArvRuntimeContext
from .perf import Perf

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.
    """
    if obj.get("location", "").startswith("_:"):
        del obj["location"]

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(job=joborder,
                   files=[],                  # type: List[Dict[Text, Text]]
                   bindings=[],               # type: List[Dict[Text, Any]]
                   schemaDefs={},             # type: Dict[Text, Dict[Text, Any]]
                   names=None,                # type: Names
                   requirements=requirements, # type: List[Dict[Text, Any]]
                   hints=hints,               # type: List[Dict[Text, Any]]
                   resources={},              # type: Dict[str, int]
                   mutation_manager=None,     # type: Optional[MutationManager]
                   formatgraph=None,          # type: Optional[Graph]
                   make_fs_access=None,       # type: Type[StdFsAccess]
                   fs_access=None,            # type: StdFsAccess
                   job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                   timeout=runtimeContext.eval_timeout,                     # type: float
                   debug=runtimeContext.debug,                              # type: bool
                   js_console=runtimeContext.js_console,                    # type: bool
                   force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
                   loadListing="",            # type: Text
                   outdir="",                 # type: Text
                   tmpdir="",                 # type: Text
                   stagedir="",               # type: Text
                   cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                   container_engine="docker"
                   )
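
# NOTE: a Builder constructed this way is only used to evaluate CWL
# expressions (for example secondaryFiles patterns), so filesystem- and
# container-related fields can safely stay blank.  Illustrative usage
# (hypothetical values):
#
#     builder = make_builder({"inp": {"class": "File", "basename": "a.bam"}},
#                            hints=[], requirements=[],
#                            runtimeContext=ctx, metadata=tool.metadata)
#     builder.do_eval("$(inputs.inp.basename)")  # -> "a.bam"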

def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if inputschema == "File":
        inputschema = {"type": "File"}

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        # record type, find secondary files associated with fields.
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        # array type, find secondary files of elements
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          isinstance(primary, Mapping) and
          primary.get("class") == "File"):

        if "secondaryFiles" in primary or not secondaryspec:
            # Nothing to do.
            return

        # Found a file, check for secondaryFiles
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = builder.do_eval(sf, context=primary)
            else:
                pattern = sf["pattern"]
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                if "${" in pattern or "$(" in pattern:
                    sfname = builder.do_eval(pattern, context=primary)
                else:
                    sfname = substitute(primary["basename"], pattern)
                if sfname is None:
                    continue

                p_location = primary["location"]
                if "/" in p_location:
                    sfpath = (
                        p_location[0 : p_location.rindex("/") + 1]
                        + sfname)

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
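
# Worked example for set_secondary (hypothetical values): given a primary
# input {"class": "File", "location": "keep:pdh/reads.bam",
# "basename": "reads.bam"} and a secondaryFiles spec
# {"pattern": ".bai", "required": true}, substitute("reads.bam", ".bai")
# yields "reads.bam.bai"; if fsaccess.exists("keep:pdh/reads.bam.bai") is
# true, the primary File gains
#   "secondaryFiles": [{"class": "File", "location": "keep:pdh/reads.bam.bai"}]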

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, runtimeContext,
                        include_primary=True, discovered_secondaryfiles=None,
                        cache=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            if cache is not None and defrg in cache:
                return cache[defrg]
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            result = yamlloader.load(textIO)
            if cache is not None:
                cache[defrg] = result
            return result
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin,
                         nestdirs=False)

    optional_deps = scandeps(uri, scanobj,
                             loadref_fields,
                             set(("$schemas",)),
                             loadref, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)

    sc_result.extend(optional_deps)
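
    # At this point sc_result holds every external reference found in the
    # document, e.g. (hypothetical):
    #   [{"class": "File", "location": "file:///home/user/wf/script.py"}]
    # with the $schemas entries appended as optional dependencies.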

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, API server may limit
        # response size, so keep fetching until the API server has
        # nothing more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    discovered = {}

    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True,
                           optional_deps=optional_deps)

    keeprefs = set()
    def addkeepref(k):
        if k.startswith("keep:"):
            keeprefs.add(collection_pdh_pattern.match(k).group(1))

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            addkeepref(p["location"])
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            # Not a uuid pattern (must be a pdh pattern)
            addkeepref(p["location"])
            return

        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid
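
    # Example (hypothetical values): setloc rewrites a location like
    #   "keep:zzzzz-4zz18-aaaaaaaaaaaaaaa/dir/file.txt"
    # to "keep:<portable data hash>/dir/file.txt", preserving the original
    # collection uuid under the collectionUUID key.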

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if runtimeContext.copy_deps:
        # Find referenced collections and copy them into the
        # destination project, for easy sharing.
        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
                               filters=[["portable_data_hash", "in", list(keeprefs)],
                                        ["owner_uuid", "=", runtimeContext.project_uuid]],
                               select=["uuid", "portable_data_hash", "created_at"]))

        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
        for kr in keeprefs:
            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
                                                   order="created_at desc",
                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
                                                   limit=1).execute()
            if len(col["items"]) == 0:
                logger.warning("Cannot find collection with portable data hash %s", kr)
                continue
            col = col["items"][0]
            try:
                arvrunner.api.collections().create(body={"collection": {
                    "owner_uuid": runtimeContext.project_uuid,
                    "name": col["name"],
                    "description": col["description"],
                    "properties": col["properties"],
                    "portable_data_hash": col["portable_data_hash"],
                    "manifest_text": col["manifest_text"],
                    "storage_classes_desired": col["storage_classes_desired"],
                    "trash_at": col["trash_at"]
                }}, ensure_unique_name=True).execute()
            except Exception as e:
                logger.warning("Unable to copy collection to destination: %s", e)

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
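
# The pathmapper returned above maps each original location to an entry whose
# .resolved attribute is a keep: URI; callers look entries up with
# mapper.mapper("<original location>").resolved (see upload_workflow_deps
# below).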

def upload_docker(arvrunner, tool, runtimeContext):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")

            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
                                                       runtimeContext.project_uuid,
                                                       runtimeContext.force_docker_pull,
                                                       runtimeContext.tmp_outdir_prefix,
                                                       runtimeContext.match_local_docker,
                                                       runtimeContext.copy_deps)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True,
                                                       runtimeContext.project_uuid,
                                                       runtimeContext.force_docker_pull,
                                                       runtimeContext.tmp_outdir_prefix,
                                                       runtimeContext.match_local_docker,
                                                       runtimeContext.copy_deps)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool, runtimeContext)

def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                             runtimeContext.project_uuid,
                                                                                                             runtimeContext.force_docker_pull,
                                                                                                             runtimeContext.tmp_outdir_prefix,
                                                                                                             runtimeContext.match_local_docker,
                                                                                                             runtimeContext.copy_deps)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)

    visit(packed, None)
    return packed

def tag_git_version(packed):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash
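
# NOTE: tag_git_version reads `tool` from the surrounding scope.  If invoked,
# the git command above returns the current HEAD commit, so a tagged packed
# workflow would carry e.g. "http://schema.org/version": "<40-hex commit hash>".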

def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters, we don't want it to do that
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     tool.doc_loader.fetcher)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           runtimeContext,
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False,
                                    runtimeContext)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
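
# merged_map (built by upload_workflow_deps below) maps each tool document id
# to a FileUpdates record, e.g. (hypothetical):
#   FileUpdates(resolved={"file:///wf/data.txt": "keep:<pdh>/data.txt"},
#               secondaryFiles={})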

def upload_workflow_deps(arvrunner, tool, runtimeContext):
    # Ensure that Docker images needed by this workflow are available
    with Perf(metrics, "upload_docker"):
        upload_docker(arvrunner, tool, runtimeContext)

    document_loader = tool.doc_loader

    merged_map = {}
    tool_dep_cache = {}
    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
                pm = upload_dependencies(arvrunner,
                                         "%s dependencies" % (shortname(deptool["id"])),
                                         document_loader,
                                         deptool,
                                         deptool["id"],
                                         False,
                                         runtimeContext,
                                         include_primary=False,
                                         discovered_secondaryfiles=discovered_secondaryfiles,
                                         cache=tool_dep_cache)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img, runtimeContext):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
                                                          True,
                                                          runtimeContext.project_uuid,
                                                          runtimeContext.force_docker_pull,
                                                          runtimeContext.tmp_outdir_prefix,
                                                          runtimeContext.match_local_docker,
                                                          runtimeContext.copy_deps)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if runtimeContext.project_uuid:
        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=runtimeContext.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
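
# Collections are content-addressed, so filtering on portable_data_hash above
# lets an identical packed workflow reuse an existing collection rather than
# saving a duplicate copy.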

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""
        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())

            # Make relative paths in the output absolute keep: references,
            # e.g. (hypothetical) "foo/bar.txt" -> "keep:<output pdh>/foo/bar.txt"
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)