#!/usr/bin/env python
# Implement cwl-runner interface for submitting and running jobs on Arvados.

import argparse
import arvados
import arvados.events
import arvados.commands.keepdocker
import arvados.commands.run
import arvados.collection
import arvados.util
import cwltool.draft2tool
import cwltool.workflow
import cwltool.main
import cwltool.process
import cwltool.pathmapper
from cwltool.process import shortname
from cwltool.errors import WorkflowException
import threading
import cwltool.docker
import fnmatch
import logging
import re
import os
import sys
import functools
import json
import pkg_resources  # part of setuptools

from cwltool.process import get_feature, adjustFiles, scandeps
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

# Patterns for extracting the tmpdir, outdir and keep mount paths that
# crunchrunner reports on stderr while a task runs.
tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")


def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
    """Check if a Docker image is available in Keep; if not, upload it using arv-keepdocker."""

    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    sp = dockerRequirement["dockerImageId"].split(":")
    image_name = sp[0]
    image_tag = sp[1] if len(sp) > 1 else None

    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                            image_name=image_name,
                                                            image_tag=image_tag)

    if not images:
        # Pull the image locally so that arv-keepdocker can upload it.
        imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
        args = ["--project-uuid="+project_uuid, image_name]
        if image_tag:
            args.append(image_tag)
        logger.info("Uploading Docker image %s", ":".join(args[1:]))
        arvados.commands.keepdocker.main(args, stdout=sys.stderr)

    return dockerRequirement["dockerImageId"]


class CollectionFsAccess(cwltool.process.StdFsAccess):
    """Implement the cwltool FsAccess interface for Arvados Collections."""

    def __init__(self, basedir):
        self.collections = {}
        self.basedir = basedir

    def get_collection(self, path):
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            return []

        ret = []
        # Iterate over the files and subcollections in 'collection'.
        for filename in collection:
            if patternsegments[0] == '.':
                # Pattern contains something like "./foo" so just shift
                # past the "./"
                ret.extend(self._match(collection, patternsegments[1:], parent))
            elif fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        collection, rest = self.get_collection(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))


class ArvadosJob(object):
    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""

    def __init__(self, runner):
        self.arvrunner = runner
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        if self.generatefiles:
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
        else:
            runtime_constraints["docker_image"] = "arvados/jobs"

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)

        filters = [["repository", "=", "arvados"],
                   ["script", "=", "crunchrunner"],
                   ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
        if not self.arvrunner.ignore_docker_for_reuse:
            filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])

        try:
            response = self.arvrunner.api.jobs().create(
                body={
                    "owner_uuid": self.arvrunner.project_uuid,
                    "script": "crunchrunner",
                    "repository": "arvados",
                    "script_version": "master",
                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
                    "script_parameters": {"tasks": [script_parameters]},
                    "runtime_constraints": runtime_constraints
                },
                filters=filters,
                find_or_create=kwargs.get("enable_reuse", True)
            ).execute(num_retries=self.arvrunner.num_retries)

            self.arvrunner.jobs[response["uuid"]] = self

            self.arvrunner.pipeline["components"][self.name] = {"job": response}
            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(
                uuid=self.arvrunner.pipeline["uuid"],
                body={"components": self.arvrunner.pipeline["components"]}
            ).execute(num_retries=self.arvrunner.num_retries)

            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])

            if response["state"] in ("Complete", "Failed", "Cancelled"):
                self.done(response)
        except Exception as e:
            logger.error("Got error %s" % str(e))
            self.output_callback({}, "permanentFail")

    def update_pipeline_component(self, record):
        self.arvrunner.pipeline["components"][self.name] = {"job": record}
        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(
            uuid=self.arvrunner.pipeline["uuid"],
            body={"components": self.arvrunner.pipeline["components"]}
        ).execute(num_retries=self.arvrunner.num_retries)

    def done(self, record):
        try:
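            # Recording the final job state on the pipeline instance is
            # best-effort: any API error here is swallowed so that output
            # collection below can still proceed.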
            self.update_pipeline_component(record)
        except:
            pass

        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            try:
                outputs = {}
                if record["output"]:
                    logc = arvados.collection.Collection(record["log"])
                    log = logc.open(logc.keys()[0])
                    tmpdir = None
                    outdir = None
                    keepdir = None
                    for l in log:
                        # Determine the tmpdir, outdir and keepdir paths from
                        # the job run.  Unfortunately, we can't take the first
                        # values we find (which are expected to be near the
                        # top) and stop scanning, because if the node fails and
                        # the job restarts on a different node these values may
                        # differ between runs, and we need the values from the
                        # final run that actually produced output.
                        g = tmpdirre.match(l)
                        if g:
                            tmpdir = g.group(1)
                        g = outdirre.match(l)
                        if g:
                            outdir = g.group(1)
                        g = keepre.match(l)
                        if g:
                            keepdir = g.group(1)

                    colname = "Output %s of %s" % (record["output"][0:7], self.name)

                    # Check if a collection with the same owner, name and
                    # content already exists.
                    collection_exists = self.arvrunner.api.collections().list(
                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
                                 ['portable_data_hash', '=', record["output"]],
                                 ["name", "=", colname]]
                    ).execute(num_retries=self.arvrunner.num_retries)

                    if not collection_exists["items"]:
                        # Create a collection located in the same project as the
                        # pipeline with the contents of the output.
                        # First, get the output record.
                        collections = self.arvrunner.api.collections().list(
                            limit=1,
                            filters=[['portable_data_hash', '=', record["output"]]],
                            select=["manifest_text"]
                        ).execute(num_retries=self.arvrunner.num_retries)

                        if not collections["items"]:
                            raise WorkflowException(
                                "Job output '%s' cannot be found on API server" % (
                                    record["output"]))

                        # Create new collection in the parent project
                        # with the output contents.
                        self.arvrunner.api.collections().create(body={
                            "owner_uuid": self.arvrunner.project_uuid,
                            "name": colname,
                            "portable_data_hash": record["output"],
                            "manifest_text": collections["items"][0]["manifest_text"]
                        }, ensure_unique_name=True).execute(
                            num_retries=self.arvrunner.num_retries)

                    self.builder.outdir = outdir
                    self.builder.pathmapper.keepdir = keepdir
                    outputs = self.collect_outputs("keep:" + record["output"])
            except WorkflowException as e:
                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                processStatus = "permanentFail"
            except Exception as e:
                logger.exception("Got unknown exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]


class RunnerJob(object):
    """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""

    def __init__(self, runner, tool, job_order, enable_reuse):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse

    def update_pipeline_component(self, record):
        pass

    def upload_docker(self, tool):
        if isinstance(tool, cwltool.draft2tool.CommandLineTool):
            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
            if docker_req:
                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
        elif isinstance(tool, cwltool.workflow.Workflow):
            for s in tool.steps:
                self.upload_docker(s.embedded_tool)

    def run(self, dry_run=False, pull_image=True, **kwargs):
        self.upload_docker(self.tool)

        workflowfiles = set()
        jobfiles = set()
        workflowfiles.add(self.tool.tool["id"])

        self.name = os.path.basename(self.tool.tool["id"])

        def visitFiles(files, path):
            files.add(path)
            return path
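
        # Scan the tool document for file dependencies ($import/run references,
        # $include, $schemas and "path" entries) so they can be staged into
        # Keep, along with the files referenced by the job order, before the
        # runner job is submitted.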
        document_loader, _, _ = cwltool.process.get_schema()

        def loadref(b, u):
            return document_loader.resolve_ref(u, base_url=b)[0]

        sc = scandeps("", self.tool.tool,
                      set(("$import", "run")),
                      set(("$include", "$schemas", "path")),
                      loadref)
        adjustFiles(sc, functools.partial(visitFiles, workflowfiles))
        adjustFiles(self.job_order, functools.partial(visitFiles, jobfiles))

        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
                                       "%s",
                                       "%s/%s",
                                       name=self.name,
                                       **kwargs)

        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
                                  "%s",
                                  "%s/%s",
                                  name=os.path.basename(self.job_order.get("id", "#")),
                                  **kwargs)

        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])

        if "id" in self.job_order:
            del self.job_order["id"]

        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]

        response = self.arvrunner.api.jobs().create(body={
            "owner_uuid": self.arvrunner.project_uuid,
            "script": "cwl-runner",
            "script_version": "master",
            "repository": "arvados",
            "script_parameters": self.job_order,
            "runtime_constraints": {
                "docker_image": "arvados/jobs"
            }
        }, find_or_create=self.enable_reuse).execute(num_retries=self.arvrunner.num_retries)

        self.arvrunner.jobs[response["uuid"]] = self

        logger.info("Submitted job %s", response["uuid"])

        if response["state"] in ("Complete", "Failed", "Cancelled"):
            self.done(response)

    def done(self, record):
        if record["state"] == "Complete":
            processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]


class ArvPathMapper(cwltool.pathmapper.PathMapper):
    """Convert container-local paths to and from Keep collection ids."""

    def __init__(self, arvrunner, referenced_files, basedir,
                 collection_pattern, file_pattern, name=None, **kwargs):
        self._pathmap = arvrunner.get_uploaded()
        uploadfiles = set()

        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

        for src in referenced_files:
            if isinstance(src, basestring) and pdh_path.match(src):
                self._pathmap[src] = (src, collection_pattern % src[5:])
            if "#" in src:
                src = src[:src.index("#")]
            if src not in self._pathmap:
                ab = cwltool.pathmapper.abspath(src, basedir)
                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                if kwargs.get("conformance_test"):
                    self._pathmap[src] = (src, ab)
                elif isinstance(st, arvados.commands.run.UploadFile):
                    uploadfiles.add((src, ab, st))
                elif isinstance(st, arvados.commands.run.ArvFile):
                    self._pathmap[src] = (ab, st.fn)
                else:
                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if uploadfiles:
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern=file_pattern,
                                             name=name,
                                             project=arvrunner.project_uuid)

        for src, ab, st in uploadfiles:
            arvrunner.add_uploaded(src, (ab, st.fn))
            self._pathmap[src] = (ab, st.fn)

        self.keepdir = None

    def reversemap(self, target):
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)


class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
    """Wrap cwltool CommandLineTool to override selected methods."""

    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir, **kwargs):
        return ArvPathMapper(self.arvrunner, reffiles, input_basedir,
                             "$(task.keep)/%s",
                             "$(task.keep)/%s/%s",
                             **kwargs)


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
    complete, and report output."""

    def __init__(self, api_client):
        self.api = api_client
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.uploaded = {}
        self.num_retries = 4

    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_output = out

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
        self.debug = args.debug

        if args.quiet:
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
        self.pipeline = None

        if args.submit:
            runnerjob = RunnerJob(self, tool, job_order, args.enable_reuse)
            if not args.wait:
                runnerjob.run()
                return

        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        self.debug = args.debug
        self.ignore_docker_for_reuse = args.ignore_docker_for_reuse
        self.fs_access = CollectionFsAccess(input_basedir)

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = args.enable_reuse

        kwargs["outdir"] = "$(task.outdir)"
        kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("conformance_test"):
            return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
        else:
            if args.submit:
                jobiter = iter((runnerjob,))
            else:
                components = {}
                if "cwl_runner_job" in kwargs:
                    components[os.path.basename(tool.tool["id"])] = {"job": kwargs["cwl_runner_job"]}
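
                # Running the workflow engine locally: create a pipeline
                # instance up front to track the component jobs this run
                # submits.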
                self.pipeline = self.api.pipeline_instances().create(
                    body={
                        "owner_uuid": self.project_uuid,
                        "name": shortname(tool.tool["id"]),
                        "components": components,
                        "state": "RunningOnClient"}).execute(num_retries=self.num_retries)

                logger.info("Pipeline instance %s", self.pipeline["uuid"])

                jobiter = tool.job(job_order,
                                   input_basedir,
                                   self.output_callback,
                                   docker_outdir="$(task.outdir)",
                                   **kwargs)

            try:
                self.cond.acquire()
                # Will continue to hold the lock for the duration of this code
                # except when in cond.wait(), at which point on_message can update
                # job state and process output callbacks.

                for runnable in jobiter:
                    if runnable:
                        runnable.run(**kwargs)
                    else:
                        if self.jobs:
                            self.cond.wait(1)
                        else:
                            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                            break

                while self.jobs:
                    self.cond.wait(1)

                events.close()

                if self.final_output is None:
                    raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

                # create final output collection
            except:
                if sys.exc_info()[0] is KeyboardInterrupt:
                    logger.error("Interrupted, marking pipeline as failed")
                else:
                    logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
                if self.pipeline:
                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
            finally:
                self.cond.release()

            return self.final_output


def versionstring():
    """Return version strings of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def main(args, stdout, stderr, api_client=None):
    args.insert(0, "--leave-outputs")
    parser = cwltool.main.arg_parser()

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Enable job reuse (default).")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Disable job reuse (always run new jobs).")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                         default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client)
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(args,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             parser=parser,
                             versionfunc=versionstring)
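

# Illustrative entry point: the installed arvados-cwl-runner command normally
# reaches main() through the package's console-script entry point. The guard
# below is a minimal sketch (an assumption, not part of the packaged
# interface) that lets the module also be exercised directly as a script by
# forwarding the command-line arguments to main() and exiting with its
# return code.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))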