X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b8260eb5cdc2c403083d7dbea49c45791a763e9c..ae6ced575905d226963516bb7780c2bae391621c:/sdk/cwl/arvados_cwl/__init__.py diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 67d0d4e49e..8f2102c6c3 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -1,5 +1,7 @@ #!/usr/bin/env python +# Implement cwl-runner interface for submitting and running jobs on Arvados. + import argparse import arvados import arvados.events @@ -21,6 +23,7 @@ import os import sys import functools import json +import pkg_resources # part of setuptools from cwltool.process import get_feature, adjustFiles, scandeps from arvados.api import OrderedJsonModel @@ -28,16 +31,14 @@ from arvados.api import OrderedJsonModel logger = logging.getLogger('arvados.cwl-runner') logger.setLevel(logging.INFO) -crunchrunner_pdh = "ff6fc71e593081ef9733afacaeee15ea+140" -crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner" -certs_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/ca-certificates.crt" - tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)") outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)") keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)") def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid): + """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker.""" + if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement: dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"] @@ -55,12 +56,14 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid if image_tag: args.append(image_tag) logger.info("Uploading Docker image %s", ":".join(args[1:])) - arvados.commands.keepdocker.main(args) + arvados.commands.keepdocker.main(args, stdout=sys.stderr) return dockerRequirement["dockerImageId"] class CollectionFsAccess(cwltool.process.StdFsAccess): + """Implement the cwltool FsAccess interface for Arvados Collections.""" + def __init__(self, basedir): self.collections = {} self.basedir = basedir @@ -117,6 +120,8 @@ class CollectionFsAccess(cwltool.process.StdFsAccess): return os.path.exists(self._abs(fn)) class ArvadosJob(object): + """Submit and manage a Crunch job for executing a CWL CommandLineTool.""" + def __init__(self, runner): self.arvrunner = runner self.running = False @@ -154,6 +159,8 @@ class ArvadosJob(object): (docker_req, docker_is_req) = get_feature(self, "DockerRequirement") if docker_req and kwargs.get("use_container") is not False: runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid) + else: + runtime_constraints["docker_image"] = "arvados/jobs" resources = self.builder.resources if resources is not None: @@ -161,16 +168,26 @@ class ArvadosJob(object): runtime_constraints["min_ram_mb_per_node"] = resources.get("ram") runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0) + filters = [["repository", "=", "arvados"], + ["script", "=", "crunchrunner"], + ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]] + if not self.arvrunner.ignore_docker_for_reuse: + filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]]) + try: - response = self.arvrunner.api.jobs().create(body={ - "owner_uuid": self.arvrunner.project_uuid, - "script": "crunchrunner", - "repository": "arvados", - "script_version": "master", - "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6", - "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"}, - "runtime_constraints": runtime_constraints - }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries) + response = self.arvrunner.api.jobs().create( + body={ + "owner_uuid": self.arvrunner.project_uuid, + "script": "crunchrunner", + "repository": "arvados", + "script_version": "master", + "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6", + "script_parameters": {"tasks": [script_parameters]}, + "runtime_constraints": runtime_constraints + }, + filters=filters, + find_or_create=kwargs.get("enable_reuse", True) + ).execute(num_retries=self.arvrunner.num_retries) self.arvrunner.jobs[response["uuid"]] = self @@ -216,6 +233,14 @@ class ArvadosJob(object): outdir = None keepdir = None for l in log: + # Determine the tmpdir, outdir and keepdir paths from + # the job run. Unfortunately, we can't take the first + # values we find (which are expected to be near the + # top) and stop scanning because if the node fails and + # the job restarts on a different node these values + # will different runs, and we need to know about the + # final run that actually produced output. + g = tmpdirre.match(l) if g: tmpdir = g.group(1) @@ -226,13 +251,39 @@ class ArvadosJob(object): if g: keepdir = g.group(1) - # It turns out if the job fails and restarts it can - # come up on a different compute node, so we have to - # read the log to the end to be sure instead of taking the - # easy way out. - # - #if tmpdir and outdir and keepdir: - # break + colname = "Output %s of %s" % (record["output"][0:7], self.name) + + # check if collection already exists with same owner, name and content + collection_exists = self.arvrunner.api.collections().list( + filters=[["owner_uuid", "=", self.arvrunner.project_uuid], + ['portable_data_hash', '=', record["output"]], + ["name", "=", colname]] + ).execute(num_retries=self.arvrunner.num_retries) + + if not collection_exists["items"]: + # Create a collection located in the same project as the + # pipeline with the contents of the output. + # First, get output record. + collections = self.arvrunner.api.collections().list( + limit=1, + filters=[['portable_data_hash', '=', record["output"]]], + select=["manifest_text"] + ).execute(num_retries=self.arvrunner.num_retries) + + if not collections["items"]: + raise WorkflowException( + "Job output '%s' cannot be found on API server" % ( + record["output"])) + + # Create new collection in the parent project + # with the output contents. + self.arvrunner.api.collections().create(body={ + "owner_uuid": self.arvrunner.project_uuid, + "name": colname, + "portable_data_hash": record["output"], + "manifest_text": collections["items"][0]["manifest_text"] + }, ensure_unique_name=True).execute( + num_retries=self.arvrunner.num_retries) self.builder.outdir = outdir self.builder.pathmapper.keepdir = keepdir @@ -250,6 +301,8 @@ class ArvadosJob(object): class RunnerJob(object): + """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner.""" + def __init__(self, runner, tool, job_order, enable_reuse): self.arvrunner = runner self.tool = tool @@ -313,8 +366,9 @@ class RunnerJob(object): self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1] response = self.arvrunner.api.jobs().create(body={ + "owner_uuid": self.arvrunner.project_uuid, "script": "cwl-runner", - "script_version": "8654-arv-jobs-cwl-runner", + "script_version": "master", "repository": "arvados", "script_parameters": self.job_order, "runtime_constraints": { @@ -337,31 +391,38 @@ class RunnerJob(object): outputs = None try: - outc = arvados.collection.Collection(record["output"]) - with outc.open("cwl.output.json") as f: - outputs = json.load(f) + try: + outc = arvados.collection.Collection(record["output"]) + with outc.open("cwl.output.json") as f: + outputs = json.load(f) + except Exception as e: + logger.error("While getting final output object: %s", e) self.arvrunner.output_callback(outputs, processStatus) finally: del self.arvrunner.jobs[record["uuid"]] class ArvPathMapper(cwltool.pathmapper.PathMapper): + """Convert container-local paths to and from Keep collection ids.""" + def __init__(self, arvrunner, referenced_files, basedir, collection_pattern, file_pattern, name=None, **kwargs): self._pathmap = arvrunner.get_uploaded() - uploadfiles = [] + uploadfiles = set() pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+') for src in referenced_files: if isinstance(src, basestring) and pdh_path.match(src): self._pathmap[src] = (src, collection_pattern % src[5:]) + if "#" in src: + src = src[:src.index("#")] if src not in self._pathmap: ab = cwltool.pathmapper.abspath(src, basedir) st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern) if kwargs.get("conformance_test"): self._pathmap[src] = (src, ab) elif isinstance(st, arvados.commands.run.UploadFile): - uploadfiles.append((src, ab, st)) + uploadfiles.add((src, ab, st)) elif isinstance(st, arvados.commands.run.ArvFile): self._pathmap[src] = (ab, st.fn) else: @@ -392,6 +453,8 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper): class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool): + """Wrap cwltool CommandLineTool to override selected methods.""" + def __init__(self, arvrunner, toolpath_object, **kwargs): super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs) self.arvrunner = arvrunner @@ -407,6 +470,9 @@ class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool): class ArvCwlRunner(object): + """Execute a CWL tool or workflow, submit crunch jobs, wait for them to + complete, and report output.""" + def __init__(self, api_client): self.api = api_client self.jobs = {} @@ -471,21 +537,6 @@ class ArvCwlRunner(object): logger.setLevel(logging.WARN) logging.getLogger('arvados.arv-run').setLevel(logging.WARN) - try: - self.api.collections().get(uuid=crunchrunner_pdh).execute() - except arvados.errors.ApiError as e: - import httplib2 - h = httplib2.Http(ca_certs=arvados.util.ca_certs_path()) - resp, content = h.request(crunchrunner_download, "GET") - resp2, content2 = h.request(certs_download, "GET") - with arvados.collection.Collection() as col: - with col.open("crunchrunner", "w") as f: - f.write(content) - with col.open("ca-certificates.crt", "w") as f: - f.write(content2) - - col.save_new("crunchrunner binary", ensure_unique_name=True) - useruuid = self.api.users().current().execute()["uuid"] self.project_uuid = args.project_uuid if args.project_uuid else useruuid self.pipeline = None @@ -498,6 +549,8 @@ class ArvCwlRunner(object): events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message) + self.debug = args.debug + self.ignore_docker_for_reuse = args.ignore_docker_for_reuse self.fs_access = CollectionFsAccess(input_basedir) kwargs["fs_access"] = self.fs_access @@ -569,10 +622,21 @@ class ArvCwlRunner(object): return self.final_output +def versionstring(): + """Print version string of key packages for provenance and debugging.""" + + arvcwlpkg = pkg_resources.require("arvados-cwl-runner") + arvpkg = pkg_resources.require("arvados-python-client") + cwlpkg = pkg_resources.require("cwltool") + + return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version, + "arvados-python-client", arvpkg[0].version, + "cwltool", cwlpkg[0].version) def main(args, stdout, stderr, api_client=None): args.insert(0, "--leave-outputs") parser = cwltool.main.arg_parser() + exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--enable-reuse", action="store_true", default=True, dest="enable_reuse", @@ -580,12 +644,24 @@ def main(args, stdout, stderr, api_client=None): exgroup.add_argument("--disable-reuse", action="store_false", default=True, dest="enable_reuse", help="") + parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs") - parser.add_argument("--submit", action="store_true", help="Submit job and print job uuid.", - default=False) - parser.add_argument("--wait", action="store_true", help="Wait for completion after submitting cwl-runner job.", + parser.add_argument("--ignore-docker-for-reuse", action="store_true", + help="Ignore Docker image version when deciding whether to reuse past jobs.", default=False) + exgroup = parser.add_mutually_exclusive_group() + exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.", + default=True, dest="submit") + exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).", + default=True, dest="submit") + + exgroup = parser.add_mutually_exclusive_group() + exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.", + default=True, dest="wait") + exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.", + default=True, dest="wait") + try: if api_client is None: api_client=arvados.api('v1', model=OrderedJsonModel()) @@ -599,4 +675,5 @@ def main(args, stdout, stderr, api_client=None): stderr=stderr, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, - parser=parser) + parser=parser, + versionfunc=versionstring)