1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
14 from functools import partial
18 from collections import namedtuple
19 from io import StringIO
39 from cwltool.utils import (
45 if os.name == "posix" and sys.version_info[0] < 3:
46 import subprocess32 as subprocess
50 from schema_salad.sourceline import SourceLine, cmap
52 from cwltool.command_line_tool import CommandLineTool
53 import cwltool.workflow
54 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
55 shortname, Process, fill_in_defaults)
56 from cwltool.load_tool import fetch_document
57 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
58 from cwltool.builder import substitute
59 from cwltool.pack import pack
60 from cwltool.update import INTERNAL_VERSION
61 from cwltool.builder import Builder
62 import schema_salad.validate as validate
64 import arvados.collection
66 from .util import collectionUUID
67 from ruamel.yaml import YAML
68 from ruamel.yaml.comments import CommentedMap, CommentedSeq
70 import arvados_cwl.arvdocker
71 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
72 from ._version import __version__
74 from . context import ArvRuntimeContext
75 from .perf import Perf
77 logger = logging.getLogger('arvados.cwl-runner')
78 metrics = logging.getLogger('arvados.cwl-runner.metrics')
80 def trim_anonymous_location(obj):
81 """Remove 'location' field from File and Directory literals.
83 To make internal handling easier, literals are assigned a random id for
84 'location'. However, when writing the record back out, this can break
85 reproducibility. Since it is valid for literals not have a 'location'
90 if obj.get("location", "").startswith("_:"):
94 def remove_redundant_fields(obj):
95 for field in ("path", "nameext", "nameroot", "dirname"):
100 def find_defaults(d, op):
101 if isinstance(d, list):
104 elif isinstance(d, dict):
108 for i in viewvalues(d):
111 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
114 files=[], # type: List[Dict[Text, Text]]
115 bindings=[], # type: List[Dict[Text, Any]]
116 schemaDefs={}, # type: Dict[Text, Dict[Text, Any]]
117 names=None, # type: Names
118 requirements=requirements, # type: List[Dict[Text, Any]]
119 hints=hints, # type: List[Dict[Text, Any]]
120 resources={}, # type: Dict[str, int]
121 mutation_manager=None, # type: Optional[MutationManager]
122 formatgraph=None, # type: Optional[Graph]
123 make_fs_access=None, # type: Type[StdFsAccess]
124 fs_access=None, # type: StdFsAccess
125 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
126 timeout=runtimeContext.eval_timeout, # type: float
127 debug=runtimeContext.debug, # type: bool
128 js_console=runtimeContext.js_console, # type: bool
129 force_docker_pull=runtimeContext.force_docker_pull, # type: bool
130 loadListing="", # type: Text
131 outdir="", # type: Text
132 tmpdir="", # type: Text
133 stagedir="", # type: Text
134 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
135 container_engine="docker"
138 def search_schemadef(name, reqs):
140 if r["class"] == "SchemaDefRequirement":
141 for sd in r["types"]:
142 if sd["name"] == name:
146 primitive_types_set = frozenset(("null", "boolean", "int", "long",
147 "float", "double", "string", "record",
150 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
151 if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
152 # union type, collect all possible secondaryFiles
153 for i in inputschema:
154 set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
157 if inputschema == "File":
158 inputschema = {"type": "File"}
160 if isinstance(inputschema, basestring):
161 sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
167 if "secondaryFiles" in inputschema:
168 # set secondaryFiles, may be inherited by compound types.
169 secondaryspec = inputschema["secondaryFiles"]
171 if (isinstance(inputschema["type"], (Mapping, Sequence)) and
172 not isinstance(inputschema["type"], basestring)):
173 # compound type (union, array, record)
174 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
176 elif (inputschema["type"] == "record" and
177 isinstance(primary, Mapping)):
179 # record type, find secondary files associated with fields.
181 for f in inputschema["fields"]:
182 p = primary.get(shortname(f["name"]))
184 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
186 elif (inputschema["type"] == "array" and
187 isinstance(primary, Sequence)):
189 # array type, find secondary files of elements
192 set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
194 elif (inputschema["type"] == "File" and
195 isinstance(primary, Mapping) and
196 primary.get("class") == "File"):
198 if "secondaryFiles" in primary or not secondaryspec:
203 # Found a file, check for secondaryFiles
206 primary["secondaryFiles"] = secondaryspec
207 for i, sf in enumerate(aslist(secondaryspec)):
208 if builder.cwlVersion == "v1.0":
211 pattern = sf["pattern"]
214 if isinstance(pattern, list):
215 specs.extend(pattern)
216 elif isinstance(pattern, dict):
217 specs.append(pattern)
218 elif isinstance(pattern, str):
219 if builder.cwlVersion == "v1.0":
220 specs.append({"pattern": pattern, "required": True})
222 specs.append({"pattern": pattern, "required": sf.get("required")})
224 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
225 "Expression must return list, object, string or null")
228 for i, sf in enumerate(specs):
229 if isinstance(sf, dict):
230 if sf.get("class") == "File":
232 if sf.get("location") is None:
233 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
234 "File object is missing 'location': %s" % sf)
235 sfpath = sf["location"]
238 pattern = sf["pattern"]
239 required = sf.get("required")
240 elif isinstance(sf, str):
244 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
245 "Expression must return list, object, string or null")
247 if pattern is not None:
248 if "${" in pattern or "$(" in pattern:
249 sfname = builder.do_eval(pattern, context=primary)
251 sfname = substitute(primary["basename"], pattern)
256 p_location = primary["location"]
257 if "/" in p_location:
259 p_location[0 : p_location.rindex("/") + 1]
263 required = builder.do_eval(required, context=primary)
265 if fsaccess.exists(sfpath):
266 if pattern is not None:
267 found.append({"location": sfpath, "class": "File"})
271 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
272 "Required secondary file '%s' does not exist" % sfpath)
274 primary["secondaryFiles"] = cmap(found)
275 if discovered is not None:
276 discovered[primary["location"]] = primary["secondaryFiles"]
277 elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
278 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
280 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
281 for inputschema in inputs:
282 primary = job_order.get(shortname(inputschema["id"]))
283 if isinstance(primary, (Mapping, Sequence)):
284 set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
286 def upload_dependencies(arvrunner, name, document_loader,
287 workflowobj, uri, loadref_run, runtimeContext,
288 include_primary=True, discovered_secondaryfiles=None,
290 """Upload the dependencies of the workflowobj document to Keep.
292 Returns a pathmapper object mapping local paths to keep references. Also
293 does an in-place update of references in "workflowobj".
295 Use scandeps to find $import, $include, $schemas, run, File and Directory
296 fields that represent external references.
298 If workflowobj has an "id" field, this will reload the document to ensure
299 it is scanning the raw document prior to preprocessing.
304 joined = document_loader.fetcher.urljoin(b, u)
305 defrg, _ = urllib.parse.urldefrag(joined)
306 if defrg not in loaded:
308 if cache is not None and defrg in cache:
310 # Use fetch_text to get raw file (before preprocessing).
311 text = document_loader.fetch_text(defrg)
312 if isinstance(text, bytes):
313 textIO = StringIO(text.decode('utf-8'))
315 textIO = StringIO(text)
316 yamlloader = YAML(typ='safe', pure=True)
317 result = yamlloader.load(textIO)
318 if cache is not None:
319 cache[defrg] = result
325 loadref_fields = set(("$import", "run"))
327 loadref_fields = set(("$import",))
329 scanobj = workflowobj
330 if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
331 defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
332 if cache is not None and defrg not in cache:
333 # if we haven't seen this file before, want raw file
334 # content (before preprocessing) to ensure that external
335 # references like $include haven't already been inlined.
336 scanobj = loadref("", workflowobj["id"])
340 with Perf(metrics, "scandeps include, location"):
341 sc_result = scandeps(uri, scanobj,
343 set(("$include", "location")),
344 loadref, urljoin=document_loader.fetcher.urljoin,
347 with Perf(metrics, "scandeps $schemas"):
348 optional_deps = scandeps(uri, scanobj,
351 loadref, urljoin=document_loader.fetcher.urljoin,
354 if sc_result is None:
357 if optional_deps is None:
361 sc_result.extend(optional_deps)
366 def collect_uuids(obj):
367 loc = obj.get("location", "")
370 # Collect collection uuids that need to be resolved to
371 # portable data hashes
372 gp = collection_uuid_pattern.match(loc)
374 uuids[gp.groups()[0]] = obj
375 if collectionUUID in obj:
376 uuids[obj[collectionUUID]] = obj
378 def collect_uploads(obj):
379 loc = obj.get("location", "")
383 if sp[0] in ("file", "http", "https"):
384 # Record local files than need to be uploaded,
385 # don't include file literals, keep references, etc.
389 with Perf(metrics, "collect uuids"):
390 visit_class(workflowobj, ("File", "Directory"), collect_uuids)
392 with Perf(metrics, "collect uploads"):
393 visit_class(sc_result, ("File", "Directory"), collect_uploads)
395 # Resolve any collection uuids we found to portable data hashes
396 # and assign them to uuid_map
398 fetch_uuids = list(uuids.keys())
399 with Perf(metrics, "fetch_uuids"):
401 # For a large number of fetch_uuids, API server may limit
402 # response size, so keep fetching from API server has nothing
404 lookups = arvrunner.api.collections().list(
405 filters=[["uuid", "in", fetch_uuids]],
407 select=["uuid", "portable_data_hash"]).execute(
408 num_retries=arvrunner.num_retries)
410 if not lookups["items"]:
413 for l in lookups["items"]:
414 uuid_map[l["uuid"]] = l["portable_data_hash"]
416 fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
418 normalizeFilesDirs(sc)
420 if "id" in workflowobj:
421 defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
423 # make sure it's included
424 sc.append({"class": "File", "location": defrg})
426 # make sure it's excluded
427 sc = [d for d in sc if d.get("location") != defrg]
429 def visit_default(obj):
430 def defaults_are_optional(f):
431 if "location" not in f and "path" in f:
432 f["location"] = f["path"]
434 normalizeFilesDirs(f)
435 optional_deps.append(f)
436 visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
438 find_defaults(workflowobj, visit_default)
441 def discover_default_secondary_files(obj):
442 builder_job_order = {}
443 for t in obj["inputs"]:
444 builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
445 # Need to create a builder object to evaluate expressions.
446 builder = make_builder(builder_job_order,
447 obj.get("hints", []),
448 obj.get("requirements", []),
451 discover_secondary_files(arvrunner.fs_access,
457 copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
458 visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
460 for d in list(discovered):
461 # Only interested in discovered secondaryFiles which are local
462 # files that need to be uploaded.
463 if d.startswith("file:"):
464 sc.extend(discovered[d])
468 with Perf(metrics, "mapper"):
469 mapper = ArvPathMapper(arvrunner, sc, "",
473 single_collection=True,
474 optional_deps=optional_deps)
478 if k.startswith("keep:"):
479 keeprefs.add(collection_pdh_pattern.match(k).group(1))
482 loc = p.get("location")
483 if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
484 p["location"] = mapper.mapper(p["location"]).resolved
485 addkeepref(p["location"])
491 if collectionUUID in p:
492 uuid = p[collectionUUID]
493 if uuid not in uuid_map:
494 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
495 "Collection uuid %s not found" % uuid)
496 gp = collection_pdh_pattern.match(loc)
497 if gp and uuid_map[uuid] != gp.groups()[0]:
498 # This file entry has both collectionUUID and a PDH
499 # location. If the PDH doesn't match the one returned
500 # the API server, raise an error.
501 raise SourceLine(p, "location", validate.ValidationException).makeError(
502 "Expected collection uuid %s to be %s but API server reported %s" % (
503 uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
505 gp = collection_uuid_pattern.match(loc)
507 # Not a uuid pattern (must be a pdh pattern)
508 addkeepref(p["location"])
511 uuid = gp.groups()[0]
512 if uuid not in uuid_map:
513 raise SourceLine(p, "location", validate.ValidationException).makeError(
514 "Collection uuid %s not found" % uuid)
515 p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
516 p[collectionUUID] = uuid
518 with Perf(metrics, "setloc"):
519 visit_class(workflowobj, ("File", "Directory"), setloc)
520 visit_class(discovered, ("File", "Directory"), setloc)
522 if discovered_secondaryfiles is not None:
524 discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
526 if runtimeContext.copy_deps:
527 # Find referenced collections and copy them into the
528 # destination project, for easy sharing.
529 already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
530 filters=[["portable_data_hash", "in", list(keeprefs)],
531 ["owner_uuid", "=", runtimeContext.project_uuid]],
532 select=["uuid", "portable_data_hash", "created_at"]))
534 keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
536 col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
537 order="created_at desc",
538 select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
540 if len(col["items"]) == 0:
541 logger.warning("Cannot find collection with portable data hash %s", kr)
543 col = col["items"][0]
545 arvrunner.api.collections().create(body={"collection": {
546 "owner_uuid": runtimeContext.project_uuid,
548 "description": col["description"],
549 "properties": col["properties"],
550 "portable_data_hash": col["portable_data_hash"],
551 "manifest_text": col["manifest_text"],
552 "storage_classes_desired": col["storage_classes_desired"],
553 "trash_at": col["trash_at"]
554 }}, ensure_unique_name=True).execute()
555 except Exception as e:
556 logger.warning("Unable copy collection to destination: %s", e)
558 if "$schemas" in workflowobj:
560 for s in workflowobj["$schemas"]:
562 sch.append(mapper.mapper(s).resolved)
563 workflowobj["$schemas"] = sch
568 def upload_docker(arvrunner, tool, runtimeContext):
569 """Uploads Docker images used in CommandLineTool objects."""
571 if isinstance(tool, CommandLineTool):
572 (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
574 if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
575 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
576 "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
578 arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
579 runtimeContext.project_uuid,
580 runtimeContext.force_docker_pull,
581 runtimeContext.tmp_outdir_prefix,
582 runtimeContext.match_local_docker,
583 runtimeContext.copy_deps)
585 arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
587 runtimeContext.project_uuid,
588 runtimeContext.force_docker_pull,
589 runtimeContext.tmp_outdir_prefix,
590 runtimeContext.match_local_docker,
591 runtimeContext.copy_deps)
592 elif isinstance(tool, cwltool.workflow.Workflow):
594 upload_docker(arvrunner, s.embedded_tool, runtimeContext)
597 def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
598 """Create a packed workflow.
600 A "packed" workflow is one where all the components have been combined into a single document."""
603 packed = pack(arvrunner.loadingContext, tool.tool["id"],
604 rewrite_out=rewrites,
605 loader=tool.doc_loader)
607 rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
609 def visit(v, cur_id):
610 if isinstance(v, dict):
611 if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
612 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
613 raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
615 cur_id = rewrite_to_orig.get(v["id"], v["id"])
616 if "path" in v and "location" not in v:
617 v["location"] = v["path"]
619 if "location" in v and cur_id in merged_map:
620 if v["location"] in merged_map[cur_id].resolved:
621 v["location"] = merged_map[cur_id].resolved[v["location"]]
622 if v["location"] in merged_map[cur_id].secondaryFiles:
623 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
624 if v.get("class") == "DockerRequirement":
625 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
626 runtimeContext.project_uuid,
627 runtimeContext.force_docker_pull,
628 runtimeContext.tmp_outdir_prefix,
629 runtimeContext.match_local_docker,
630 runtimeContext.copy_deps)
633 if isinstance(v, list):
640 def tag_git_version(packed):
641 if tool.tool["id"].startswith("file://"):
642 path = os.path.dirname(tool.tool["id"][7:])
644 githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
645 except (OSError, subprocess.CalledProcessError):
648 packed["http://schema.org/version"] = githash
651 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
652 """Upload local files referenced in the input object and return updated input
653 object with 'location' updated to the proper keep references.
656 # Make a copy of the job order and set defaults.
657 builder_job_order = copy.copy(job_order)
659 # fill_in_defaults throws an error if there are any
660 # missing required parameters, we don't want it to do that
661 # so make them all optional.
662 inputs_copy = copy.deepcopy(tool.tool["inputs"])
663 for i in inputs_copy:
664 if "null" not in i["type"]:
665 i["type"] = ["null"] + aslist(i["type"])
667 fill_in_defaults(inputs_copy,
670 # Need to create a builder object to evaluate expressions.
671 builder = make_builder(builder_job_order,
676 # Now update job_order with secondaryFiles
677 discover_secondary_files(arvrunner.fs_access,
682 jobmapper = upload_dependencies(arvrunner,
686 job_order.get("id", "#"),
690 if "id" in job_order:
693 # Need to filter this out, gets added by cwltool when providing
694 # parameters on the command line.
695 if "job_order" in job_order:
696 del job_order["job_order"]
700 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
702 def upload_workflow_deps(arvrunner, tool, runtimeContext):
703 # Ensure that Docker images needed by this workflow are available
705 with Perf(metrics, "upload_docker"):
706 upload_docker(arvrunner, tool, runtimeContext)
708 document_loader = tool.doc_loader
712 def upload_tool_deps(deptool):
714 discovered_secondaryfiles = {}
715 with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
716 pm = upload_dependencies(arvrunner,
717 "%s dependencies" % (shortname(deptool["id"])),
723 include_primary=False,
724 discovered_secondaryfiles=discovered_secondaryfiles,
725 cache=tool_dep_cache)
726 document_loader.idx[deptool["id"]] = deptool
728 for k,v in pm.items():
729 toolmap[k] = v.resolved
730 merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
732 tool.visit(upload_tool_deps)
736 def arvados_jobs_image(arvrunner, img, runtimeContext):
737 """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
740 return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
742 runtimeContext.project_uuid,
743 runtimeContext.force_docker_pull,
744 runtimeContext.tmp_outdir_prefix,
745 runtimeContext.match_local_docker,
746 runtimeContext.copy_deps)
747 except Exception as e:
748 raise Exception("Docker image %s is not available\n%s" % (img, e) )
751 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
752 collection = arvados.collection.Collection(api_client=arvrunner.api,
753 keep_client=arvrunner.keep_client,
754 num_retries=arvrunner.num_retries)
755 with collection.open("workflow.cwl", "w") as f:
756 f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
758 filters = [["portable_data_hash", "=", collection.portable_data_hash()],
759 ["name", "like", name+"%"]]
760 if runtimeContext.project_uuid:
761 filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
762 exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
765 logger.info("Using collection %s", exists["items"][0]["uuid"])
767 collection.save_new(name=name,
768 owner_uuid=runtimeContext.project_uuid,
769 ensure_unique_name=True,
770 num_retries=arvrunner.num_retries)
771 logger.info("Uploaded to %s", collection.manifest_locator())
773 return collection.portable_data_hash()
776 class Runner(Process):
777 """Base class for runner processes, which submit an instance of
778 arvados-cwl-runner and wait for the final result."""
780 def __init__(self, runner, updated_tool,
781 tool, loadingContext, enable_reuse,
782 output_name, output_tags, submit_runner_ram=0,
783 name=None, on_error=None, submit_runner_image=None,
784 intermediate_output_ttl=0, merged_map=None,
785 priority=None, secret_store=None,
786 collection_cache_size=256,
787 collection_cache_is_default=True):
789 loadingContext = loadingContext.copy()
790 loadingContext.metadata = updated_tool.metadata.copy()
792 super(Runner, self).__init__(updated_tool.tool, loadingContext)
794 self.arvrunner = runner
795 self.embedded_tool = tool
796 self.job_order = None
799 # If reuse is permitted by command line arguments but
800 # disabled by the workflow itself, disable it.
801 reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
803 enable_reuse = reuse_req["enableReuse"]
804 self.enable_reuse = enable_reuse
806 self.final_output = None
807 self.output_name = output_name
808 self.output_tags = output_tags
810 self.on_error = on_error
811 self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
812 self.intermediate_output_ttl = intermediate_output_ttl
813 self.priority = priority
814 self.secret_store = secret_store
815 self.enable_dev = loadingContext.enable_dev
817 self.submit_runner_cores = 1
818 self.submit_runner_ram = 1024 # defaut 1 GiB
819 self.collection_cache_size = collection_cache_size
821 runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
822 if runner_resource_req:
823 if runner_resource_req.get("coresMin"):
824 self.submit_runner_cores = runner_resource_req["coresMin"]
825 if runner_resource_req.get("ramMin"):
826 self.submit_runner_ram = runner_resource_req["ramMin"]
827 if runner_resource_req.get("keep_cache") and collection_cache_is_default:
828 self.collection_cache_size = runner_resource_req["keep_cache"]
830 if submit_runner_ram:
831 # Command line / initializer overrides default and/or spec from workflow
832 self.submit_runner_ram = submit_runner_ram
834 if self.submit_runner_ram <= 0:
835 raise Exception("Value of submit-runner-ram must be greater than zero")
837 if self.submit_runner_cores <= 0:
838 raise Exception("Value of submit-runner-cores must be greater than zero")
840 self.merged_map = merged_map or {}
843 job_order, # type: Mapping[Text, Text]
844 output_callbacks, # type: Callable[[Any, Any], Any]
845 runtimeContext # type: RuntimeContext
846 ): # type: (...) -> Generator[Any, None, None]
847 self.job_order = job_order
848 self._init_job(job_order, runtimeContext)
851 def update_pipeline_component(self, record):
854 def done(self, record):
855 """Base method for handling a completed runner."""
858 if record["state"] == "Complete":
859 if record.get("exit_code") is not None:
860 if record["exit_code"] == 33:
861 processStatus = "UnsupportedRequirement"
862 elif record["exit_code"] == 0:
863 processStatus = "success"
865 processStatus = "permanentFail"
867 processStatus = "success"
869 processStatus = "permanentFail"
873 if processStatus == "permanentFail":
874 logc = arvados.collection.CollectionReader(record["log"],
875 api_client=self.arvrunner.api,
876 keep_client=self.arvrunner.keep_client,
877 num_retries=self.arvrunner.num_retries)
878 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
880 self.final_output = record["output"]
881 outc = arvados.collection.CollectionReader(self.final_output,
882 api_client=self.arvrunner.api,
883 keep_client=self.arvrunner.keep_client,
884 num_retries=self.arvrunner.num_retries)
885 if "cwl.output.json" in outc:
886 with outc.open("cwl.output.json", "rb") as f:
888 outputs = json.loads(f.read().decode())
889 def keepify(fileobj):
890 path = fileobj["location"]
891 if not path.startswith("keep:"):
892 fileobj["location"] = "keep:%s/%s" % (record["output"], path)
893 adjustFileObjs(outputs, keepify)
894 adjustDirObjs(outputs, keepify)
896 logger.exception("[%s] While getting final output object", self.name)
897 self.arvrunner.output_callback({}, "permanentFail")
899 self.arvrunner.output_callback(outputs, processStatus)