#!/usr/bin/env python import argparse import arvados import arvados.events import arvados.commands.keepdocker import arvados.commands.run import arvados.collection import arvados.util import cwltool.draft2tool import cwltool.workflow import cwltool.main from cwltool.process import shortname from cwltool.errors import WorkflowException import threading import cwltool.docker import fnmatch import logging import re import os import sys from cwltool.process import get_feature from arvados.api import OrderedJsonModel logger = logging.getLogger('arvados.cwl-runner') logger.setLevel(logging.INFO) tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)") outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)") keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)") def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid): if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement: dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"] sp = dockerRequirement["dockerImageId"].split(":") image_name = sp[0] image_tag = sp[1] if len(sp) > 1 else None images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3, image_name=image_name, image_tag=image_tag) if not images: imageId = cwltool.docker.get_image(dockerRequirement, pull_image) args = ["--project-uuid="+project_uuid, image_name] if image_tag: args.append(image_tag) logger.info("Uploading Docker image %s", ":".join(args[1:])) arvados.commands.keepdocker.main(args) return dockerRequirement["dockerImageId"] class CollectionFsAccess(cwltool.process.StdFsAccess): def __init__(self, basedir): self.collections = {} self.basedir = basedir def get_collection(self, path): p = path.split("/") if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]): pdh = p[0][5:] if pdh not in self.collections: self.collections[pdh] = arvados.collection.CollectionReader(pdh) return (self.collections[pdh], "/".join(p[1:])) else: return (None, path) def _match(self, collection, patternsegments, parent): if not patternsegments: return [] if not isinstance(collection, arvados.collection.RichCollectionBase): return [] ret = [] # iterate over the files and subcollections in 'collection' for filename in collection: if patternsegments[0] == '.': # Pattern contains something like "./foo" so just shift # past the "./" ret.extend(self._match(collection, patternsegments[1:], parent)) elif fnmatch.fnmatch(filename, patternsegments[0]): cur = os.path.join(parent, filename) if len(patternsegments) == 1: ret.append(cur) else: ret.extend(self._match(collection[filename], patternsegments[1:], cur)) return ret def glob(self, pattern): collection, rest = self.get_collection(pattern) patternsegments = rest.split("/") return self._match(collection, patternsegments, "keep:" + collection.manifest_locator()) def open(self, fn, mode): collection, rest = self.get_collection(fn) if collection: return collection.open(rest, mode) else: return open(self._abs(fn), mode) def exists(self, fn): collection, rest = self.get_collection(fn) if collection: return collection.exists(rest) else: return os.path.exists(self._abs(fn)) class ArvadosJob(object): def __init__(self, runner): self.arvrunner = runner self.running = False def run(self, dry_run=False, pull_image=True, **kwargs): script_parameters = { "command": self.command_line } runtime_constraints = {} if self.generatefiles: vwd = arvados.collection.Collection() script_parameters["task.vwd"] = {} for t in self.generatefiles: if isinstance(self.generatefiles[t], dict): src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:")) vwd.copy(rest, t, source_collection=src) else: with vwd.open(t, "w") as f: f.write(self.generatefiles[t]) vwd.save_new() for t in self.generatefiles: script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t) script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"} if self.environment: script_parameters["task.env"].update(self.environment) if self.stdin: script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1] if self.stdout: script_parameters["task.stdout"] = self.stdout (docker_req, docker_is_req) = get_feature(self, "DockerRequirement") if docker_req and kwargs.get("use_container") is not False: runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid) else: runtime_constraints["docker_image"] = "arvados/jobs" resources = self.builder.resources if resources is not None: runtime_constraints["min_cores_per_node"] = resources.get("cores", 1) runtime_constraints["min_ram_mb_per_node"] = resources.get("ram") runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0) try: response = self.arvrunner.api.jobs().create(body={ "owner_uuid": self.arvrunner.project_uuid, "script": "crunchrunner", "repository": "arvados", "script_version": "master", "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6", "script_parameters": {"tasks": [script_parameters]}, "runtime_constraints": runtime_constraints }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries) self.arvrunner.jobs[response["uuid"]] = self self.arvrunner.pipeline["components"][self.name] = {"job": response} self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"], body={ "components": self.arvrunner.pipeline["components"] }).execute(num_retries=self.arvrunner.num_retries) logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"]) if response["state"] in ("Complete", "Failed", "Cancelled"): self.done(response) except Exception as e: logger.error("Got error %s" % str(e)) self.output_callback({}, "permanentFail") def update_pipeline_component(self, record): self.arvrunner.pipeline["components"][self.name] = {"job": record} self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"], body={ "components": self.arvrunner.pipeline["components"] }).execute(num_retries=self.arvrunner.num_retries) def done(self, record): try: self.update_pipeline_component(record) except: pass try: if record["state"] == "Complete": processStatus = "success" else: processStatus = "permanentFail" try: outputs = {} if record["output"]: logc = arvados.collection.Collection(record["log"]) log = logc.open(logc.keys()[0]) tmpdir = None outdir = None keepdir = None for l in log: # Determine the tmpdir, outdir and keepdir paths from # the job run. Unfortunately, we can't take the first # values we find (which are expected to be near the # top) and stop scanning because if the node fails and # the job restarts on a different node these values # will different runs, and we need to know about the # final run that actually produced output. g = tmpdirre.match(l) if g: tmpdir = g.group(1) g = outdirre.match(l) if g: outdir = g.group(1) g = keepre.match(l) if g: keepdir = g.group(1) colname = "Output %s of %s" % (record["output"][0:7], self.name) # check if collection already exists with same owner, name and content collection_exists = self.arvrunner.api.collections().list( filters=[["owner_uuid", "=", self.arvrunner.project_uuid], ['portable_data_hash', '=', record["output"]], ["name", "=", colname]] ).execute(num_retries=self.arvrunner.num_retries) if not collection_exists["items"]: # Create a collection located in the same project as the # pipeline with the contents of the output. # First, get output record. collections = self.arvrunner.api.collections().list( limit=1, filters=[['portable_data_hash', '=', record["output"]]], select=["manifest_text"] ).execute(num_retries=self.arvrunner.num_retries) if not collections["items"]: raise WorkflowException( "Job output '%s' cannot be found on API server" % ( record["output"])) # Create new collection in the parent project # with the output contents. self.arvrunner.api.collections().create(body={ "owner_uuid": self.arvrunner.project_uuid, "name": colname, "portable_data_hash": record["output"], "manifest_text": collections["items"][0]["manifest_text"] }, ensure_unique_name=True).execute( num_retries=self.arvrunner.num_retries) self.builder.outdir = outdir self.builder.pathmapper.keepdir = keepdir outputs = self.collect_outputs("keep:" + record["output"]) except WorkflowException as e: logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False)) processStatus = "permanentFail" except Exception as e: logger.exception("Got unknown exception while collecting job outputs:") processStatus = "permanentFail" self.output_callback(outputs, processStatus) finally: del self.arvrunner.jobs[record["uuid"]] class ArvPathMapper(cwltool.pathmapper.PathMapper): def __init__(self, arvrunner, referenced_files, basedir, **kwargs): self._pathmap = arvrunner.get_uploaded() uploadfiles = [] pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+') for src in referenced_files: if isinstance(src, basestring) and pdh_path.match(src): self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:]) if src not in self._pathmap: ab = cwltool.pathmapper.abspath(src, basedir) st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s") if kwargs.get("conformance_test"): self._pathmap[src] = (src, ab) elif isinstance(st, arvados.commands.run.UploadFile): uploadfiles.append((src, ab, st)) elif isinstance(st, arvados.commands.run.ArvFile): self._pathmap[src] = (ab, st.fn) else: raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st) if uploadfiles: arvados.commands.run.uploadfiles([u[2] for u in uploadfiles], arvrunner.api, dry_run=kwargs.get("dry_run"), num_retries=3, fnPattern="$(task.keep)/%s/%s", project=arvrunner.project_uuid) for src, ab, st in uploadfiles: arvrunner.add_uploaded(src, (ab, st.fn)) self._pathmap[src] = (ab, st.fn) self.keepdir = None def reversemap(self, target): if target.startswith("keep:"): return (target, target) elif self.keepdir and target.startswith(self.keepdir): return (target, "keep:" + target[len(self.keepdir)+1:]) else: return super(ArvPathMapper, self).reversemap(target) class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool): def __init__(self, arvrunner, toolpath_object, **kwargs): super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs) self.arvrunner = arvrunner def makeJobRunner(self): return ArvadosJob(self.arvrunner) def makePathMapper(self, reffiles, input_basedir, **kwargs): return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs) class ArvCwlRunner(object): def __init__(self, api_client): self.api = api_client self.jobs = {} self.lock = threading.Lock() self.cond = threading.Condition(self.lock) self.final_output = None self.uploaded = {} self.num_retries = 4 def arvMakeTool(self, toolpath_object, **kwargs): if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool": return ArvadosCommandTool(self, toolpath_object, **kwargs) else: return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs) def output_callback(self, out, processStatus): if processStatus == "success": logger.info("Overall job status is %s", processStatus) self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], body={"state": "Complete"}).execute(num_retries=self.num_retries) else: logger.warn("Overall job status is %s", processStatus) self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], body={"state": "Failed"}).execute(num_retries=self.num_retries) self.final_output = out def on_message(self, event): if "object_uuid" in event: if event["object_uuid"] in self.jobs and event["event_type"] == "update": if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False: uuid = event["object_uuid"] with self.lock: j = self.jobs[uuid] logger.info("Job %s (%s) is Running", j.name, uuid) j.running = True j.update_pipeline_component(event["properties"]["new_attributes"]) elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"): uuid = event["object_uuid"] try: self.cond.acquire() j = self.jobs[uuid] logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"]) j.done(event["properties"]["new_attributes"]) self.cond.notify() finally: self.cond.release() def get_uploaded(self): return self.uploaded.copy() def add_uploaded(self, src, pair): self.uploaded[src] = pair def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs): events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message) self.debug = args.debug self.fs_access = CollectionFsAccess(input_basedir) kwargs["fs_access"] = self.fs_access kwargs["enable_reuse"] = args.enable_reuse kwargs["outdir"] = "$(task.outdir)" kwargs["tmpdir"] = "$(task.tmpdir)" useruuid = self.api.users().current().execute()["uuid"] self.project_uuid = args.project_uuid if args.project_uuid else useruuid if kwargs.get("conformance_test"): return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs) else: self.pipeline = self.api.pipeline_instances().create( body={ "owner_uuid": self.project_uuid, "name": shortname(tool.tool["id"]), "components": {}, "state": "RunningOnClient"}).execute(num_retries=self.num_retries) logger.info("Pipeline instance %s", self.pipeline["uuid"]) jobiter = tool.job(job_order, input_basedir, self.output_callback, docker_outdir="$(task.outdir)", **kwargs) try: self.cond.acquire() # Will continue to hold the lock for the duration of this code # except when in cond.wait(), at which point on_message can update # job state and process output callbacks. for runnable in jobiter: if runnable: runnable.run(**kwargs) else: if self.jobs: self.cond.wait(1) else: logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.") break while self.jobs: self.cond.wait(1) events.close() if self.final_output is None: raise cwltool.workflow.WorkflowException("Workflow did not return a result.") # create final output collection except: if sys.exc_info()[0] is KeyboardInterrupt: logger.error("Interrupted, marking pipeline as failed") else: logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False)) self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], body={"state": "Failed"}).execute(num_retries=self.num_retries) finally: self.cond.release() return self.final_output def main(args, stdout, stderr, api_client=None): args.insert(0, "--leave-outputs") parser = cwltool.main.arg_parser() exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--enable-reuse", action="store_true", default=True, dest="enable_reuse", help="") exgroup.add_argument("--disable-reuse", action="store_false", default=True, dest="enable_reuse", help="") parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs") try: runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel())) except Exception as e: logger.error(e) return 1 return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)