1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
14 from functools import partial
18 from collections import namedtuple
19 from io import StringIO
39 from cwltool.utils import (
45 if os.name == "posix" and sys.version_info[0] < 3:
46 import subprocess32 as subprocess
50 from schema_salad.sourceline import SourceLine, cmap
52 from cwltool.command_line_tool import CommandLineTool
53 import cwltool.workflow
54 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
55 shortname, Process, fill_in_defaults)
56 from cwltool.load_tool import fetch_document
57 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
58 from cwltool.builder import substitute
59 from cwltool.pack import pack
60 from cwltool.update import INTERNAL_VERSION
61 from cwltool.builder import Builder
62 import schema_salad.validate as validate
64 import arvados.collection
66 from .util import collectionUUID
67 from ruamel.yaml import YAML
68 from ruamel.yaml.comments import CommentedMap, CommentedSeq
70 import arvados_cwl.arvdocker
71 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
72 from ._version import __version__
74 from . context import ArvRuntimeContext
75 from .perf import Perf
77 logger = logging.getLogger('arvados.cwl-runner')
78 metrics = logging.getLogger('arvados.cwl-runner.metrics')
80 def trim_anonymous_location(obj):
81 """Remove 'location' field from File and Directory literals.
83 To make internal handling easier, literals are assigned a random id for
84 'location'. However, when writing the record back out, this can break
85 reproducibility. Since it is valid for literals not to have a 'location'
90 if obj.get("location", "").startswith("_:"):
94 def remove_redundant_fields(obj):
95 for field in ("path", "nameext", "nameroot", "dirname"):
100 def find_defaults(d, op):
101 if isinstance(d, list):
104 elif isinstance(d, dict):
108 for i in viewvalues(d):
111 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
114 files=[], # type: List[Dict[Text, Text]]
115 bindings=[], # type: List[Dict[Text, Any]]
116 schemaDefs={}, # type: Dict[Text, Dict[Text, Any]]
117 names=None, # type: Names
118 requirements=requirements, # type: List[Dict[Text, Any]]
119 hints=hints, # type: List[Dict[Text, Any]]
120 resources={}, # type: Dict[str, int]
121 mutation_manager=None, # type: Optional[MutationManager]
122 formatgraph=None, # type: Optional[Graph]
123 make_fs_access=None, # type: Type[StdFsAccess]
124 fs_access=None, # type: StdFsAccess
125 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
126 timeout=runtimeContext.eval_timeout, # type: float
127 debug=runtimeContext.debug, # type: bool
128 js_console=runtimeContext.js_console, # type: bool
129 force_docker_pull=runtimeContext.force_docker_pull, # type: bool
130 loadListing="", # type: Text
131 outdir="", # type: Text
132 tmpdir="", # type: Text
133 stagedir="", # type: Text
134 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
135 container_engine="docker"
138 def search_schemadef(name, reqs):
140 if r["class"] == "SchemaDefRequirement":
141 for sd in r["types"]:
142 if sd["name"] == name:
146 primitive_types_set = frozenset(("null", "boolean", "int", "long",
147 "float", "double", "string", "record",
150 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
151 if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
152 # union type, collect all possible secondaryFiles
153 for i in inputschema:
154 set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
157 if inputschema == "File":
158 inputschema = {"type": "File"}
160 if isinstance(inputschema, basestring):
161 sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
167 if "secondaryFiles" in inputschema:
168 # Set secondaryFiles; it may be inherited by compound types.
169 secondaryspec = inputschema["secondaryFiles"]
171 if (isinstance(inputschema["type"], (Mapping, Sequence)) and
172 not isinstance(inputschema["type"], basestring)):
173 # compound type (union, array, record)
174 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
176 elif (inputschema["type"] == "record" and
177 isinstance(primary, Mapping)):
179 # record type, find secondary files associated with fields.
181 for f in inputschema["fields"]:
182 p = primary.get(shortname(f["name"]))
184 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
186 elif (inputschema["type"] == "array" and
187 isinstance(primary, Sequence)):
189 # array type, find secondary files of elements
192 set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
194 elif (inputschema["type"] == "File" and
195 isinstance(primary, Mapping) and
196 primary.get("class") == "File"):
198 if "secondaryFiles" in primary or not secondaryspec:
203 # Found a file, check for secondaryFiles
206 primary["secondaryFiles"] = secondaryspec
207 for i, sf in enumerate(aslist(secondaryspec)):
208 if builder.cwlVersion == "v1.0":
211 pattern = sf["pattern"]
214 if isinstance(pattern, list):
215 specs.extend(pattern)
216 elif isinstance(pattern, dict):
217 specs.append(pattern)
218 elif isinstance(pattern, str):
219 if builder.cwlVersion == "v1.0":
220 specs.append({"pattern": pattern, "required": True})
222 specs.append({"pattern": pattern, "required": sf.get("required")})
224 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
225 "Expression must return list, object, string or null")
228 for i, sf in enumerate(specs):
229 if isinstance(sf, dict):
230 if sf.get("class") == "File":
232 if sf.get("location") is None:
233 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
234 "File object is missing 'location': %s" % sf)
235 sfpath = sf["location"]
238 pattern = sf["pattern"]
239 required = sf.get("required")
240 elif isinstance(sf, str):
244 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
245 "Expression must return list, object, string or null")
247 if pattern is not None:
248 if "${" in pattern or "$(" in pattern:
249 sfname = builder.do_eval(pattern, context=primary)
251 sfname = substitute(primary["basename"], pattern)
256 p_location = primary["location"]
257 if "/" in p_location:
259 p_location[0 : p_location.rindex("/") + 1]
263 required = builder.do_eval(required, context=primary)
265 if fsaccess.exists(sfpath):
266 if pattern is not None:
267 found.append({"location": sfpath, "class": "File"})
271 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
272 "Required secondary file '%s' does not exist" % sfpath)
274 primary["secondaryFiles"] = cmap(found)
275 if discovered is not None:
276 discovered[primary["location"]] = primary["secondaryFiles"]
277 elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
278 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
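# Illustrative sketch of the pattern handling above (hypothetical values): for
# cwlVersion v1.1+, a secondaryFiles entry such as {"pattern": ".bai",
# "required": True} applied to a primary file with basename "sample.bam" is
# expanded with substitute("sample.bam", ".bai") -> "sample.bam.bai"; a leading
# "^" in the pattern ("^.bai") would instead strip the extension, giving
# "sample.bai". The candidate is kept only if fsaccess.exists() finds it;
# otherwise a required entry raises a ValidationException.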
280 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
281 for inputschema in inputs:
282 primary = job_order.get(shortname(inputschema["id"]))
283 if isinstance(primary, (Mapping, Sequence)):
284 set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
286 def upload_dependencies(arvrunner, name, document_loader,
287 workflowobj, uri, loadref_run, runtimeContext,
288 include_primary=True, discovered_secondaryfiles=None,
290 """Upload the dependencies of the workflowobj document to Keep.
292 Returns a pathmapper object mapping local paths to keep references. Also
293 does an in-place update of references in "workflowobj".
295 Use scandeps to find $import, $include, $schemas, run, File and Directory
296 fields that represent external references.
298 If workflowobj has an "id" field, this will reload the document to ensure
299 it is scanning the raw document prior to preprocessing.
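A minimal usage sketch (hypothetical names; assumes arvrunner is an
initialized executor and tool is a loaded cwltool Process):

    mapper = upload_dependencies(arvrunner, "workflow dependencies",
                                 tool.doc_loader, tool.tool, tool.tool["id"],
                                 False, runtimeContext)
    # mapper.mapper("file:///home/user/input.fastq").resolved
    #   -> e.g. "keep:<portable data hash>/input.fastq"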
304 joined = document_loader.fetcher.urljoin(b, u)
305 defrg, _ = urllib.parse.urldefrag(joined)
306 if defrg not in loaded:
308 if cache is not None and defrg in cache:
310 # Use fetch_text to get raw file (before preprocessing).
311 text = document_loader.fetch_text(defrg)
312 if isinstance(text, bytes):
313 textIO = StringIO(text.decode('utf-8'))
315 textIO = StringIO(text)
316 yamlloader = YAML(typ='safe', pure=True)
317 result = yamlloader.load(textIO)
318 if cache is not None:
319 cache[defrg] = result
325 loadref_fields = set(("$import", "run"))
327 loadref_fields = set(("$import",))
329 scanobj = workflowobj
330 if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
331 defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
332 if cache is not None and defrg not in cache:
333 # if we haven't seen this file before, we want the raw file
334 # content (before preprocessing) to ensure that external
335 # references like $include haven't already been inlined.
336 scanobj = loadref("", workflowobj["id"])
340 with Perf(metrics, "scandeps include, location"):
341 sc_result = scandeps(uri, scanobj,
343 set(("$include", "location")),
344 loadref, urljoin=document_loader.fetcher.urljoin,
347 with Perf(metrics, "scandeps $schemas"):
348 optional_deps = scandeps(uri, scanobj,
351 loadref, urljoin=document_loader.fetcher.urljoin,
354 if sc_result is None:
357 if optional_deps is None:
361 sc_result.extend(optional_deps)
366 def collect_uuids(obj):
367 loc = obj.get("location", "")
370 # Collect collection uuids that need to be resolved to
371 # portable data hashes
372 gp = collection_uuid_pattern.match(loc)
374 uuids[gp.groups()[0]] = obj
375 if collectionUUID in obj:
376 uuids[obj[collectionUUID]] = obj
378 def collect_uploads(obj):
379 loc = obj.get("location", "")
383 if sp[0] in ("file", "http", "https"):
384 # Record local files that need to be uploaded;
385 # don't include file literals, keep references, etc.
389 with Perf(metrics, "collect uuids"):
390 visit_class(workflowobj, ("File", "Directory"), collect_uuids)
392 with Perf(metrics, "collect uploads"):
393 visit_class(sc_result, ("File", "Directory"), collect_uploads)
395 # Resolve any collection uuids we found to portable data hashes
396 # and assign them to uuid_map
398 fetch_uuids = list(uuids.keys())
399 with Perf(metrics, "fetch_uuids"):
401 # For a large number of fetch_uuids, the API server may limit the
402 # response size, so keep fetching until it has nothing more to return.
404 lookups = arvrunner.api.collections().list(
405 filters=[["uuid", "in", fetch_uuids]],
407 select=["uuid", "portable_data_hash"]).execute(
408 num_retries=arvrunner.num_retries)
410 if not lookups["items"]:
413 for l in lookups["items"]:
414 uuid_map[l["uuid"]] = l["portable_data_hash"]
416 fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
418 normalizeFilesDirs(sc)
420 if include_primary and "id" in workflowobj:
421 sc.append({"class": "File", "location": workflowobj["id"]})
423 def visit_default(obj):
424 def defaults_are_optional(f):
425 if "location" not in f and "path" in f:
426 f["location"] = f["path"]
428 normalizeFilesDirs(f)
429 optional_deps.append(f)
430 visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
432 find_defaults(workflowobj, visit_default)
435 def discover_default_secondary_files(obj):
436 builder_job_order = {}
437 for t in obj["inputs"]:
438 builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
439 # Need to create a builder object to evaluate expressions.
440 builder = make_builder(builder_job_order,
441 obj.get("hints", []),
442 obj.get("requirements", []),
445 discover_secondary_files(arvrunner.fs_access,
451 copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
452 visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
454 for d in list(discovered):
455 # Only interested in discovered secondaryFiles which are local
456 # files that need to be uploaded.
457 if d.startswith("file:"):
458 sc.extend(discovered[d])
462 with Perf(metrics, "mapper"):
463 mapper = ArvPathMapper(arvrunner, sc, "",
467 single_collection=True,
468 optional_deps=optional_deps)
472 if k.startswith("keep:"):
473 keeprefs.add(collection_pdh_pattern.match(k).group(1))
476 loc = p.get("location")
477 if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
478 p["location"] = mapper.mapper(p["location"]).resolved
479 addkeepref(p["location"])
485 if collectionUUID in p:
486 uuid = p[collectionUUID]
487 if uuid not in uuid_map:
488 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
489 "Collection uuid %s not found" % uuid)
490 gp = collection_pdh_pattern.match(loc)
491 if gp and uuid_map[uuid] != gp.groups()[0]:
492 # This file entry has both collectionUUID and a PDH
493 # location. If the PDH doesn't match the one returned
494 # by the API server, raise an error.
495 raise SourceLine(p, "location", validate.ValidationException).makeError(
496 "Expected collection uuid %s to be %s but API server reported %s" % (
497 uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
499 gp = collection_uuid_pattern.match(loc)
501 # Not a uuid pattern (must be a pdh pattern)
502 addkeepref(p["location"])
505 uuid = gp.groups()[0]
506 if uuid not in uuid_map:
507 raise SourceLine(p, "location", validate.ValidationException).makeError(
508 "Collection uuid %s not found" % uuid)
509 p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
510 p[collectionUUID] = uuid
512 with Perf(metrics, "setloc"):
513 visit_class(workflowobj, ("File", "Directory"), setloc)
514 visit_class(discovered, ("File", "Directory"), setloc)
516 if discovered_secondaryfiles is not None:
518 discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
520 if runtimeContext.copy_deps:
521 # Find referenced collections and copy them into the
522 # destination project, for easy sharing.
523 already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
524 filters=[["portable_data_hash", "in", list(keeprefs)],
525 ["owner_uuid", "=", runtimeContext.project_uuid]],
526 select=["uuid", "portable_data_hash", "created_at"]))
528 keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
530 col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
531 order="created_at desc",
532 select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
534 if len(col["items"]) == 0:
535 logger.warning("Cannot find collection with portable data hash %s", kr)
537 col = col["items"][0]
539 arvrunner.api.collections().create(body={"collection": {
540 "owner_uuid": runtimeContext.project_uuid,
542 "description": col["description"],
543 "properties": col["properties"],
544 "portable_data_hash": col["portable_data_hash"],
545 "manifest_text": col["manifest_text"],
546 "storage_classes_desired": col["storage_classes_desired"],
547 "trash_at": col["trash_at"]
548 }}, ensure_unique_name=True).execute()
549 except Exception as e:
550 logger.warning("Unable copy collection to destination: %s", e)
552 if "$schemas" in workflowobj:
554 for s in workflowobj["$schemas"]:
556 sch.append(mapper.mapper(s).resolved)
557 workflowobj["$schemas"] = sch
562 def upload_docker(arvrunner, tool, runtimeContext):
563 """Uploads Docker images used in CommandLineTool objects."""
565 if isinstance(tool, CommandLineTool):
566 (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
568 if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
569 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
570 "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
572 arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
573 runtimeContext.project_uuid,
574 runtimeContext.force_docker_pull,
575 runtimeContext.tmp_outdir_prefix,
576 runtimeContext.match_local_docker,
577 runtimeContext.copy_deps)
579 arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
581 runtimeContext.project_uuid,
582 runtimeContext.force_docker_pull,
583 runtimeContext.tmp_outdir_prefix,
584 runtimeContext.match_local_docker,
585 runtimeContext.copy_deps)
586 elif isinstance(tool, cwltool.workflow.Workflow):
588 upload_docker(arvrunner, s.embedded_tool, runtimeContext)
591 def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
592 """Create a packed workflow.
594 A "packed" workflow is one where all the components have been combined into a single document."""
597 packed = pack(arvrunner.loadingContext, tool.tool["id"],
598 rewrite_out=rewrites,
599 loader=tool.doc_loader)
601 rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
603 def visit(v, cur_id):
604 if isinstance(v, dict):
605 if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
606 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
607 raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field; add an 'id' or use cwlVersion: v1.1")
609 cur_id = rewrite_to_orig.get(v["id"], v["id"])
610 if "path" in v and "location" not in v:
611 v["location"] = v["path"]
613 if "location" in v and cur_id in merged_map:
614 if v["location"] in merged_map[cur_id].resolved:
615 v["location"] = merged_map[cur_id].resolved[v["location"]]
616 if v["location"] in merged_map[cur_id].secondaryFiles:
617 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
618 if v.get("class") == "DockerRequirement":
619 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
620 runtimeContext.project_uuid,
621 runtimeContext.force_docker_pull,
622 runtimeContext.tmp_outdir_prefix,
623 runtimeContext.match_local_docker,
624 runtimeContext.copy_deps)
627 if isinstance(v, list):
634 def tag_git_version(packed):
635 if tool.tool["id"].startswith("file://"):
636 path = os.path.dirname(tool.tool["id"][7:])
638 githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
639 except (OSError, subprocess.CalledProcessError):
642 packed["http://schema.org/version"] = githash
645 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
646 """Upload local files referenced in the input object and return updated input
647 object with 'location' updated to the proper keep references.
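An illustrative sketch (hypothetical paths): an entry such as

    {"reads": {"class": "File", "location": "file:///home/user/reads.fastq"}}

comes back with its location rewritten to a keep reference, for example

    {"reads": {"class": "File", "location": "keep:<portable data hash>/reads.fastq"}}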
650 # Make a copy of the job order and set defaults.
651 builder_job_order = copy.copy(job_order)
653 # fill_in_defaults throws an error if there are any
654 # missing required parameters; we don't want it to do that,
655 # so make them all optional.
656 inputs_copy = copy.deepcopy(tool.tool["inputs"])
657 for i in inputs_copy:
658 if "null" not in i["type"]:
659 i["type"] = ["null"] + aslist(i["type"])
661 fill_in_defaults(inputs_copy,
664 # Need to create a builder object to evaluate expressions.
665 builder = make_builder(builder_job_order,
670 # Now update job_order with secondaryFiles
671 discover_secondary_files(arvrunner.fs_access,
676 jobmapper = upload_dependencies(arvrunner,
680 job_order.get("id", "#"),
684 if "id" in job_order:
687 # Need to filter this out; it gets added by cwltool when providing
688 # parameters on the command line.
689 if "job_order" in job_order:
690 del job_order["job_order"]
694 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
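# Illustrative sketch (hypothetical values): upload_workflow_deps builds a
# merged_map keyed by tool document id, e.g.
#
#   merged_map["file:///home/user/wf.cwl"] = FileUpdates(
#       resolved={"file:///home/user/data.txt": "keep:<pdh>/data.txt"},
#       secondaryFiles={})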
696 def upload_workflow_deps(arvrunner, tool, runtimeContext):
697 # Ensure that Docker images needed by this workflow are available
699 with Perf(metrics, "upload_docker"):
700 upload_docker(arvrunner, tool, runtimeContext)
702 document_loader = tool.doc_loader
706 def upload_tool_deps(deptool):
708 discovered_secondaryfiles = {}
709 with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
710 pm = upload_dependencies(arvrunner,
711 "%s dependencies" % (shortname(deptool["id"])),
717 include_primary=False,
718 discovered_secondaryfiles=discovered_secondaryfiles,
719 cache=tool_dep_cache)
720 document_loader.idx[deptool["id"]] = deptool
722 for k,v in pm.items():
723 toolmap[k] = v.resolved
724 merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
726 tool.visit(upload_tool_deps)
730 def arvados_jobs_image(arvrunner, img, runtimeContext):
731 """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
734 return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
736 runtimeContext.project_uuid,
737 runtimeContext.force_docker_pull,
738 runtimeContext.tmp_outdir_prefix,
739 runtimeContext.match_local_docker,
740 runtimeContext.copy_deps)
741 except Exception as e:
742 raise Exception("Docker image %s is not available\n%s" % (img, e))
745 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
746 collection = arvados.collection.Collection(api_client=arvrunner.api,
747 keep_client=arvrunner.keep_client,
748 num_retries=arvrunner.num_retries)
749 with collection.open("workflow.cwl", "w") as f:
750 f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
752 filters = [["portable_data_hash", "=", collection.portable_data_hash()],
753 ["name", "like", name+"%"]]
754 if runtimeContext.project_uuid:
755 filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
756 exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
759 logger.info("Using collection %s", exists["items"][0]["uuid"])
761 collection.save_new(name=name,
762 owner_uuid=runtimeContext.project_uuid,
763 ensure_unique_name=True,
764 num_retries=arvrunner.num_retries)
765 logger.info("Uploaded to %s", collection.manifest_locator())
767 return collection.portable_data_hash()
770 class Runner(Process):
771 """Base class for runner processes, which submit an instance of
772 arvados-cwl-runner and wait for the final result."""
774 def __init__(self, runner, updated_tool,
775 tool, loadingContext, enable_reuse,
776 output_name, output_tags, submit_runner_ram=0,
777 name=None, on_error=None, submit_runner_image=None,
778 intermediate_output_ttl=0, merged_map=None,
779 priority=None, secret_store=None,
780 collection_cache_size=256,
781 collection_cache_is_default=True):
783 loadingContext = loadingContext.copy()
784 loadingContext.metadata = updated_tool.metadata.copy()
786 super(Runner, self).__init__(updated_tool.tool, loadingContext)
788 self.arvrunner = runner
789 self.embedded_tool = tool
790 self.job_order = None
793 # If reuse is permitted by command line arguments but
794 # disabled by the workflow itself, disable it.
795 reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
797 enable_reuse = reuse_req["enableReuse"]
798 self.enable_reuse = enable_reuse
800 self.final_output = None
801 self.output_name = output_name
802 self.output_tags = output_tags
804 self.on_error = on_error
805 self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
806 self.intermediate_output_ttl = intermediate_output_ttl
807 self.priority = priority
808 self.secret_store = secret_store
809 self.enable_dev = loadingContext.enable_dev
811 self.submit_runner_cores = 1
812 self.submit_runner_ram = 1024 # default 1 GiB
813 self.collection_cache_size = collection_cache_size
815 runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
816 if runner_resource_req:
817 if runner_resource_req.get("coresMin"):
818 self.submit_runner_cores = runner_resource_req["coresMin"]
819 if runner_resource_req.get("ramMin"):
820 self.submit_runner_ram = runner_resource_req["ramMin"]
821 if runner_resource_req.get("keep_cache") and collection_cache_is_default:
822 self.collection_cache_size = runner_resource_req["keep_cache"]
824 if submit_runner_ram:
825 # Command line / initializer overrides default and/or spec from workflow
826 self.submit_runner_ram = submit_runner_ram
828 if self.submit_runner_ram <= 0:
829 raise Exception("Value of submit-runner-ram must be greater than zero")
831 if self.submit_runner_cores <= 0:
832 raise Exception("Value of submit-runner-cores must be greater than zero")
834 self.merged_map = merged_map or {}
837 job_order, # type: Mapping[Text, Text]
838 output_callbacks, # type: Callable[[Any, Any], Any]
839 runtimeContext # type: RuntimeContext
840 ): # type: (...) -> Generator[Any, None, None]
841 self.job_order = job_order
842 self._init_job(job_order, runtimeContext)
845 def update_pipeline_component(self, record):
848 def done(self, record):
849 """Base method for handling a completed runner."""
852 if record["state"] == "Complete":
853 if record.get("exit_code") is not None:
854 if record["exit_code"] == 33:
855 processStatus = "UnsupportedRequirement"
856 elif record["exit_code"] == 0:
857 processStatus = "success"
859 processStatus = "permanentFail"
861 processStatus = "success"
863 processStatus = "permanentFail"
867 if processStatus == "permanentFail":
868 logc = arvados.collection.CollectionReader(record["log"],
869 api_client=self.arvrunner.api,
870 keep_client=self.arvrunner.keep_client,
871 num_retries=self.arvrunner.num_retries)
872 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
874 self.final_output = record["output"]
875 outc = arvados.collection.CollectionReader(self.final_output,
876 api_client=self.arvrunner.api,
877 keep_client=self.arvrunner.keep_client,
878 num_retries=self.arvrunner.num_retries)
879 if "cwl.output.json" in outc:
880 with outc.open("cwl.output.json", "rb") as f:
882 outputs = json.loads(f.read().decode())
883 def keepify(fileobj):
884 path = fileobj["location"]
885 if not path.startswith("keep:"):
886 fileobj["location"] = "keep:%s/%s" % (record["output"], path)
887 adjustFileObjs(outputs, keepify)
888 adjustDirObjs(outputs, keepify)
890 logger.exception("[%s] While getting final output object", self.name)
891 self.arvrunner.output_callback({}, "permanentFail")
893 self.arvrunner.output_callback(outputs, processStatus)