# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import copy
import json
import logging
import os
import sys
import urllib.parse

from functools import partial
from collections import namedtuple
from io import StringIO
from typing import Mapping, Sequence

from cwltool.utils import (
    CWLObjectType,
    CWLOutputAtomType,
    CWLOutputType,
)

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document, jobloaderctx
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate
import schema_salad.ref_resolver

import arvados.collection
import arvados.util

from . import done
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from .context import ArvRuntimeContext
from .perf import Perf

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a
    'location' field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]
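
# Illustrative sketch, not exercised by the module itself: the sample literal
# below is hypothetical. trim_anonymous_location() drops the synthetic "_:" id
# and remove_redundant_fields() strips the derived path fields, leaving only
# the portable parts of the record.
def _example_trim_literal():
    literal = {"class": "File", "location": "_:0b2858cc", "basename": "a.txt",
               "path": "/tmp/a.txt", "dirname": "/tmp"}
    trim_anonymous_location(literal)
    remove_redundant_fields(literal)
    return literal  # {"class": "File", "basename": "a.txt"}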

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)
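
# Illustrative sketch (hypothetical document): find_defaults() walks a nested
# structure and invokes op() on every dict carrying a "default" key, which is
# how default File/Directory values are located elsewhere in this module.
def _example_find_defaults():
    seen = []
    doc = {"inputs": [{"id": "threads", "type": "int", "default": 2}]}
    find_defaults(doc, seen.append)
    return seen  # [{"id": "threads", "type": "int", "default": 2}]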

def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(job=joborder,
                   files=[],                    # type: List[Dict[Text, Text]]
                   bindings=[],                 # type: List[Dict[Text, Any]]
                   schemaDefs={},               # type: Dict[Text, Dict[Text, Any]]
                   names=None,                  # type: Names
                   requirements=requirements,   # type: List[Dict[Text, Any]]
                   hints=hints,                 # type: List[Dict[Text, Any]]
                   resources={},                # type: Dict[str, int]
                   mutation_manager=None,       # type: Optional[MutationManager]
                   formatgraph=None,            # type: Optional[Graph]
                   make_fs_access=None,         # type: Type[StdFsAccess]
                   fs_access=None,              # type: StdFsAccess
                   job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
                   timeout=runtimeContext.eval_timeout,                     # type: float
                   debug=runtimeContext.debug,                              # type: bool
                   js_console=runtimeContext.js_console,                    # type: bool
                   force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
                   loadListing="",              # type: Text
                   outdir="",                   # type: Text
                   tmpdir="",                   # type: Text
                   stagedir="",                 # type: Text
                   cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                   container_engine="docker"
                  )
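
# Illustrative sketch (hypothetical values): callers in this module hand
# make_builder() a partially filled job order plus the hints/requirements of
# the process whose expressions need evaluating.
def _example_make_builder(runtimeContext):
    return make_builder({"threads": 4},          # job order
                        [],                      # hints
                        [],                      # requirements
                        runtimeContext,          # an ArvRuntimeContext
                        {"cwlVersion": "v1.2"})  # process metadata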

def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if inputschema == "File":
        inputschema = {"type": "File"}

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        # record type, find secondary files associated with fields.
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        # array type, find secondary files of elements
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          isinstance(primary, Mapping) and
          primary.get("class") == "File"):

        if "secondaryFiles" in primary or not secondaryspec:
            # Nothing to do.
            return

        # Found a file, check for secondaryFiles
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = sf
            else:
                pattern = sf["pattern"]

            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                if "${" in pattern or "$(" in pattern:
                    sfname = builder.do_eval(pattern, context=primary)
                else:
                    sfname = substitute(primary["basename"], pattern)

                if sfname is None:
                    continue

                if isinstance(sfname, str):
                    p_location = primary["location"]
                    if "/" in p_location:
                        sfpath = (
                            p_location[0 : p_location.rindex("/") + 1]
                            + sfname
                        )

            required = builder.do_eval(required, context=primary)

            if isinstance(sfname, list) or isinstance(sfname, dict):
                each = aslist(sfname)
                for e in each:
                    if required and not fsaccess.exists(e.get("location")):
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "Required secondary file '%s' does not exist" % e.get("location"))
                found.extend(each)

            if isinstance(sfname, str):
                if fsaccess.exists(sfpath):
                    if pattern is not None:
                        found.append({"location": sfpath, "class": "File"})
                    else:
                        found.append(sf)
                elif required:
                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                        "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
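
# Illustrative sketch (hypothetical paths; builder.cwlVersion assumed to be
# v1.1 or later): for a File input whose schema declares a ".bai" pattern,
# set_secondary() rewrites the primary so primary["secondaryFiles"] lists the
# discovered sibling file. It raises if the sibling is required but absent.
def _example_set_secondary(fsaccess, builder):
    schema = {"type": "File",
              "secondaryFiles": [{"pattern": ".bai", "required": True}]}
    primary = {"class": "File", "location": "file:///data/x.bam",
               "basename": "x.bam"}
    discovered = {}
    set_secondary(fsaccess, builder, schema, None, primary, discovered)
    # primary["secondaryFiles"] now holds
    # [{"class": "File", "location": "file:///data/x.bam.bai"}]
    return primary, discovered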

def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, runtimeContext,
                        include_primary=True, discovered_secondaryfiles=None,
                        cache=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $schemas, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    scanobj = workflowobj
    metadata = scanobj

    with Perf(metrics, "scandeps"):
        sc_result = scandeps(uri, scanobj,
                             set(),
                             set(("location",)),
                             None, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)
        optional_deps = scandeps(uri, scanobj,
                                 set(),
                                 set(("$schemas",)),
                                 None, urljoin=document_loader.fetcher.urljoin,
                                 nestdirs=False)

    if sc_result is None:
        sc_result = []

    if optional_deps is None:
        optional_deps = []

    if optional_deps:
        sc_result.extend(optional_deps)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    with Perf(metrics, "collect uuids"):
        visit_class(workflowobj, ("File", "Directory"), collect_uuids)

    with Perf(metrics, "collect uploads"):
        visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    with Perf(metrics, "fetch_uuids"):
        while fetch_uuids:
            # For a large number of fetch_uuids, API server may limit
            # response size, so keep fetching until the API server has
            # nothing more to give us.
            lookups = arvrunner.api.collections().list(
                filters=[["uuid", "in", fetch_uuids]],
                count="none",
                select=["uuid", "portable_data_hash"]).execute(
                    num_retries=arvrunner.num_retries)

            if not lookups["items"]:
                break

            for l in lookups["items"]:
                uuid_map[l["uuid"]] = l["portable_data_hash"]

            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
        if include_primary:
            # make sure it's included
            sc.append({"class": "File", "location": defrg})
        else:
            # make sure it's excluded
            sc = [d for d in sc if d.get("location") != defrg]

    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    discovered = {}

    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    with Perf(metrics, "mapper"):
        mapper = ArvPathMapper(arvrunner, sc, "",
                               "keep:%s",
                               "keep:%s/%s",
                               name=name,
                               single_collection=True,
                               optional_deps=optional_deps)

    keeprefs = set()
    def addkeepref(k):
        if k.startswith("keep:"):
            keeprefs.add(collection_pdh_pattern.match(k).group(1))

    def collectloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            addkeepref(p["location"])
            p["location"] = mapper.mapper(p["location"]).resolved

        loc = p.get("location")
        if not loc or not loc.startswith("keep:"):
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            # Not a uuid pattern (must be a pdh pattern)
            addkeepref(p["location"])
            return

        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)

    with Perf(metrics, "collectloc"):
        visit_class(workflowobj, ("File", "Directory"), collectloc)
        visit_class(discovered, ("File", "Directory"), collectloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if runtimeContext.copy_deps:
        # Find referenced collections and copy them into the
        # destination project, for easy sharing.
        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
                                                            filters=[["portable_data_hash", "in", list(keeprefs)],
                                                                     ["owner_uuid", "=", runtimeContext.project_uuid]],
                                                            select=["uuid", "portable_data_hash", "created_at"]))

        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
        for kr in keeprefs:
            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
                                                   order="created_at desc",
                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
                                                   limit=1).execute()
            if len(col["items"]) == 0:
                logger.warning("Cannot find collection with portable data hash %s", kr)
                continue
            col = col["items"][0]
            col["name"] = arvados.util.trim_name(col["name"])
            try:
                arvrunner.api.collections().create(body={"collection": {
                    "owner_uuid": runtimeContext.project_uuid,
                    "name": col["name"],
                    "description": col["description"],
                    "properties": col["properties"],
                    "portable_data_hash": col["portable_data_hash"],
                    "manifest_text": col["manifest_text"],
                    "storage_classes_desired": col["storage_classes_desired"],
                    "trash_at": col["trash_at"]
                }}, ensure_unique_name=True).execute()
            except Exception as e:
                logger.warning("Unable to copy collection to destination: %s", e)

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
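
# Illustrative sketch (hypothetical arguments): a caller uploads the
# dependencies of a loaded tool document and gets back a pathmapper that
# translates local "file:" references into "keep:" references, mirroring the
# call made by upload_workflow_deps() below.
def _example_upload_dependencies(arvrunner, tool, runtimeContext):
    return upload_dependencies(arvrunner,
                               "example dependencies",
                               tool.doc_loader,
                               tool.tool,
                               tool.tool["id"],
                               runtimeContext,
                               include_primary=False)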

def upload_docker(arvrunner, tool, runtimeContext):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")

            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True, runtimeContext)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool, runtimeContext)

def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                             runtimeContext.project_uuid,
                                                                                                             runtimeContext.force_docker_pull,
                                                                                                             runtimeContext.tmp_outdir_prefix,
                                                                                                             runtimeContext.match_local_docker,
                                                                                                             runtimeContext.copy_deps)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)

    if git_info:
        for g in git_info:
            packed[g] = git_info[g]

    return packed

def tag_git_version(packed):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def setloc(mapper, p):
    loc = p.get("location")
    if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
        p["location"] = mapper.mapper(p["location"]).resolved
        return

    if not loc:
        return

    if collectionUUID in p:
        uuid = p[collectionUUID]
        if uuid not in uuid_map:
            raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        gp = collection_pdh_pattern.match(loc)
        if gp and uuid_map[uuid] != gp.groups()[0]:
            # This file entry has both collectionUUID and a PDH
            # location. If the PDH doesn't match the one returned
            # by the API server, raise an error.
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Expected collection uuid %s to be %s but API server reported %s" % (
                    uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

    gp = collection_uuid_pattern.match(loc)
    if not gp:
        # Not a uuid pattern (must be a pdh pattern)
        return

    uuid = gp.groups()[0]
    if uuid not in uuid_map:
        raise SourceLine(p, "location", validate.ValidationException).makeError(
            "Collection uuid %s not found" % uuid)
    p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
    p[collectionUUID] = uuid

def update_from_mapper(workflowobj, mapper):
    with Perf(metrics, "setloc"):
        visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))

def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters, we don't want it to do that
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           runtimeContext,
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    _jobloaderctx = jobloaderctx.copy()
    jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    jobloader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    runtimeContext)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    update_from_mapper(job_order, jobmapper)

    return job_order
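
# Illustrative sketch (hypothetical input object): upload_job_order() takes a
# raw job order with local "file:" references and returns it with locations
# rewritten to "keep:" references.
def _example_upload_job_order(arvrunner, tool, runtimeContext):
    job_order = {"bam": {"class": "File", "location": "file:///data/x.bam"}}
    return upload_job_order(arvrunner, "example inputs", tool, job_order,
                            runtimeContext)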

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool, runtimeContext):
    # Ensure that Docker images needed by this workflow are available
    with Perf(metrics, "upload_docker"):
        upload_docker(arvrunner, tool, runtimeContext)

    document_loader = tool.doc_loader

    merged_map = {}
    tool_dep_cache = {}

    todo = []

    # Standard traversal is top down, we want to go bottom up, so use
    # the visitor to accumulate a list of nodes to visit, then
    # visit them in reverse order.
    def upload_tool_deps(deptool):
        if "id" in deptool:
            todo.append(deptool)

    tool.visit(upload_tool_deps)

    for deptool in reversed(todo):
        discovered_secondaryfiles = {}
        with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     runtimeContext,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles,
                                     cache=tool_dep_cache)
        document_loader.idx[deptool["id"]] = deptool
        toolmap = {}
        for k,v in pm.items():
            toolmap[k] = v.resolved
        merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    return merged_map
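
# Illustrative sketch (hypothetical identifiers and a fake portable data
# hash): the merged_map produced above maps each tool document id to a
# FileUpdates tuple of resolved locations and discovered secondaryFiles.
def _example_merged_map_entry():
    return {"file:///home/me/wf.cwl": FileUpdates(
        resolved={"file:///home/me/script.py":
                  "keep:99999999999999999999999999999999+99/script.py"},
        secondaryFiles={})}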

def arvados_jobs_image(arvrunner, img, runtimeContext):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
                                                          True, runtimeContext)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if runtimeContext.project_uuid:
        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=runtimeContext.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
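
# Illustrative sketch (hypothetical name): the returned portable data hash can
# be turned into a keep reference to the stored document, since the packed
# workflow is always written as "workflow.cwl" inside the collection.
def _example_upload_workflow_collection(arvrunner, packed, runtimeContext):
    pdh = upload_workflow_collection(arvrunner, "my-workflow", packed,
                                     runtimeContext)
    return "keep:%s/workflow.cwl" % pdh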

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True,
                 git_info=None):

        self.loadingContext = loadingContext.copy()
        self.loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
            reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = self.loadingContext.enable_dev
        self.git_info = git_info
        self.fast_parser = self.loadingContext.fast_parser

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
            ):                 # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)