X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d48af5cfd486b9ea93b57c7f88b80e0305664b0b..08cc91f0493dd8dfe27046faf02c2c907e50443e:/sdk/cwl/arvados_cwl/__init__.py

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index ab8d725bd7..1740a90009 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -4,28 +4,31 @@
 import argparse
 import arvados
-import arvados.events
+import arvados.collection
 import arvados.commands.keepdocker
 import arvados.commands.run
-import arvados.collection
+import arvados.events
 import arvados.util
-import cwltool.draft2tool
-import cwltool.workflow
-import cwltool.main
-from cwltool.process import shortname
-from cwltool.errors import WorkflowException
-import threading
+import copy
 import cwltool.docker
+from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
+from cwltool.errors import WorkflowException
+import cwltool.main
+import cwltool.workflow
 import fnmatch
+from functools import partial
+import json
 import logging
-import re
 import os
-import sys
-import functools
-import json
 import pkg_resources  # part of setuptools
+import re
+import sys
+import threading
+from cwltool.load_tool import fetch_document
+from cwltool.builder import Builder
+import urlparse
 
-from cwltool.process import get_feature, adjustFiles, scandeps
+from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -65,8 +68,8 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
     """Implement the cwltool FsAccess interface for Arvados Collections."""
 
     def __init__(self, basedir):
+        super(CollectionFsAccess, self).__init__(basedir)
         self.collections = {}
-        self.basedir = basedir
 
     def get_collection(self, path):
         p = path.split("/")
@@ -314,7 +317,7 @@ class RunnerJob(object):
         pass
 
     def upload_docker(self, tool):
-        if isinstance(tool, cwltool.draft2tool.CommandLineTool):
+        if isinstance(tool, CommandLineTool):
            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
            if docker_req:
                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
@@ -322,7 +325,13 @@
             for s in tool.steps:
                 self.upload_docker(s.embedded_tool)
 
-    def run(self, dry_run=False, pull_image=True, **kwargs):
+    def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+        """Create an Arvados job specification for this workflow.
+
+        The returned dict can be used to create a job (i.e., passed as
+        the +body+ argument to jobs().create()), or as a component in
+        a pipeline template or pipeline instance.
+        """
         self.upload_docker(self.tool)
 
         workflowfiles = set()
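To make the new docstring concrete, here is a minimal sketch (an illustration, not part of the diff) of both uses of the returned spec. The API connection and the cwl:tool Keep locator are assumptions.

# Sketch only, not part of the diff. Assumes a configured Arvados API
# connection; the cwl:tool Keep locator below is a hypothetical value.
import arvados

api = arvados.api('v1')
spec = {
    "script": "cwl-runner",
    "script_version": "master",
    "repository": "arvados",
    "script_parameters": {"cwl:tool": "99999999999999999999999999999999+99/workflow.cwl"},
    "runtime_constraints": {"docker_image": "arvados/jobs"},
}

# 1. Passed as the body of jobs().create()...
job = api.jobs().create(body=spec).execute()

# 2. ...or embedded as one component of a pipeline template.
template = api.pipeline_templates().create(body={
    "name": "cwl-example",
    "components": {"cwl-runner": spec},
}).execute()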
+ """ self.upload_docker(self.tool) workflowfiles = set() @@ -335,16 +344,16 @@ class RunnerJob(object): files.add(path) return path - document_loader, _, _ = cwltool.process.get_schema() + document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"]) def loadref(b, u): - return document_loader.resolve_ref(u, base_url=b)[0] + return document_loader.fetch(urlparse.urljoin(b, u)) - sc = scandeps("", self.tool.tool, + sc = scandeps(uri, workflowobj, set(("$import", "run")), set(("$include", "$schemas", "path")), loadref) - adjustFiles(sc, functools.partial(visitFiles, workflowfiles)) - adjustFiles(self.job_order, functools.partial(visitFiles, jobfiles)) + adjustFiles(sc, partial(visitFiles, workflowfiles)) + adjustFiles(self.job_order, partial(visitFiles, jobfiles)) workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "", "%s", @@ -364,8 +373,7 @@ class RunnerJob(object): del self.job_order["id"] self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1] - - response = self.arvrunner.api.jobs().create(body={ + return { "script": "cwl-runner", "script_version": "master", "repository": "arvados", @@ -373,9 +381,19 @@ class RunnerJob(object): "runtime_constraints": { "docker_image": "arvados/jobs" } - }, find_or_create=self.enable_reuse).execute(num_retries=self.arvrunner.num_retries) + } + + def run(self, *args, **kwargs): + job_spec = self.arvados_job_spec(*args, **kwargs) + job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid) + + response = self.arvrunner.api.jobs().create( + body=job_spec, + find_or_create=self.enable_reuse + ).execute(num_retries=self.arvrunner.num_retries) - self.arvrunner.jobs[response["uuid"]] = self + self.uuid = response["uuid"] + self.arvrunner.jobs[self.uuid] = self logger.info("Submitted job %s", response["uuid"]) @@ -394,16 +412,113 @@ class RunnerJob(object): outc = arvados.collection.Collection(record["output"]) with outc.open("cwl.output.json") as f: outputs = json.load(f) + def keepify(path): + if not path.startswith("keep:"): + return "keep:%s/%s" % (record["output"], path) + adjustFiles(outputs, keepify) except Exception as e: logger.error("While getting final output object: %s", e) self.arvrunner.output_callback(outputs, processStatus) finally: del self.arvrunner.jobs[record["uuid"]] + +class RunnerTemplate(object): + """An Arvados pipeline template that invokes a CWL workflow.""" + + type_to_dataclass = { + 'boolean': 'boolean', + 'File': 'File', + 'float': 'number', + 'int': 'number', + 'string': 'text', + } + + def __init__(self, runner, tool, job_order, enable_reuse): + self.runner = runner + self.tool = tool + self.job = RunnerJob( + runner=runner, + tool=tool, + job_order=job_order, + enable_reuse=enable_reuse) + + def pipeline_component_spec(self): + """Return a component that Workbench and a-r-p-i will understand. + + Specifically, translate CWL input specs to Arvados pipeline + format, like {"dataclass":"File","value":"xyz"}. + """ + spec = self.job.arvados_job_spec() + + # Most of the component spec is exactly the same as the job + # spec (script, script_version, etc.). + # spec['script_parameters'] isn't right, though. A component + # spec's script_parameters hash is a translation of + # self.tool.tool['inputs'] with defaults/overrides taken from + # the job order. So we move the job parameters out of the way + # and build a new spec['script_parameters']. 
@@ -416,7 +531,7 @@
             if "#" in src:
                 src = src[:src.index("#")]
             if src not in self._pathmap:
-                ab = cwltool.pathmapper.abspath(src, basedir)
+                ab = cwltool.pathmapper.abspath(src, input_basedir)
                 st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                 if kwargs.get("conformance_test"):
                     self._pathmap[src] = (src, ab)
@@ -451,7 +566,7 @@
         return super(ArvPathMapper, self).reversemap(target)
 
 
-class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
+class ArvadosCommandTool(CommandLineTool):
     """Wrap cwltool CommandLineTool to override selected methods."""
 
     def __init__(self, arvrunner, toolpath_object, **kwargs):
@@ -461,8 +576,8 @@
     def makeJobRunner(self):
         return ArvadosJob(self.arvrunner)
 
-    def makePathMapper(self, reffiles, input_basedir, **kwargs):
-        return ArvPathMapper(self.arvrunner, reffiles, input_basedir,
+    def makePathMapper(self, reffiles, **kwargs):
+        return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
                              "$(task.keep)/%s",
                              "$(task.keep)/%s/%s",
                              **kwargs)
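The two patterns passed to ArvPathMapper above determine how Keep-backed files appear inside the task container. A sketch (not part of the diff), using a hypothetical portable data hash:

# Sketch only, not part of the diff; the portable data hash is
# hypothetical. These are the same patterns handed to ArvPathMapper
# by makePathMapper() above.
collection_pattern = "$(task.keep)/%s"      # a whole collection
file_pattern = "$(task.keep)/%s/%s"         # one file within a collection

pdh = "99999999999999999999999999999999+99"
print(collection_pattern % pdh)             # $(task.keep)/9999...+99
print(file_pattern % (pdh, "input.fastq"))  # $(task.keep)/9999...+99/input.fastq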
"update": @@ -529,39 +643,45 @@ class ArvCwlRunner(object): def add_uploaded(self, src, pair): self.uploaded[src] = pair - def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs): - self.debug = args.debug + def arvExecutor(self, tool, job_order, **kwargs): + self.debug = kwargs.get("debug") - if args.quiet: + if kwargs.get("quiet"): logger.setLevel(logging.WARN) logging.getLogger('arvados.arv-run').setLevel(logging.WARN) useruuid = self.api.users().current().execute()["uuid"] - self.project_uuid = args.project_uuid if args.project_uuid else useruuid + self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid self.pipeline = None - if args.submit: - runnerjob = RunnerJob(self, tool, job_order, args.enable_reuse) - if not args.wait: + if kwargs.get("create_template"): + tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse")) + tmpl.save() + # cwltool.main will write our return value to stdout. + return tmpl.uuid + + if kwargs.get("submit"): + runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse")) + if not kwargs.get("wait"): runnerjob.run() - return + return runnerjob.uuid events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message) - self.debug = args.debug - self.ignore_docker_for_reuse = args.ignore_docker_for_reuse - self.fs_access = CollectionFsAccess(input_basedir) + self.debug = kwargs.get("debug") + self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse") + self.fs_access = CollectionFsAccess(kwargs["basedir"]) kwargs["fs_access"] = self.fs_access - kwargs["enable_reuse"] = args.enable_reuse + kwargs["enable_reuse"] = kwargs.get("enable_reuse") kwargs["outdir"] = "$(task.outdir)" kwargs["tmpdir"] = "$(task.tmpdir)" if kwargs.get("conformance_test"): - return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs) + return cwltool.main.single_job_executor(tool, job_order, **kwargs) else: - if args.submit: + if kwargs.get("submit"): jobiter = iter((runnerjob,)) else: components = {} @@ -578,7 +698,6 @@ class ArvCwlRunner(object): logger.info("Pipeline instance %s", self.pipeline["uuid"]) jobiter = tool.job(job_order, - input_basedir, self.output_callback, docker_outdir="$(task.outdir)", **kwargs) @@ -603,11 +722,6 @@ class ArvCwlRunner(object): self.cond.wait(1) events.close() - - if self.final_output is None: - raise cwltool.workflow.WorkflowException("Workflow did not return a result.") - - # create final output collection except: if sys.exc_info()[0] is KeyboardInterrupt: logger.error("Interrupted, marking pipeline as failed") @@ -619,6 +733,9 @@ class ArvCwlRunner(object): finally: self.cond.release() + if self.final_output is None: + raise cwltool.workflow.WorkflowException("Workflow did not return a result.") + return self.final_output def versionstring(): @@ -632,9 +749,27 @@ def versionstring(): "arvados-python-client", arvpkg[0].version, "cwltool", cwlpkg[0].version) -def main(args, stdout, stderr, api_client=None): - args.insert(0, "--leave-outputs") - parser = cwltool.main.arg_parser() +def arg_parser(): # type: () -> argparse.ArgumentParser + parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language') + + parser.add_argument("--conformance-test", action="store_true") + parser.add_argument("--basedir", type=str, + help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on 
command line).") + parser.add_argument("--outdir", type=str, default=os.path.abspath('.'), + help="Output directory, default current directory") + + parser.add_argument("--eval-timeout", + help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.", + type=float, + default=20) + parser.add_argument("--version", action="store_true", help="Print version and exit") + + exgroup = parser.add_mutually_exclusive_group() + exgroup.add_argument("--verbose", action="store_true", help="Default logging") + exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.") + exgroup.add_argument("--debug", action="store_true", help="Print even more logging") + + parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool") exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--enable-reuse", action="store_true", @@ -644,7 +779,7 @@ def main(args, stdout, stderr, api_client=None): default=True, dest="enable_reuse", help="") - parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs") + parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.") parser.add_argument("--ignore-docker-for-reuse", action="store_true", help="Ignore Docker image version when deciding whether to reuse past jobs.", default=False) @@ -654,6 +789,7 @@ def main(args, stdout, stderr, api_client=None): default=True, dest="submit") exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).", default=True, dest="submit") + exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.") exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.", @@ -661,6 +797,19 @@ def main(args, stdout, stderr, api_client=None): exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.", default=True, dest="wait") + parser.add_argument("workflow", type=str, nargs="?", default=None) + parser.add_argument("job_order", nargs=argparse.REMAINDER) + + return parser + +def main(args, stdout, stderr, api_client=None): + parser = arg_parser() + + job_order_object = None + arvargs = parser.parse_args(args) + if arvargs.create_template and not arvargs.job_order: + job_order_object = ({}, "") + try: if api_client is None: api_client=arvados.api('v1', model=OrderedJsonModel()) @@ -669,10 +818,10 @@ def main(args, stdout, stderr, api_client=None): logger.error(e) return 1 - return cwltool.main.main(args, + return cwltool.main.main(args=arvargs, stdout=stdout, stderr=stderr, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, - parser=parser, - versionfunc=versionstring) + versionfunc=versionstring, + job_order_object=job_order_object)