X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b8260eb5cdc2c403083d7dbea49c45791a763e9c..e0fac5213f40fc00946f8ec5e4df42bebdf756d2:/sdk/cwl/arvados_cwl/__init__.py

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 67d0d4e49e..0533b5e83a 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -1,43 +1,46 @@
 #!/usr/bin/env python

+# Implement cwl-runner interface for submitting and running jobs on Arvados.
+
 import argparse
 import arvados
-import arvados.events
+import arvados.collection
 import arvados.commands.keepdocker
 import arvados.commands.run
-import arvados.collection
+import arvados.events
 import arvados.util
-import cwltool.draft2tool
-import cwltool.workflow
-import cwltool.main
-from cwltool.process import shortname
-from cwltool.errors import WorkflowException
-import threading
+import copy
 import cwltool.docker
+from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
+from cwltool.errors import WorkflowException
+import cwltool.main
+import cwltool.workflow
 import fnmatch
+from functools import partial
+import json
 import logging
-import re
 import os
+import pkg_resources  # part of setuptools
+import re
 import sys
-import functools
-import json
+import threading
+from schema_salad.ref_resolver import Loader
+from cwltool.builder import Builder

-from cwltool.process import get_feature, adjustFiles, scandeps
+from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
 from arvados.api import OrderedJsonModel

 logger = logging.getLogger('arvados.cwl-runner')
 logger.setLevel(logging.INFO)

-crunchrunner_pdh = "ff6fc71e593081ef9733afacaeee15ea+140"
-crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner"
-certs_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/ca-certificates.crt"
-
 tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
 outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
 keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")

 def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
+    """Check if a Docker image is available in Keep; if not, upload it using arv-keepdocker."""
+
     if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
         dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
@@ -55,15 +58,17 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
         if image_tag:
             args.append(image_tag)
         logger.info("Uploading Docker image %s", ":".join(args[1:]))
-        arvados.commands.keepdocker.main(args)
+        arvados.commands.keepdocker.main(args, stdout=sys.stderr)

     return dockerRequirement["dockerImageId"]

 class CollectionFsAccess(cwltool.process.StdFsAccess):
+    """Implement the cwltool FsAccess interface for Arvados Collections."""
+
     def __init__(self, basedir):
+        super(CollectionFsAccess, self).__init__(basedir)
         self.collections = {}
-        self.basedir = basedir

     def get_collection(self, path):
         p = path.split("/")
@@ -117,6 +122,8 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
         return os.path.exists(self._abs(fn))

 class ArvadosJob(object):
+    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
+
     def __init__(self, runner):
         self.arvrunner = runner
         self.running = False
@@ -154,6 +161,8 @@ class ArvadosJob(object):
         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
         if docker_req and kwargs.get("use_container") is not False:
             runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
+        else:
+            runtime_constraints["docker_image"] = "arvados/jobs"

         resources = self.builder.resources
         if resources is not None:
@@ -161,16 +170,26 @@
             runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
             runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)

+        filters = [["repository", "=", "arvados"],
+                   ["script", "=", "crunchrunner"],
+                   ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
+        if not self.arvrunner.ignore_docker_for_reuse:
+            filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
+
         try:
-            response = self.arvrunner.api.jobs().create(body={
-                "owner_uuid": self.arvrunner.project_uuid,
-                "script": "crunchrunner",
-                "repository": "arvados",
-                "script_version": "master",
-                "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
-                "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"},
-                "runtime_constraints": runtime_constraints
-            }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)
+            response = self.arvrunner.api.jobs().create(
+                body={
+                    "owner_uuid": self.arvrunner.project_uuid,
+                    "script": "crunchrunner",
+                    "repository": "arvados",
+                    "script_version": "master",
+                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
+                    "script_parameters": {"tasks": [script_parameters]},
+                    "runtime_constraints": runtime_constraints
+                },
+                filters=filters,
+                find_or_create=kwargs.get("enable_reuse", True)
+            ).execute(num_retries=self.arvrunner.num_retries)

             self.arvrunner.jobs[response["uuid"]] = self
@@ -216,6 +235,14 @@ class ArvadosJob(object):
                 outdir = None
                 keepdir = None
                 for l in log:
+                    # Determine the tmpdir, outdir and keepdir paths from
+                    # the job run.  Unfortunately, we can't take the first
+                    # values we find (which are expected to be near the
+                    # top) and stop scanning, because if the node fails and
+                    # the job restarts on a different node these values
+                    # will be different for each run, and we need to know
+                    # about the final run that actually produced output.
+
                     g = tmpdirre.match(l)
                     if g:
                         tmpdir = g.group(1)
@@ -226,13 +253,39 @@
                     if g:
                         keepdir = g.group(1)

-                # It turns out if the job fails and restarts it can
-                # come up on a different compute node, so we have to
-                # read the log to the end to be sure instead of taking the
-                # easy way out.
-                #
-                #if tmpdir and outdir and keepdir:
-                #    break
+                colname = "Output %s of %s" % (record["output"][0:7], self.name)
+
+                # Check if a collection with the same owner, name and content
+                # already exists.
+                collection_exists = self.arvrunner.api.collections().list(
+                    filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+                             ['portable_data_hash', '=', record["output"]],
+                             ["name", "=", colname]]
+                ).execute(num_retries=self.arvrunner.num_retries)
+
+                if not collection_exists["items"]:
+                    # Create a collection located in the same project as the
+                    # pipeline with the contents of the output.
+                    # First, get the output record.
+                    collections = self.arvrunner.api.collections().list(
+                        limit=1,
+                        filters=[['portable_data_hash', '=', record["output"]]],
+                        select=["manifest_text"]
+                    ).execute(num_retries=self.arvrunner.num_retries)
+
+                    if not collections["items"]:
+                        raise WorkflowException(
+                            "Job output '%s' cannot be found on API server" % (
+                                record["output"]))
+
+                    # Create new collection in the parent project
+                    # with the output contents.
+                    self.arvrunner.api.collections().create(body={
+                        "owner_uuid": self.arvrunner.project_uuid,
+                        "name": colname,
+                        "portable_data_hash": record["output"],
+                        "manifest_text": collections["items"][0]["manifest_text"]
+                    }, ensure_unique_name=True).execute(
+                        num_retries=self.arvrunner.num_retries)

                 self.builder.outdir = outdir
                 self.builder.pathmapper.keepdir = keepdir
@@ -250,6 +303,8 @@ class ArvadosJob(object):


 class RunnerJob(object):
+    """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
+
     def __init__(self, runner, tool, job_order, enable_reuse):
         self.arvrunner = runner
         self.tool = tool
@@ -261,7 +316,7 @@
         pass

     def upload_docker(self, tool):
-        if isinstance(tool, cwltool.draft2tool.CommandLineTool):
+        if isinstance(tool, CommandLineTool):
             (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
             if docker_req:
                 arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
@@ -269,7 +324,13 @@
             for s in tool.steps:
                 self.upload_docker(s.embedded_tool)

-    def run(self, dry_run=False, pull_image=True, **kwargs):
+    def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+        """Create an Arvados job specification for this workflow.
+
+        The returned dict can be used to create a job (i.e., passed as
+        the +body+ argument to jobs().create()), or as a component in
+        a pipeline template or pipeline instance.
+        """
         self.upload_docker(self.tool)

         workflowfiles = set()
@@ -282,7 +343,7 @@
             files.add(path)
             return path

-        document_loader, _, _ = cwltool.process.get_schema()
+        document_loader = Loader({"cwl": "https://w3id.org/cwl/cwl#", "id": "@id"})

         def loadref(b, u):
             return document_loader.resolve_ref(u, base_url=b)[0]
@@ -290,8 +351,8 @@
                          set(("$import", "run")),
                          set(("$include", "$schemas", "path")),
                          loadref)
-        adjustFiles(sc, functools.partial(visitFiles, workflowfiles))
-        adjustFiles(self.job_order, functools.partial(visitFiles, jobfiles))
+        adjustFiles(sc, partial(visitFiles, workflowfiles))
+        adjustFiles(self.job_order, partial(visitFiles, jobfiles))

         workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
                                        "%s",
@@ -311,18 +372,27 @@
         del self.job_order["id"]

         self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
-
-        response = self.arvrunner.api.jobs().create(body={
+        return {
             "script": "cwl-runner",
-            "script_version": "8654-arv-jobs-cwl-runner",
+            "script_version": "master",
             "repository": "arvados",
             "script_parameters": self.job_order,
             "runtime_constraints": {
                 "docker_image": "arvados/jobs"
             }
-        }, find_or_create=self.enable_reuse).execute(num_retries=self.arvrunner.num_retries)
+        }
+
+    def run(self, *args, **kwargs):
+        job_spec = self.arvados_job_spec(*args, **kwargs)
+        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
+
+        response = self.arvrunner.api.jobs().create(
+            body=job_spec,
+            find_or_create=self.enable_reuse
+        ).execute(num_retries=self.arvrunner.num_retries)

-        self.arvrunner.jobs[response["uuid"]] = self
+        self.uuid = response["uuid"]
+        self.arvrunner.jobs[self.uuid] = self

         logger.info("Submitted job %s", response["uuid"])
@@ -337,31 +407,135 @@ class RunnerJob(object):
         outputs = None
         try:
-            outc = arvados.collection.Collection(record["output"])
-            with outc.open("cwl.output.json") as f:
-                outputs = json.load(f)
+            try:
+                outc = arvados.collection.Collection(record["output"])
+                with outc.open("cwl.output.json") as f:
+                    outputs = json.load(f)
+                def keepify(path):
+                    if not path.startswith("keep:"):
+                        return "keep:%s/%s" % (record["output"], path)
+                adjustFiles(outputs, keepify)
+            except Exception as e:
+                logger.error("While getting final output object: %s", e)
             self.arvrunner.output_callback(outputs, processStatus)
         finally:
             del self.arvrunner.jobs[record["uuid"]]

+
+class RunnerTemplate(object):
+    """An Arvados pipeline template that invokes a CWL workflow."""
+
+    type_to_dataclass = {
+        'boolean': 'boolean',
+        'File': 'File',
+        'float': 'number',
+        'int': 'number',
+        'string': 'text',
+    }
+
+    def __init__(self, runner, tool, job_order, enable_reuse):
+        self.runner = runner
+        self.tool = tool
+        self.job = RunnerJob(
+            runner=runner,
+            tool=tool,
+            job_order=job_order,
+            enable_reuse=enable_reuse)
+
+    def pipeline_component_spec(self):
+        """Return a component that Workbench and a-r-p-i will understand.
+
+        Specifically, translate CWL input specs to Arvados pipeline
+        format, like {"dataclass":"File","value":"xyz"}.
+        """
+        spec = self.job.arvados_job_spec()
+
+        # Most of the component spec is exactly the same as the job
+        # spec (script, script_version, etc.).
+        # spec['script_parameters'] isn't right, though. A component
+        # spec's script_parameters hash is a translation of
+        # self.tool.tool['inputs'] with defaults/overrides taken from
+        # the job order. So we move the job parameters out of the way
+        # and build a new spec['script_parameters'].
+        job_params = spec['script_parameters']
+        spec['script_parameters'] = {}
+
+        for param in self.tool.tool['inputs']:
+            param = copy.deepcopy(param)
+
+            # Data type and "required" flag...
+            types = param['type']
+            if not isinstance(types, list):
+                types = [types]
+            param['required'] = 'null' not in types
+            non_null_types = set(types) - set(['null'])
+            if len(non_null_types) == 1:
+                the_type = [c for c in non_null_types][0]
+                dataclass = self.type_to_dataclass.get(the_type)
+                if dataclass:
+                    param['dataclass'] = dataclass
+            # Note: If we didn't figure out a single appropriate
+            # dataclass, we just left that attribute out.  We leave
+            # the "type" attribute there in any case, which might help
+            # downstream.
+
+            # Title and description...
+            title = param.pop('label', '')
+            descr = param.pop('description', '').rstrip('\n')
+            if title:
+                param['title'] = title
+            if descr:
+                param['description'] = descr
+
+            # Fill in the value from the current job order, if any.
+            param_id = shortname(param.pop('id'))
+            value = job_params.get(param_id)
+            if value is None:
+                pass
+            elif not isinstance(value, dict):
+                param['value'] = value
+            elif param.get('dataclass') == 'File' and value.get('path'):
+                param['value'] = value['path']
+
+            spec['script_parameters'][param_id] = param
+        spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
+        return spec
+
+    def save(self):
+        job_spec = self.pipeline_component_spec()
+        response = self.runner.api.pipeline_templates().create(body={
+            "components": {
+                self.job.name: job_spec,
+            },
+            "name": self.job.name,
+            "owner_uuid": self.runner.project_uuid,
+        }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries)
+        self.uuid = response["uuid"]
+        logger.info("Created template %s", self.uuid)
+
+
 class ArvPathMapper(cwltool.pathmapper.PathMapper):
-    def __init__(self, arvrunner, referenced_files, basedir,
+    """Convert container-local paths to and from Keep collection ids."""
+
+    def __init__(self, arvrunner, referenced_files, input_basedir,
                  collection_pattern, file_pattern, name=None, **kwargs):
         self._pathmap = arvrunner.get_uploaded()
-        uploadfiles = []
+        uploadfiles = set()

         pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

         for src in referenced_files:
             if isinstance(src, basestring) and pdh_path.match(src):
                 self._pathmap[src] = (src, collection_pattern % src[5:])
+            if "#" in src:
+                src = src[:src.index("#")]
             if src not in self._pathmap:
-                ab = cwltool.pathmapper.abspath(src, basedir)
+                ab = cwltool.pathmapper.abspath(src, input_basedir)
                 st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                 if kwargs.get("conformance_test"):
                     self._pathmap[src] = (src, ab)
                 elif isinstance(st, arvados.commands.run.UploadFile):
-                    uploadfiles.append((src, ab, st))
+                    uploadfiles.add((src, ab, st))
                 elif isinstance(st, arvados.commands.run.ArvFile):
                     self._pathmap[src] = (ab, st.fn)
                 else:
@@ -391,7 +565,9 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
         return super(ArvPathMapper, self).reversemap(target)

-class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
+class ArvadosCommandTool(CommandLineTool):
+    """Wrap cwltool CommandLineTool to override selected methods."""
+
     def __init__(self, arvrunner, toolpath_object, **kwargs):
         super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
         self.arvrunner = arvrunner
@@ -399,14 +575,17 @@
     def makeJobRunner(self):
         return ArvadosJob(self.arvrunner)

-    def makePathMapper(self, reffiles, input_basedir, **kwargs):
-        return ArvPathMapper(self.arvrunner, reffiles, input_basedir,
+    def makePathMapper(self, reffiles, **kwargs):
+        return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
                             "$(task.keep)/%s",
                             "$(task.keep)/%s/%s",
                             **kwargs)

 class ArvCwlRunner(object):
+    """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
+    complete, and report output."""
+
     def __init__(self, api_client):
         self.api = api_client
         self.jobs = {}
@@ -436,7 +615,6 @@ class ArvCwlRunner(object):
                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
             self.final_output = out

-
     def on_message(self, event):
         if "object_uuid" in event:
             if event["object_uuid"] in self.jobs and event["event_type"] == "update":
@@ -464,52 +642,45 @@
     def add_uploaded(self, src, pair):
         self.uploaded[src] = pair

-    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
-        self.debug = args.debug
+    def arvExecutor(self, tool, job_order, **kwargs):
+        self.debug = kwargs.get("debug")

-        if args.quiet:
+        if kwargs.get("quiet"):
             logger.setLevel(logging.WARN)
             logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

-        try:
-            self.api.collections().get(uuid=crunchrunner_pdh).execute()
-        except arvados.errors.ApiError as e:
-            import httplib2
-            h = httplib2.Http(ca_certs=arvados.util.ca_certs_path())
-            resp, content = h.request(crunchrunner_download, "GET")
-            resp2, content2 = h.request(certs_download, "GET")
-            with arvados.collection.Collection() as col:
-                with col.open("crunchrunner", "w") as f:
-                    f.write(content)
-                with col.open("ca-certificates.crt", "w") as f:
-                    f.write(content2)
-
-                col.save_new("crunchrunner binary", ensure_unique_name=True)
-
         useruuid = self.api.users().current().execute()["uuid"]

-        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
+        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
         self.pipeline = None

-        if args.submit:
-            runnerjob = RunnerJob(self, tool, job_order, args.enable_reuse)
-            if not args.wait:
+        if kwargs.get("create_template"):
+            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
+            tmpl.save()
+            # cwltool.main will write our return value to stdout.
+            return tmpl.uuid
+
+        if kwargs.get("submit"):
+            runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
+            if not kwargs.get("wait"):
                 runnerjob.run()
-                return
+                return runnerjob.uuid

         events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

-        self.fs_access = CollectionFsAccess(input_basedir)
+        self.debug = kwargs.get("debug")
+        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
+        self.fs_access = CollectionFsAccess(kwargs["basedir"])

         kwargs["fs_access"] = self.fs_access
-        kwargs["enable_reuse"] = args.enable_reuse
+        kwargs["enable_reuse"] = kwargs.get("enable_reuse")

         kwargs["outdir"] = "$(task.outdir)"
         kwargs["tmpdir"] = "$(task.tmpdir)"

         if kwargs.get("conformance_test"):
-            return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
+            return cwltool.main.single_job_executor(tool, job_order, **kwargs)
         else:
-            if args.submit:
+            if kwargs.get("submit"):
                 jobiter = iter((runnerjob,))
             else:
                 components = {}
@@ -526,7 +697,6 @@ class ArvCwlRunner(object):
                 logger.info("Pipeline instance %s", self.pipeline["uuid"])

                 jobiter = tool.job(job_order,
-                                   input_basedir,
                                    self.output_callback,
                                    docker_outdir="$(task.outdir)",
                                    **kwargs)
@@ -551,11 +721,6 @@
                     self.cond.wait(1)

                 events.close()
-
-            if self.final_output is None:
-                raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
-
-            # create final output collection
         except:
             if sys.exc_info()[0] is KeyboardInterrupt:
                 logger.error("Interrupted, marking pipeline as failed")
@@ -567,12 +732,42 @@
         finally:
             self.cond.release()

+        if self.final_output is None:
+            raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+
         return self.final_output

+def versionstring():
+    """Return version strings of key packages, for provenance and debugging."""
+
+    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
+    arvpkg = pkg_resources.require("arvados-python-client")
+    cwlpkg = pkg_resources.require("cwltool")
+
+    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
+                                    "arvados-python-client", arvpkg[0].version,
+                                    "cwltool", cwlpkg[0].version)
+
+def arg_parser():  # type: () -> argparse.ArgumentParser
+    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
+
+    parser.add_argument("--conformance-test", action="store_true")
+    parser.add_argument("--basedir", type=str)
+    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
+                        help="Output directory, default current directory")
+
+    parser.add_argument("--eval-timeout",
+                        help="Time to wait for a JavaScript expression to evaluate before giving an error.",
+                        type=float)
+    parser.add_argument("--version", action="store_true", help="Print version and exit")
+
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
+    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
+    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
+
+    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

-def main(args, stdout, stderr, api_client=None):
-    args.insert(0, "--leave-outputs")
-    parser = cwltool.main.arg_parser()
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
@@ -580,12 +775,38 @@ def main(args, stdout, stderr, api_client=None):
     exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="")
+
     parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
-    parser.add_argument("--submit", action="store_true", help="Submit job and print job uuid.",
-                        default=False)
-    parser.add_argument("--wait", action="store_true", help="Wait for completion after submitting cwl-runner job.",
+    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
+                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                         default=False)

+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
+                        default=True, dest="submit")
+    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
+                        default=True, dest="submit")
+    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
+
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
+                        default=True, dest="wait")
+    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
+                        default=True, dest="wait")
+
+    parser.add_argument("workflow", type=str, nargs="?", default=None)
+    parser.add_argument("job_order", nargs=argparse.REMAINDER)
+
+    return parser
+
+def main(args, stdout, stderr, api_client=None):
+    parser = arg_parser()
+
+    job_order_object = None
+    arvargs = parser.parse_args(args)
+    if arvargs.create_template:
+        job_order_object = ({}, "")
+
     try:
         if api_client is None:
             api_client=arvados.api('v1', model=OrderedJsonModel())
@@ -594,9 +815,10 @@ def main(args, stdout, stderr, api_client=None):
         logger.error(e)
         return 1

-    return cwltool.main.main(args,
+    return cwltool.main.main(args=arvargs,
                              stdout=stdout,
                              stderr=stderr,
                              executor=runner.arvExecutor,
                              makeTool=runner.arvMakeTool,
-                             parser=parser)
+                             versionfunc=versionstring,
+                             job_order_object=job_order_object)
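
For orientation, a minimal sketch of how the command-line surface introduced by this diff might be driven programmatically. The entry point and flag names (main(), --submit, --create-template, --ignore-docker-for-reuse) come from arg_parser() and main() above; the project UUID and input file names are hypothetical placeholders.

    import sys
    import arvados_cwl

    # After this change, --submit (with --wait) is the default: upload the
    # workflow and its dependencies, submit a cwl-runner job on Arvados, and
    # block until the workflow finishes.
    rc = arvados_cwl.main(
        ["--project-uuid", "zzzzz-j7d0g-0123456789abcde",  # hypothetical project
         "--ignore-docker-for-reuse",
         "my-workflow.cwl", "my-inputs.yml"],              # hypothetical files
        sys.stdout, sys.stderr)

    # --create-template saves a reusable pipeline template (RunnerTemplate)
    # instead of executing the workflow.
    rc = arvados_cwl.main(["--create-template", "my-workflow.cwl"],
                          sys.stdout, sys.stderr)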
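Similarly, a worked example of the CWL-input-to-pipeline-parameter translation performed by RunnerTemplate.pipeline_component_spec() above; the input spec and its values are hypothetical.

    # A CWL input spec as it might appear in tool.tool['inputs']:
    cwl_input = {"id": "#reference", "type": ["null", "File"],
                 "label": "Reference genome"}

    # pipeline_component_spec() would store it in spec['script_parameters']
    # under shortname("#reference") == "reference", roughly as:
    arvados_param = {
        "type": ["null", "File"],     # original CWL type kept for downstream use
        "required": False,            # 'null' is among the accepted types
        "dataclass": "File",          # via type_to_dataclass['File']
        "title": "Reference genome",  # taken from the CWL 'label'
    }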