1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
14 from functools import partial
18 from collections import namedtuple
19 from io import StringIO
39 from cwltool.utils import (
45 if os.name == "posix" and sys.version_info[0] < 3:
46 import subprocess32 as subprocess
50 from schema_salad.sourceline import SourceLine, cmap
52 from cwltool.command_line_tool import CommandLineTool
53 import cwltool.workflow
54 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
55 shortname, Process, fill_in_defaults)
56 from cwltool.load_tool import fetch_document
57 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
58 from cwltool.builder import substitute
59 from cwltool.pack import pack
60 from cwltool.update import INTERNAL_VERSION
61 from cwltool.builder import Builder
62 import schema_salad.validate as validate
64 import arvados.collection
66 from .util import collectionUUID
67 from ruamel.yaml import YAML
68 from ruamel.yaml.comments import CommentedMap, CommentedSeq
70 import arvados_cwl.arvdocker
71 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
72 from ._version import __version__
74 from .context import ArvRuntimeContext
75 from .perf import Perf
77 logger = logging.getLogger('arvados.cwl-runner')
78 metrics = logging.getLogger('arvados.cwl-runner.metrics')
80 def trim_anonymous_location(obj):
81 """Remove 'location' field from File and Directory literals.
83 To make internal handling easier, literals are assigned a random id for
84 'location'. However, when writing the record back out, this can break
85 reproducibility. Since it is valid for literals not to have a 'location'
90 if obj.get("location", "").startswith("_:"):
94 def remove_redundant_fields(obj):
95 for field in ("path", "nameext", "nameroot", "dirname"):
100 def find_defaults(d, op):
101 if isinstance(d, list):
104 elif isinstance(d, dict):
108 for i in viewvalues(d):
111 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
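# Construct a cwltool Builder with just enough state to evaluate parameter
# references and expressions (for example secondaryFiles patterns) outside of a
# running step; most fields are left empty or taken from the runtime context.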
114 files=[], # type: List[Dict[Text, Text]]
115 bindings=[], # type: List[Dict[Text, Any]]
116 schemaDefs={}, # type: Dict[Text, Dict[Text, Any]]
117 names=None, # type: Names
118 requirements=requirements, # type: List[Dict[Text, Any]]
119 hints=hints, # type: List[Dict[Text, Any]]
120 resources={}, # type: Dict[str, int]
121 mutation_manager=None, # type: Optional[MutationManager]
122 formatgraph=None, # type: Optional[Graph]
123 make_fs_access=None, # type: Type[StdFsAccess]
124 fs_access=None, # type: StdFsAccess
125 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
126 timeout=runtimeContext.eval_timeout, # type: float
127 debug=runtimeContext.debug, # type: bool
128 js_console=runtimeContext.js_console, # type: bool
129 force_docker_pull=runtimeContext.force_docker_pull, # type: bool
130 loadListing="", # type: Text
131 outdir="", # type: Text
132 tmpdir="", # type: Text
133 stagedir="", # type: Text
134 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
135 container_engine="docker"
138 def search_schemadef(name, reqs):
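# Look through the given requirements/hints for a SchemaDefRequirement that
# defines the named type and return the matching schema definition.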
140 if r["class"] == "SchemaDefRequirement":
141 for sd in r["types"]:
142 if sd["name"] == name:
146 primitive_types_set = frozenset(("null", "boolean", "int", "long",
147 "float", "double", "string", "record",
150 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
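# Walk an input schema and the corresponding value in parallel, expanding
# secondaryFiles patterns on File values and recording newly found files in
# 'discovered' (keyed by the primary file's location).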
151 if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
152 # union type, collect all possible secondaryFiles
153 for i in inputschema:
154 set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
157 if inputschema == "File":
158 inputschema = {"type": "File"}
160 if isinstance(inputschema, basestring):
161 sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
167 if "secondaryFiles" in inputschema:
168 # set secondaryFiles, which may be inherited by compound types.
169 secondaryspec = inputschema["secondaryFiles"]
171 if (isinstance(inputschema["type"], (Mapping, Sequence)) and
172 not isinstance(inputschema["type"], basestring)):
173 # compound type (union, array, record)
174 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
176 elif (inputschema["type"] == "record" and
177 isinstance(primary, Mapping)):
179 # record type, find secondary files associated with fields.
181 for f in inputschema["fields"]:
182 p = primary.get(shortname(f["name"]))
184 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
186 elif (inputschema["type"] == "array" and
187 isinstance(primary, Sequence)):
189 # array type, find secondary files of elements
192 set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
194 elif (inputschema["type"] == "File" and
195 isinstance(primary, Mapping) and
196 primary.get("class") == "File"):
198 if "secondaryFiles" in primary or not secondaryspec:
203 # Found a file, check for secondaryFiles
206 primary["secondaryFiles"] = secondaryspec
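# Each secondaryFiles entry may be a bare pattern string, a mapping such as
# {"pattern": ..., "required": ...}, or (for v1.0 documents) an expression to
# evaluate; normalize all of them into the 'specs' list before resolving.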
207 for i, sf in enumerate(aslist(secondaryspec)):
208 if builder.cwlVersion == "v1.0":
211 pattern = sf["pattern"]
214 if isinstance(pattern, list):
215 specs.extend(pattern)
216 elif isinstance(pattern, dict):
217 specs.append(pattern)
218 elif isinstance(pattern, str):
219 if builder.cwlVersion == "v1.0":
220 specs.append({"pattern": pattern, "required": True})
222 specs.append({"pattern": pattern, "required": sf.get("required")})
224 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
225 "Expression must return list, object, string or null")
228 for i, sf in enumerate(specs):
229 if isinstance(sf, dict):
230 if sf.get("class") == "File":
232 if sf.get("location") is None:
233 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
234 "File object is missing 'location': %s" % sf)
235 sfpath = sf["location"]
238 pattern = sf["pattern"]
239 required = sf.get("required")
240 elif isinstance(sf, str):
244 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
245 "Expression must return list, object, string or null")
247 if pattern is not None:
248 if "${" in pattern or "$(" in pattern:
249 sfname = builder.do_eval(pattern, context=primary)
251 sfname = substitute(primary["basename"], pattern)
256 if isinstance(sfname, str):
257 p_location = primary["location"]
258 if "/" in p_location:
260 p_location[0 : p_location.rindex("/") + 1]
264 required = builder.do_eval(required, context=primary)
266 if isinstance(sfname, list) or isinstance(sfname, dict):
267 each = aslist(sfname)
269 if required and not fsaccess.exists(e.get("location")):
270 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
271 "Required secondary file '%s' does not exist" % e.get("location"))
274 if isinstance(sfname, str):
275 if fsaccess.exists(sfpath):
276 if pattern is not None:
277 found.append({"location": sfpath, "class": "File"})
281 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
282 "Required secondary file '%s' does not exist" % sfpath)
284 primary["secondaryFiles"] = cmap(found)
285 if discovered is not None:
286 discovered[primary["location"]] = primary["secondaryFiles"]
287 elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
288 set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
290 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
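# For each input parameter that has a corresponding value in the job order,
# run set_secondary() to fill in any secondaryFiles implied by the schema.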
291 for inputschema in inputs:
292 primary = job_order.get(shortname(inputschema["id"]))
293 if isinstance(primary, (Mapping, Sequence)):
294 set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
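# Illustrative call pattern (names mirror upload_job_order below; this is a
# sketch, not an exact trace):
#   builder = make_builder(job_order, tool.hints, tool.requirements,
#                          runtimeContext, tool.metadata)
#   discover_secondary_files(arvrunner.fs_access, builder,
#                            tool.tool["inputs"], job_order, discovered={})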
296 def upload_dependencies(arvrunner, name, document_loader,
297 workflowobj, uri, loadref_run, runtimeContext,
298 include_primary=True, discovered_secondaryfiles=None,
300 """Upload the dependencies of the workflowobj document to Keep.
302 Returns a pathmapper object mapping local paths to keep references. Also
303 does an in-place update of references in "workflowobj".
305 Uses scandeps to find $import, $include, $schemas, run, File, and Directory
306 fields that represent external references.
308 If workflowobj has an "id" field, this will reload the document to ensure
309 it is scanning the raw document prior to preprocessing.
314 joined = document_loader.fetcher.urljoin(b, u)
315 defrg, _ = urllib.parse.urldefrag(joined)
316 if defrg not in loaded:
318 if cache is not None and defrg in cache:
320 # Use fetch_text to get raw file (before preprocessing).
321 text = document_loader.fetch_text(defrg)
322 if isinstance(text, bytes):
323 textIO = StringIO(text.decode('utf-8'))
325 textIO = StringIO(text)
326 yamlloader = YAML(typ='safe', pure=True)
327 result = yamlloader.load(textIO)
328 if cache is not None:
329 cache[defrg] = result
335 loadref_fields = set(("$import", "run"))
337 loadref_fields = set(("$import",))
339 scanobj = workflowobj
340 if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
341 defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
342 if cache is not None and defrg not in cache:
343 # if we haven't seen this file before, we want the raw file
344 # content (before preprocessing) to ensure that external
345 # references like $include haven't already been inlined.
346 scanobj = loadref("", workflowobj["id"])
350 with Perf(metrics, "scandeps include, location"):
351 sc_result = scandeps(uri, scanobj,
353 set(("$include", "location")),
354 loadref, urljoin=document_loader.fetcher.urljoin,
357 with Perf(metrics, "scandeps $schemas"):
358 optional_deps = scandeps(uri, scanobj,
361 loadref, urljoin=document_loader.fetcher.urljoin,
364 if sc_result is None:
367 if optional_deps is None:
371 sc_result.extend(optional_deps)
376 def collect_uuids(obj):
377 loc = obj.get("location", "")
380 # Collect collection uuids that need to be resolved to
381 # portable data hashes
382 gp = collection_uuid_pattern.match(loc)
384 uuids[gp.groups()[0]] = obj
385 if collectionUUID in obj:
386 uuids[obj[collectionUUID]] = obj
388 def collect_uploads(obj):
389 loc = obj.get("location", "")
393 if sp[0] in ("file", "http", "https"):
394 # Record local files that need to be uploaded,
395 # don't include file literals, keep references, etc.
399 with Perf(metrics, "collect uuids"):
400 visit_class(workflowobj, ("File", "Directory"), collect_uuids)
402 with Perf(metrics, "collect uploads"):
403 visit_class(sc_result, ("File", "Directory"), collect_uploads)
405 # Resolve any collection uuids we found to portable data hashes
406 # and assign them to uuid_map
408 fetch_uuids = list(uuids.keys())
409 with Perf(metrics, "fetch_uuids"):
411 # For a large number of fetch_uuids, the API server may limit
412 # the response size, so keep fetching until the API server has nothing
414 lookups = arvrunner.api.collections().list(
415 filters=[["uuid", "in", fetch_uuids]],
417 select=["uuid", "portable_data_hash"]).execute(
418 num_retries=arvrunner.num_retries)
420 if not lookups["items"]:
423 for l in lookups["items"]:
424 uuid_map[l["uuid"]] = l["portable_data_hash"]
426 fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
428 normalizeFilesDirs(sc)
430 if "id" in workflowobj:
431 defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
433 # make sure it's included
434 sc.append({"class": "File", "location": defrg})
436 # make sure it's excluded
437 sc = [d for d in sc if d.get("location") != defrg]
439 def visit_default(obj):
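# Files referenced only by workflow-level defaults may legitimately be missing;
# record them as optional dependencies instead of failing the upload.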
440 def defaults_are_optional(f):
441 if "location" not in f and "path" in f:
442 f["location"] = f["path"]
444 normalizeFilesDirs(f)
445 optional_deps.append(f)
446 visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
448 find_defaults(workflowobj, visit_default)
451 def discover_default_secondary_files(obj):
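# Run secondaryFiles discovery against each tool's default input values so that
# secondary files implied by defaults are uploaded as well.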
452 builder_job_order = {}
453 for t in obj["inputs"]:
454 builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
455 # Need to create a builder object to evaluate expressions.
456 builder = make_builder(builder_job_order,
457 obj.get("hints", []),
458 obj.get("requirements", []),
461 discover_secondary_files(arvrunner.fs_access,
467 copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
468 visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
470 for d in list(discovered):
471 # Only interested in discovered secondaryFiles which are local
472 # files that need to be uploaded.
473 if d.startswith("file:"):
474 sc.extend(discovered[d])
478 with Perf(metrics, "mapper"):
479 mapper = ArvPathMapper(arvrunner, sc, "",
483 single_collection=True,
484 optional_deps=optional_deps)
488 if k.startswith("keep:"):
489 keeprefs.add(collection_pdh_pattern.match(k).group(1))
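# setloc (below) rewrites each File/Directory 'location' to its mapped Keep
# reference and cross-checks any collectionUUID hint against the portable data
# hash reported by the API server.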
492 loc = p.get("location")
493 if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
494 p["location"] = mapper.mapper(p["location"]).resolved
495 addkeepref(p["location"])
501 if collectionUUID in p:
502 uuid = p[collectionUUID]
503 if uuid not in uuid_map:
504 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
505 "Collection uuid %s not found" % uuid)
506 gp = collection_pdh_pattern.match(loc)
507 if gp and uuid_map[uuid] != gp.groups()[0]:
508 # This file entry has both collectionUUID and a PDH
509 # location. If the PDH doesn't match the one returned by
510 # the API server, raise an error.
511 raise SourceLine(p, "location", validate.ValidationException).makeError(
512 "Expected collection uuid %s to be %s but API server reported %s" % (
513 uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
515 gp = collection_uuid_pattern.match(loc)
517 # Not a uuid pattern (must be a pdh pattern)
518 addkeepref(p["location"])
521 uuid = gp.groups()[0]
522 if uuid not in uuid_map:
523 raise SourceLine(p, "location", validate.ValidationException).makeError(
524 "Collection uuid %s not found" % uuid)
525 p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
526 p[collectionUUID] = uuid
528 with Perf(metrics, "setloc"):
529 visit_class(workflowobj, ("File", "Directory"), setloc)
530 visit_class(discovered, ("File", "Directory"), setloc)
532 if discovered_secondaryfiles is not None:
534 discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
536 if runtimeContext.copy_deps:
537 # Find referenced collections and copy them into the
538 # destination project, for easy sharing.
539 already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
540 filters=[["portable_data_hash", "in", list(keeprefs)],
541 ["owner_uuid", "=", runtimeContext.project_uuid]],
542 select=["uuid", "portable_data_hash", "created_at"]))
544 keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
546 col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
547 order="created_at desc",
548 select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
550 if len(col["items"]) == 0:
551 logger.warning("Cannot find collection with portable data hash %s", kr)
553 col = col["items"][0]
555 arvrunner.api.collections().create(body={"collection": {
556 "owner_uuid": runtimeContext.project_uuid,
558 "description": col["description"],
559 "properties": col["properties"],
560 "portable_data_hash": col["portable_data_hash"],
561 "manifest_text": col["manifest_text"],
562 "storage_classes_desired": col["storage_classes_desired"],
563 "trash_at": col["trash_at"]
564 }}, ensure_unique_name=True).execute()
565 except Exception as e:
566 logger.warning("Unable to copy collection to destination: %s", e)
568 if "$schemas" in workflowobj:
570 for s in workflowobj["$schemas"]:
572 sch.append(mapper.mapper(s).resolved)
573 workflowobj["$schemas"] = sch
578 def upload_docker(arvrunner, tool, runtimeContext):
579 """Uploads Docker images used in CommandLineTool objects."""
581 if isinstance(tool, CommandLineTool):
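# When the tool has no DockerRequirement, the matching arvados/jobs image for
# this release is uploaded instead, since some image is always needed to run
# the step.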
582 (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
584 if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
585 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
586 "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
588 arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
589 runtimeContext.project_uuid,
590 runtimeContext.force_docker_pull,
591 runtimeContext.tmp_outdir_prefix,
592 runtimeContext.match_local_docker,
593 runtimeContext.copy_deps)
595 arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
597 runtimeContext.project_uuid,
598 runtimeContext.force_docker_pull,
599 runtimeContext.tmp_outdir_prefix,
600 runtimeContext.match_local_docker,
601 runtimeContext.copy_deps)
602 elif isinstance(tool, cwltool.workflow.Workflow):
604 upload_docker(arvrunner, s.embedded_tool, runtimeContext)
607 def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
608 """Create a packed workflow.
610 A "packed" workflow is one where all the components have been combined into a single document."""
613 packed = pack(arvrunner.loadingContext, tool.tool["id"],
614 rewrite_out=rewrites,
615 loader=tool.doc_loader)
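# pack() inlines every referenced tool into a single document and records how
# ids were rewritten; rewrite_to_orig below maps the new ids back to the
# originals so merged_map lookups still work.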
617 rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
619 def visit(v, cur_id):
620 if isinstance(v, dict):
621 if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
622 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
623 raise SourceLine(v, None, Exception).makeError("Embedded process object is missing the required 'id' field; add an 'id' or update to cwlVersion: v1.1")
625 cur_id = rewrite_to_orig.get(v["id"], v["id"])
626 if "path" in v and "location" not in v:
627 v["location"] = v["path"]
629 if "location" in v and cur_id in merged_map:
630 if v["location"] in merged_map[cur_id].resolved:
631 v["location"] = merged_map[cur_id].resolved[v["location"]]
632 if v["location"] in merged_map[cur_id].secondaryFiles:
633 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
634 if v.get("class") == "DockerRequirement":
635 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
636 runtimeContext.project_uuid,
637 runtimeContext.force_docker_pull,
638 runtimeContext.tmp_outdir_prefix,
639 runtimeContext.match_local_docker,
640 runtimeContext.copy_deps)
643 if isinstance(v, list):
650 def tag_git_version(packed):
651 if tool.tool["id"].startswith("file://"):
652 path = os.path.dirname(tool.tool["id"][7:])
654 githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
655 except (OSError, subprocess.CalledProcessError):
658 packed["http://schema.org/version"] = githash
661 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
662 """Upload local files referenced in the input object and return updated input
663 object with each 'location' rewritten to the proper Keep reference.
666 # Make a copy of the job order and set defaults.
667 builder_job_order = copy.copy(job_order)
669 # fill_in_defaults throws an error if there are any
670 # missing required parameters; we don't want that here,
671 # so make them all optional.
672 inputs_copy = copy.deepcopy(tool.tool["inputs"])
673 for i in inputs_copy:
674 if "null" not in i["type"]:
675 i["type"] = ["null"] + aslist(i["type"])
677 fill_in_defaults(inputs_copy,
680 # Need to create a builder object to evaluate expressions.
681 builder = make_builder(builder_job_order,
686 # Now update job_order with secondaryFiles
687 discover_secondary_files(arvrunner.fs_access,
692 jobmapper = upload_dependencies(arvrunner,
696 job_order.get("id", "#"),
700 if "id" in job_order:
703 # Need to filter this out; it gets added by cwltool when providing
704 # parameters on the command line.
705 if "job_order" in job_order:
706 del job_order["job_order"]
710 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
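# Each FileUpdates records, for one tool document, how its file references were
# rewritten. Illustrative shape (values are made up):
#   FileUpdates(resolved={"file:///tmp/a.fa": "keep:<pdh>/a.fa"},
#               secondaryFiles={"file:///tmp/a.fa": [{"class": "File", ...}]})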
712 def upload_workflow_deps(arvrunner, tool, runtimeContext):
713 # Ensure that Docker images needed by this workflow are available
715 with Perf(metrics, "upload_docker"):
716 upload_docker(arvrunner, tool, runtimeContext)
718 document_loader = tool.doc_loader
722 def upload_tool_deps(deptool):
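# Upload the file dependencies of each embedded tool and remember how its
# references were rewritten (in merged_map) so packed_workflow() can apply the
# same rewrites later.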
724 discovered_secondaryfiles = {}
725 with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
726 pm = upload_dependencies(arvrunner,
727 "%s dependencies" % (shortname(deptool["id"])),
733 include_primary=False,
734 discovered_secondaryfiles=discovered_secondaryfiles,
735 cache=tool_dep_cache)
736 document_loader.idx[deptool["id"]] = deptool
738 for k,v in pm.items():
739 toolmap[k] = v.resolved
740 merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
742 tool.visit(upload_tool_deps)
746 def arvados_jobs_image(arvrunner, img, runtimeContext):
747 """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
750 return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
752 runtimeContext.project_uuid,
753 runtimeContext.force_docker_pull,
754 runtimeContext.tmp_outdir_prefix,
755 runtimeContext.match_local_docker,
756 runtimeContext.copy_deps)
757 except Exception as e:
758 raise Exception("Docker image %s is not available\n%s" % (img, e))
761 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
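# Save the packed workflow as 'workflow.cwl' in a Keep collection, reusing an
# existing collection with the same portable data hash and name in the target
# project when one already exists.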
762 collection = arvados.collection.Collection(api_client=arvrunner.api,
763 keep_client=arvrunner.keep_client,
764 num_retries=arvrunner.num_retries)
765 with collection.open("workflow.cwl", "w") as f:
766 f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
768 filters = [["portable_data_hash", "=", collection.portable_data_hash()],
769 ["name", "like", name+"%"]]
770 if runtimeContext.project_uuid:
771 filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
772 exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
775 logger.info("Using collection %s", exists["items"][0]["uuid"])
777 collection.save_new(name=name,
778 owner_uuid=runtimeContext.project_uuid,
779 ensure_unique_name=True,
780 num_retries=arvrunner.num_retries)
781 logger.info("Uploaded to %s", collection.manifest_locator())
783 return collection.portable_data_hash()
786 class Runner(Process):
787 """Base class for runner processes, which submit an instance of
788 arvados-cwl-runner and wait for the final result."""
790 def __init__(self, runner, updated_tool,
791 tool, loadingContext, enable_reuse,
792 output_name, output_tags, submit_runner_ram=0,
793 name=None, on_error=None, submit_runner_image=None,
794 intermediate_output_ttl=0, merged_map=None,
795 priority=None, secret_store=None,
796 collection_cache_size=256,
797 collection_cache_is_default=True):
799 loadingContext = loadingContext.copy()
800 loadingContext.metadata = updated_tool.metadata.copy()
802 super(Runner, self).__init__(updated_tool.tool, loadingContext)
804 self.arvrunner = runner
805 self.embedded_tool = tool
806 self.job_order = None
809 # If reuse is permitted by command line arguments but
810 # disabled by the workflow itself, disable it.
811 reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
813 enable_reuse = reuse_req["enableReuse"]
814 self.enable_reuse = enable_reuse
816 self.final_output = None
817 self.output_name = output_name
818 self.output_tags = output_tags
820 self.on_error = on_error
821 self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
822 self.intermediate_output_ttl = intermediate_output_ttl
823 self.priority = priority
824 self.secret_store = secret_store
825 self.enable_dev = loadingContext.enable_dev
827 self.submit_runner_cores = 1
828 self.submit_runner_ram = 1024 # default 1 GiB
829 self.collection_cache_size = collection_cache_size
831 runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
832 if runner_resource_req:
833 if runner_resource_req.get("coresMin"):
834 self.submit_runner_cores = runner_resource_req["coresMin"]
835 if runner_resource_req.get("ramMin"):
836 self.submit_runner_ram = runner_resource_req["ramMin"]
837 if runner_resource_req.get("keep_cache") and collection_cache_is_default:
838 self.collection_cache_size = runner_resource_req["keep_cache"]
840 if submit_runner_ram:
841 # Command line / initializer overrides default and/or spec from workflow
842 self.submit_runner_ram = submit_runner_ram
844 if self.submit_runner_ram <= 0:
845 raise Exception("Value of submit-runner-ram must be greater than zero")
847 if self.submit_runner_cores <= 0:
848 raise Exception("Value of submit-runner-cores must be greater than zero")
850 self.merged_map = merged_map or {}
853 job_order, # type: Mapping[Text, Text]
854 output_callbacks, # type: Callable[[Any, Any], Any]
855 runtimeContext # type: RuntimeContext
856 ): # type: (...) -> Generator[Any, None, None]
857 self.job_order = job_order
858 self._init_job(job_order, runtimeContext)
861 def update_pipeline_component(self, record):
864 def done(self, record):
865 """Base method for handling a completed runner."""
868 if record["state"] == "Complete":
869 if record.get("exit_code") is not None:
870 if record["exit_code"] == 33:
871 processStatus = "UnsupportedRequirement"
872 elif record["exit_code"] == 0:
873 processStatus = "success"
875 processStatus = "permanentFail"
877 processStatus = "success"
879 processStatus = "permanentFail"
883 if processStatus == "permanentFail":
884 logc = arvados.collection.CollectionReader(record["log"],
885 api_client=self.arvrunner.api,
886 keep_client=self.arvrunner.keep_client,
887 num_retries=self.arvrunner.num_retries)
888 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
890 self.final_output = record["output"]
891 outc = arvados.collection.CollectionReader(self.final_output,
892 api_client=self.arvrunner.api,
893 keep_client=self.arvrunner.keep_client,
894 num_retries=self.arvrunner.num_retries)
895 if "cwl.output.json" in outc:
896 with outc.open("cwl.output.json", "rb") as f:
898 outputs = json.loads(f.read().decode())
899 def keepify(fileobj):
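# Output locations are reported relative to the output collection; rewrite any
# bare paths into keep: references.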
900 path = fileobj["location"]
901 if not path.startswith("keep:"):
902 fileobj["location"] = "keep:%s/%s" % (record["output"], path)
903 adjustFileObjs(outputs, keepify)
904 adjustDirObjs(outputs, keepify)
906 logger.exception("[%s] While getting final output object", self.name)
907 self.arvrunner.output_callback({}, "permanentFail")
909 self.arvrunner.output_callback(outputs, processStatus)