X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5eabedb88ce741076167efd2305216043dd9abd5..40ad5fb3f699c348222c90692b9ae27a8f716af9:/sdk/cwl/arvados_cwl/__init__.py diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 3762536d59..8f2102c6c3 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -1,5 +1,7 @@ #!/usr/bin/env python +# Implement cwl-runner interface for submitting and running jobs on Arvados. + import argparse import arvados import arvados.events @@ -35,6 +37,8 @@ keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.ke def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid): + """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker.""" + if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement: dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"] @@ -58,6 +62,8 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid class CollectionFsAccess(cwltool.process.StdFsAccess): + """Implement the cwltool FsAccess interface for Arvados Collections.""" + def __init__(self, basedir): self.collections = {} self.basedir = basedir @@ -114,6 +120,8 @@ class CollectionFsAccess(cwltool.process.StdFsAccess): return os.path.exists(self._abs(fn)) class ArvadosJob(object): + """Submit and manage a Crunch job for executing a CWL CommandLineTool.""" + def __init__(self, runner): self.arvrunner = runner self.running = False @@ -160,16 +168,26 @@ class ArvadosJob(object): runtime_constraints["min_ram_mb_per_node"] = resources.get("ram") runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0) + filters = [["repository", "=", "arvados"], + ["script", "=", "crunchrunner"], + ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]] + if not self.arvrunner.ignore_docker_for_reuse: + filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]]) + try: - response = self.arvrunner.api.jobs().create(body={ - "owner_uuid": self.arvrunner.project_uuid, - "script": "crunchrunner", - "repository": "arvados", - "script_version": "master", - "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6", - "script_parameters": {"tasks": [script_parameters]}, - "runtime_constraints": runtime_constraints - }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries) + response = self.arvrunner.api.jobs().create( + body={ + "owner_uuid": self.arvrunner.project_uuid, + "script": "crunchrunner", + "repository": "arvados", + "script_version": "master", + "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6", + "script_parameters": {"tasks": [script_parameters]}, + "runtime_constraints": runtime_constraints + }, + filters=filters, + find_or_create=kwargs.get("enable_reuse", True) + ).execute(num_retries=self.arvrunner.num_retries) self.arvrunner.jobs[response["uuid"]] = self @@ -283,6 +301,8 @@ class ArvadosJob(object): class RunnerJob(object): + """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner.""" + def __init__(self, runner, tool, job_order, enable_reuse): self.arvrunner = runner self.tool = tool @@ -346,6 +366,7 @@ class RunnerJob(object): self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1] response = self.arvrunner.api.jobs().create(body={ + "owner_uuid": self.arvrunner.project_uuid, "script": "cwl-runner", "script_version": "master", "repository": "arvados", @@ -381,6 +402,8 @@ class RunnerJob(object): del self.arvrunner.jobs[record["uuid"]] class ArvPathMapper(cwltool.pathmapper.PathMapper): + """Convert container-local paths to and from Keep collection ids.""" + def __init__(self, arvrunner, referenced_files, basedir, collection_pattern, file_pattern, name=None, **kwargs): self._pathmap = arvrunner.get_uploaded() @@ -430,6 +453,8 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper): class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool): + """Wrap cwltool CommandLineTool to override selected methods.""" + def __init__(self, arvrunner, toolpath_object, **kwargs): super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs) self.arvrunner = arvrunner @@ -445,6 +470,9 @@ class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool): class ArvCwlRunner(object): + """Execute a CWL tool or workflow, submit crunch jobs, wait for them to + complete, and report output.""" + def __init__(self, api_client): self.api = api_client self.jobs = {} @@ -521,6 +549,8 @@ class ArvCwlRunner(object): events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message) + self.debug = args.debug + self.ignore_docker_for_reuse = args.ignore_docker_for_reuse self.fs_access = CollectionFsAccess(input_basedir) kwargs["fs_access"] = self.fs_access @@ -593,14 +623,15 @@ class ArvCwlRunner(object): return self.final_output def versionstring(): - cwlpkg = pkg_resources.require("cwltool") - arvpkg = pkg_resources.require("arvados-python-client") + """Print version string of key packages for provenance and debugging.""" + arvcwlpkg = pkg_resources.require("arvados-cwl-runner") + arvpkg = pkg_resources.require("arvados-python-client") + cwlpkg = pkg_resources.require("cwltool") - return "%s %s, %s %s, %s %s" % (sys.argv[0], - arvcwlpkg[0].version, - "arvados-python-client", cwlpkg[0].version, - "cwltool", arvpkg[0].version) + return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version, + "arvados-python-client", arvpkg[0].version, + "cwltool", cwlpkg[0].version) def main(args, stdout, stderr, api_client=None): args.insert(0, "--leave-outputs") @@ -615,17 +646,20 @@ def main(args, stdout, stderr, api_client=None): help="") parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs") + parser.add_argument("--ignore-docker-for-reuse", action="store_true", + help="Ignore Docker image version when deciding whether to reuse past jobs.", + default=False) exgroup = parser.add_mutually_exclusive_group() - exgroup.add_argument("--submit", action="store_true", help="Submit runner job so workflow can run unattended.", + exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.", default=True, dest="submit") - exgroup.add_argument("--local", action="store_false", help="Workflow runner runs on local host and submits jobs.", + exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).", default=True, dest="submit") exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.", default=True, dest="wait") - exgroup.add_argument("--no-wait", action="store_false", help="Exit after submitting workflow runner job.", + exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.", default=True, dest="wait") try: