X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/df5453354edc2e6a3db198884b9c5bd0f86fed7c..3386dd9826cf143d078aa8985726516932fafa5b:/sdk/cwl/arvados_cwl/__init__.py

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 8370e3d5e7..ab8d725bd7 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -1,27 +1,44 @@
 #!/usr/bin/env python
+# Implement cwl-runner interface for submitting and running jobs on Arvados.
+
 import argparse
 import arvados
 import arvados.events
 import arvados.commands.keepdocker
 import arvados.commands.run
+import arvados.collection
+import arvados.util
 import cwltool.draft2tool
 import cwltool.workflow
 import cwltool.main
 from cwltool.process import shortname
+from cwltool.errors import WorkflowException
 import threading
 import cwltool.docker
 import fnmatch
 import logging
 import re
 import os
+import sys
+import functools
+import json
+import pkg_resources  # part of setuptools
 
-from cwltool.process import get_feature
+from cwltool.process import get_feature, adjustFiles, scandeps
+from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
 logger.setLevel(logging.INFO)
 
-def arv_docker_get_image(api_client, dockerRequirement, pull_image):
+tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
+outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
+keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
+
+
+def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
+    """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
+
     if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
         dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
 
@@ -35,16 +52,18 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image):
         if not images:
             imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
-            args = [image_name]
+            args = ["--project-uuid="+project_uuid, image_name]
             if image_tag:
                 args.append(image_tag)
-            logger.info("Uploading Docker image %s", ":".join(args))
-            arvados.commands.keepdocker.main(args)
+            logger.info("Uploading Docker image %s", ":".join(args[1:]))
+            arvados.commands.keepdocker.main(args, stdout=sys.stderr)
 
     return dockerRequirement["dockerImageId"]
 
 class CollectionFsAccess(cwltool.process.StdFsAccess):
+    """Implement the cwltool FsAccess interface for Arvados Collections."""
+
     def __init__(self, basedir):
         self.collections = {}
         self.basedir = basedir
@@ -101,6 +120,8 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
         return os.path.exists(self._abs(fn))
 
 class ArvadosJob(object):
+    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
+
     def __init__(self, runner):
         self.arvrunner = runner
         self.running = False
@@ -137,16 +158,36 @@ class ArvadosJob(object):
         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
 
         if docker_req and kwargs.get("use_container") is not False:
-            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)
+            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
+        else:
+            runtime_constraints["docker_image"] = "arvados/jobs"
+
+        resources = self.builder.resources
+        if resources is not None:
+            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
resources.get("cores", 1) + runtime_constraints["min_ram_mb_per_node"] = resources.get("ram") + runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0) + + filters = [["repository", "=", "arvados"], + ["script", "=", "crunchrunner"], + ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]] + if not self.arvrunner.ignore_docker_for_reuse: + filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]]) try: - response = self.arvrunner.api.jobs().create(body={ - "script": "crunchrunner", - "repository": kwargs["repository"], - "script_version": "master", - "script_parameters": {"tasks": [script_parameters]}, - "runtime_constraints": runtime_constraints - }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries) + response = self.arvrunner.api.jobs().create( + body={ + "owner_uuid": self.arvrunner.project_uuid, + "script": "crunchrunner", + "repository": "arvados", + "script_version": "master", + "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6", + "script_parameters": {"tasks": [script_parameters]}, + "runtime_constraints": runtime_constraints + }, + filters=filters, + find_or_create=kwargs.get("enable_reuse", True) + ).execute(num_retries=self.arvrunner.num_retries) self.arvrunner.jobs[response["uuid"]] = self @@ -186,9 +227,72 @@ class ArvadosJob(object): try: outputs = {} if record["output"]: + logc = arvados.collection.Collection(record["log"]) + log = logc.open(logc.keys()[0]) + tmpdir = None + outdir = None + keepdir = None + for l in log: + # Determine the tmpdir, outdir and keepdir paths from + # the job run. Unfortunately, we can't take the first + # values we find (which are expected to be near the + # top) and stop scanning because if the node fails and + # the job restarts on a different node these values + # will different runs, and we need to know about the + # final run that actually produced output. + + g = tmpdirre.match(l) + if g: + tmpdir = g.group(1) + g = outdirre.match(l) + if g: + outdir = g.group(1) + g = keepre.match(l) + if g: + keepdir = g.group(1) + + colname = "Output %s of %s" % (record["output"][0:7], self.name) + + # check if collection already exists with same owner, name and content + collection_exists = self.arvrunner.api.collections().list( + filters=[["owner_uuid", "=", self.arvrunner.project_uuid], + ['portable_data_hash', '=', record["output"]], + ["name", "=", colname]] + ).execute(num_retries=self.arvrunner.num_retries) + + if not collection_exists["items"]: + # Create a collection located in the same project as the + # pipeline with the contents of the output. + # First, get output record. + collections = self.arvrunner.api.collections().list( + limit=1, + filters=[['portable_data_hash', '=', record["output"]]], + select=["manifest_text"] + ).execute(num_retries=self.arvrunner.num_retries) + + if not collections["items"]: + raise WorkflowException( + "Job output '%s' cannot be found on API server" % ( + record["output"])) + + # Create new collection in the parent project + # with the output contents. 
+                    self.arvrunner.api.collections().create(body={
+                        "owner_uuid": self.arvrunner.project_uuid,
+                        "name": colname,
+                        "portable_data_hash": record["output"],
+                        "manifest_text": collections["items"][0]["manifest_text"]
+                    }, ensure_unique_name=True).execute(
+                        num_retries=self.arvrunner.num_retries)
+
+                self.builder.outdir = outdir
+                self.builder.pathmapper.keepdir = keepdir
                 outputs = self.collect_outputs("keep:" + record["output"])
+        except WorkflowException as e:
+            logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+            processStatus = "permanentFail"
         except Exception as e:
-            logger.exception("Got exception while collecting job outputs:")
+            logger.exception("Got unknown exception while collecting job outputs:")
             processStatus = "permanentFail"
 
         self.output_callback(outputs, processStatus)
@@ -196,23 +300,128 @@ class ArvadosJob(object):
             del self.arvrunner.jobs[record["uuid"]]
 
+class RunnerJob(object):
+    """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
+
+    def __init__(self, runner, tool, job_order, enable_reuse):
+        self.arvrunner = runner
+        self.tool = tool
+        self.job_order = job_order
+        self.running = False
+        self.enable_reuse = enable_reuse
+
+    def update_pipeline_component(self, record):
+        pass
+
+    def upload_docker(self, tool):
+        if isinstance(tool, cwltool.draft2tool.CommandLineTool):
+            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
+            if docker_req:
+                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
+        elif isinstance(tool, cwltool.workflow.Workflow):
+            for s in tool.steps:
+                self.upload_docker(s.embedded_tool)
+
+    def run(self, dry_run=False, pull_image=True, **kwargs):
+        self.upload_docker(self.tool)
+
+        workflowfiles = set()
+        jobfiles = set()
+        workflowfiles.add(self.tool.tool["id"])
+
+        self.name = os.path.basename(self.tool.tool["id"])
+
+        def visitFiles(files, path):
+            files.add(path)
+            return path
+
+        document_loader, _, _ = cwltool.process.get_schema()
+        def loadref(b, u):
+            return document_loader.resolve_ref(u, base_url=b)[0]
+
+        sc = scandeps("", self.tool.tool,
+                      set(("$import", "run")),
+                      set(("$include", "$schemas", "path")),
+                      loadref)
+        adjustFiles(sc, functools.partial(visitFiles, workflowfiles))
+        adjustFiles(self.job_order, functools.partial(visitFiles, jobfiles))
+
+        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
+                                       "%s",
+                                       "%s/%s",
+                                       name=self.name,
+                                       **kwargs)
+
+        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
+                                  "%s",
+                                  "%s/%s",
+                                  name=os.path.basename(self.job_order.get("id", "#")),
+                                  **kwargs)
+
+        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
+
+        if "id" in self.job_order:
+            del self.job_order["id"]
+
+        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
+
+        response = self.arvrunner.api.jobs().create(body={
+            "script": "cwl-runner",
+            "script_version": "master",
+            "repository": "arvados",
+            "script_parameters": self.job_order,
+            "runtime_constraints": {
+                "docker_image": "arvados/jobs"
+            }
+        }, find_or_create=self.enable_reuse).execute(num_retries=self.arvrunner.num_retries)
+
+        self.arvrunner.jobs[response["uuid"]] = self
+
+        logger.info("Submitted job %s", response["uuid"])
+
+        if response["state"] in ("Complete", "Failed", "Cancelled"):
+            self.done(response)
+
+    def done(self, record):
+        if record["state"] == "Complete":
+            processStatus = "success"
+        else:
+            processStatus = "permanentFail"
+
+        outputs = None
+        try:
+            try:
+                outc = arvados.collection.Collection(record["output"])
+                with outc.open("cwl.output.json") as f:
+                    outputs = json.load(f)
+            except Exception as e:
+                logger.error("While getting final output object: %s", e)
+            self.arvrunner.output_callback(outputs, processStatus)
+        finally:
+            del self.arvrunner.jobs[record["uuid"]]
+
 
 class ArvPathMapper(cwltool.pathmapper.PathMapper):
-    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
+    """Convert container-local paths to and from Keep collection ids."""
+
+    def __init__(self, arvrunner, referenced_files, basedir,
+                 collection_pattern, file_pattern, name=None, **kwargs):
         self._pathmap = arvrunner.get_uploaded()
-        uploadfiles = []
+        uploadfiles = set()
 
         pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
 
         for src in referenced_files:
             if isinstance(src, basestring) and pdh_path.match(src):
-                self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
+                self._pathmap[src] = (src, collection_pattern % src[5:])
+            if "#" in src:
+                src = src[:src.index("#")]
             if src not in self._pathmap:
                 ab = cwltool.pathmapper.abspath(src, basedir)
-                st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
+                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                 if kwargs.get("conformance_test"):
                     self._pathmap[src] = (src, ab)
                 elif isinstance(st, arvados.commands.run.UploadFile):
-                    uploadfiles.append((src, ab, st))
+                    uploadfiles.add((src, ab, st))
                 elif isinstance(st, arvados.commands.run.ArvFile):
                     self._pathmap[src] = (ab, st.fn)
                 else:
@@ -223,27 +432,46 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
                 arvrunner.api,
                 dry_run=kwargs.get("dry_run"),
                 num_retries=3,
-                fnPattern="$(task.keep)/%s/%s")
+                fnPattern=file_pattern,
+                name=name,
+                project=arvrunner.project_uuid)
 
             for src, ab, st in uploadfiles:
                 arvrunner.add_uploaded(src, (ab, st.fn))
                 self._pathmap[src] = (ab, st.fn)
 
+        self.keepdir = None
+
+    def reversemap(self, target):
+        if target.startswith("keep:"):
+            return (target, target)
+        elif self.keepdir and target.startswith(self.keepdir):
+            return (target, "keep:" + target[len(self.keepdir)+1:])
+        else:
+            return super(ArvPathMapper, self).reversemap(target)
 
 class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
+    """Wrap cwltool CommandLineTool to override selected methods."""
+
     def __init__(self, arvrunner, toolpath_object, **kwargs):
-        super(ArvadosCommandTool, self).__init__(toolpath_object, outdir="$(task.outdir)", tmpdir="$(task.tmpdir)", **kwargs)
+        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
         self.arvrunner = arvrunner
 
     def makeJobRunner(self):
         return ArvadosJob(self.arvrunner)
 
     def makePathMapper(self, reffiles, input_basedir, **kwargs):
-        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)
+        return ArvPathMapper(self.arvrunner, reffiles, input_basedir,
+                             "$(task.keep)/%s",
+                             "$(task.keep)/%s/%s",
+                             **kwargs)
 
 class ArvCwlRunner(object):
+    """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
+    complete, and report output."""
+
     def __init__(self, api_client):
         self.api = api_client
         self.jobs = {}
@@ -262,36 +490,38 @@ class ArvCwlRunner(object):
     def output_callback(self, out, processStatus):
         if processStatus == "success":
             logger.info("Overall job status is %s", processStatus)
-            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
+            if self.pipeline:
+                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
         else:
logger.warn("Overall job status is %s", processStatus) - self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], - body={"state": "Failed"}).execute(num_retries=self.num_retries) + if self.pipeline: + self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], + body={"state": "Failed"}).execute(num_retries=self.num_retries) self.final_output = out def on_message(self, event): if "object_uuid" in event: - if event["object_uuid"] in self.jobs and event["event_type"] == "update": - if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False: - uuid = event["object_uuid"] - with self.lock: - j = self.jobs[uuid] - logger.info("Job %s (%s) is Running", j.name, uuid) - j.running = True - j.update_pipeline_component(event["properties"]["new_attributes"]) - elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"): - uuid = event["object_uuid"] - try: - self.cond.acquire() - j = self.jobs[uuid] - logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"]) - j.done(event["properties"]["new_attributes"]) - self.cond.notify() - finally: - self.cond.release() + if event["object_uuid"] in self.jobs and event["event_type"] == "update": + if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False: + uuid = event["object_uuid"] + with self.lock: + j = self.jobs[uuid] + logger.info("Job %s (%s) is Running", j.name, uuid) + j.running = True + j.update_pipeline_component(event["properties"]["new_attributes"]) + elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"): + uuid = event["object_uuid"] + try: + self.cond.acquire() + j = self.jobs[uuid] + logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"]) + j.done(event["properties"]["new_attributes"]) + self.cond.notify() + finally: + self.cond.release() def get_uploaded(self): return self.uploaded.copy() @@ -300,68 +530,149 @@ class ArvCwlRunner(object): self.uploaded[src] = pair def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs): - events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message) + self.debug = args.debug - self.pipeline = self.api.pipeline_instances().create(body={"name": shortname(tool.tool["id"]), - "components": {}, - "state": "RunningOnClient"}).execute(num_retries=self.num_retries) + if args.quiet: + logger.setLevel(logging.WARN) + logging.getLogger('arvados.arv-run').setLevel(logging.WARN) + + useruuid = self.api.users().current().execute()["uuid"] + self.project_uuid = args.project_uuid if args.project_uuid else useruuid + self.pipeline = None + + if args.submit: + runnerjob = RunnerJob(self, tool, job_order, args.enable_reuse) + if not args.wait: + runnerjob.run() + return + + events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message) + self.debug = args.debug + self.ignore_docker_for_reuse = args.ignore_docker_for_reuse self.fs_access = CollectionFsAccess(input_basedir) kwargs["fs_access"] = self.fs_access kwargs["enable_reuse"] = args.enable_reuse - kwargs["repository"] = args.repository + + kwargs["outdir"] = "$(task.outdir)" + kwargs["tmpdir"] = "$(task.tmpdir)" if kwargs.get("conformance_test"): return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs) else: - jobiter = tool.job(job_order, - input_basedir, - 
-                               self.output_callback,
-                               **kwargs)
+            if args.submit:
+                jobiter = iter((runnerjob,))
+            else:
+                components = {}
+                if "cwl_runner_job" in kwargs:
+                    components[os.path.basename(tool.tool["id"])] = {"job": kwargs["cwl_runner_job"]}
 
-            for runnable in jobiter:
-                if runnable:
-                    with self.lock:
+                self.pipeline = self.api.pipeline_instances().create(
+                    body={
+                        "owner_uuid": self.project_uuid,
+                        "name": shortname(tool.tool["id"]),
+                        "components": components,
+                        "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+
+                logger.info("Pipeline instance %s", self.pipeline["uuid"])
+
+            jobiter = tool.job(job_order,
+                               input_basedir,
+                               self.output_callback,
+                               docker_outdir="$(task.outdir)",
+                               **kwargs)
+
+            try:
+                self.cond.acquire()
+                # Will continue to hold the lock for the duration of this code
+                # except when in cond.wait(), at which point on_message can update
+                # job state and process output callbacks.
+
+                for runnable in jobiter:
+                    if runnable:
                         runnable.run(**kwargs)
-                else:
-                    if self.jobs:
-                        try:
-                            self.cond.acquire()
-                            self.cond.wait()
-                        finally:
-                            self.cond.release()
                     else:
-                        logger.error("Workflow cannot make any more progress.")
-                        break
+                        if self.jobs:
+                            self.cond.wait(1)
+                        else:
+                            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
+                            break
 
-            while self.jobs:
-                try:
-                    self.cond.acquire()
-                    self.cond.wait()
-                finally:
-                    self.cond.release()
+                while self.jobs:
+                    self.cond.wait(1)
 
-            events.close()
+                events.close()
 
-            if self.final_output is None:
-                raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+                if self.final_output is None:
+                    raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+
+                # create final output collection
+            except:
+                if sys.exc_info()[0] is KeyboardInterrupt:
+                    logger.error("Interrupted, marking pipeline as failed")
+                else:
+                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False)) + if self.pipeline: + self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], + body={"state": "Failed"}).execute(num_retries=self.num_retries) + finally: + self.cond.release() return self.final_output +def versionstring(): + """Print version string of key packages for provenance and debugging.""" + + arvcwlpkg = pkg_resources.require("arvados-cwl-runner") + arvpkg = pkg_resources.require("arvados-python-client") + cwlpkg = pkg_resources.require("cwltool") + + return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version, + "arvados-python-client", arvpkg[0].version, + "cwltool", cwlpkg[0].version) def main(args, stdout, stderr, api_client=None): - runner = ArvCwlRunner(api_client=arvados.api('v1')) args.insert(0, "--leave-outputs") parser = cwltool.main.arg_parser() + exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--enable-reuse", action="store_true", - default=False, dest="enable_reuse", + default=True, dest="enable_reuse", help="") exgroup.add_argument("--disable-reuse", action="store_false", - default=False, dest="enable_reuse", + default=True, dest="enable_reuse", help="") - parser.add_argument('--repository', type=str, default="peter/crunchrunner", help="Repository containing the 'crunchrunner' program.") + parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs") + parser.add_argument("--ignore-docker-for-reuse", action="store_true", + help="Ignore Docker image version when deciding whether to reuse past jobs.", + default=False) + + exgroup = parser.add_mutually_exclusive_group() + exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.", + default=True, dest="submit") + exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).", + default=True, dest="submit") - return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser) + exgroup = parser.add_mutually_exclusive_group() + exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.", + default=True, dest="wait") + exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.", + default=True, dest="wait") + + try: + if api_client is None: + api_client=arvados.api('v1', model=OrderedJsonModel()) + runner = ArvCwlRunner(api_client) + except Exception as e: + logger.error(e) + return 1 + + return cwltool.main.main(args, + stdout=stdout, + stderr=stderr, + executor=runner.arvExecutor, + makeTool=runner.arvMakeTool, + parser=parser, + versionfunc=versionstring)