X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/01cf080f9983313a50b902e477d7b30b03afa131..08cc91f0493dd8dfe27046faf02c2c907e50443e:/sdk/cwl/arvados_cwl/__init__.py diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 8341624b74..1740a90009 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -11,13 +11,12 @@ import arvados.events import arvados.util import copy import cwltool.docker -import cwltool.draft2tool +from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool from cwltool.errors import WorkflowException import cwltool.main -from cwltool.process import shortname import cwltool.workflow import fnmatch -import functools +from functools import partial import json import logging import os @@ -25,8 +24,11 @@ import pkg_resources # part of setuptools import re import sys import threading +from cwltool.load_tool import fetch_document +from cwltool.builder import Builder +import urlparse -from cwltool.process import get_feature, adjustFiles, scandeps +from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps from arvados.api import OrderedJsonModel logger = logging.getLogger('arvados.cwl-runner') @@ -66,8 +68,8 @@ class CollectionFsAccess(cwltool.process.StdFsAccess): """Implement the cwltool FsAccess interface for Arvados Collections.""" def __init__(self, basedir): + super(CollectionFsAccess, self).__init__(basedir) self.collections = {} - self.basedir = basedir def get_collection(self, path): p = path.split("/") @@ -315,7 +317,7 @@ class RunnerJob(object): pass def upload_docker(self, tool): - if isinstance(tool, cwltool.draft2tool.CommandLineTool): + if isinstance(tool, CommandLineTool): (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement") if docker_req: arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid) @@ -342,16 +344,16 @@ class RunnerJob(object): files.add(path) return path - document_loader, _, _ = cwltool.process.get_schema() + document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"]) def loadref(b, u): - return document_loader.resolve_ref(u, base_url=b)[0] + return document_loader.fetch(urlparse.urljoin(b, u)) - sc = scandeps("", self.tool.tool, + sc = scandeps(uri, workflowobj, set(("$import", "run")), set(("$include", "$schemas", "path")), loadref) - adjustFiles(sc, functools.partial(visitFiles, workflowfiles)) - adjustFiles(self.job_order, functools.partial(visitFiles, jobfiles)) + adjustFiles(sc, partial(visitFiles, workflowfiles)) + adjustFiles(self.job_order, partial(visitFiles, jobfiles)) workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "", "%s", @@ -410,6 +412,10 @@ class RunnerJob(object): outc = arvados.collection.Collection(record["output"]) with outc.open("cwl.output.json") as f: outputs = json.load(f) + def keepify(path): + if not path.startswith("keep:"): + return "keep:%s/%s" % (record["output"], path) + adjustFiles(outputs, keepify) except Exception as e: logger.error("While getting final output object: %s", e) self.arvrunner.output_callback(outputs, processStatus) @@ -504,7 +510,7 @@ class RunnerTemplate(object): }, "name": self.job.name, "owner_uuid": self.runner.project_uuid, - }).execute(num_retries=self.runner.num_retries) + }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries) self.uuid = response["uuid"] logger.info("Created template %s", self.uuid) @@ -512,7 +518,7 @@ class RunnerTemplate(object): class ArvPathMapper(cwltool.pathmapper.PathMapper): """Convert container-local paths to and from Keep collection ids.""" - def __init__(self, arvrunner, referenced_files, basedir, + def __init__(self, arvrunner, referenced_files, input_basedir, collection_pattern, file_pattern, name=None, **kwargs): self._pathmap = arvrunner.get_uploaded() uploadfiles = set() @@ -525,7 +531,7 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper): if "#" in src: src = src[:src.index("#")] if src not in self._pathmap: - ab = cwltool.pathmapper.abspath(src, basedir) + ab = cwltool.pathmapper.abspath(src, input_basedir) st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern) if kwargs.get("conformance_test"): self._pathmap[src] = (src, ab) @@ -560,7 +566,7 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper): return super(ArvPathMapper, self).reversemap(target) -class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool): +class ArvadosCommandTool(CommandLineTool): """Wrap cwltool CommandLineTool to override selected methods.""" def __init__(self, arvrunner, toolpath_object, **kwargs): @@ -570,8 +576,8 @@ class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool): def makeJobRunner(self): return ArvadosJob(self.arvrunner) - def makePathMapper(self, reffiles, input_basedir, **kwargs): - return ArvPathMapper(self.arvrunner, reffiles, input_basedir, + def makePathMapper(self, reffiles, **kwargs): + return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"], "$(task.keep)/%s", "$(task.keep)/%s/%s", **kwargs) @@ -637,45 +643,45 @@ class ArvCwlRunner(object): def add_uploaded(self, src, pair): self.uploaded[src] = pair - def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs): - self.debug = args.debug + def arvExecutor(self, tool, job_order, **kwargs): + self.debug = kwargs.get("debug") - if args.quiet: + if kwargs.get("quiet"): logger.setLevel(logging.WARN) logging.getLogger('arvados.arv-run').setLevel(logging.WARN) useruuid = self.api.users().current().execute()["uuid"] - self.project_uuid = args.project_uuid if args.project_uuid else useruuid + self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid self.pipeline = None - if args.create_template: - tmpl = RunnerTemplate(self, tool, job_order, args.enable_reuse) + if kwargs.get("create_template"): + tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse")) tmpl.save() # cwltool.main will write our return value to stdout. return tmpl.uuid - if args.submit: - runnerjob = RunnerJob(self, tool, job_order, args.enable_reuse) - if not args.wait: + if kwargs.get("submit"): + runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse")) + if not kwargs.get("wait"): runnerjob.run() return runnerjob.uuid events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message) - self.debug = args.debug - self.ignore_docker_for_reuse = args.ignore_docker_for_reuse - self.fs_access = CollectionFsAccess(input_basedir) + self.debug = kwargs.get("debug") + self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse") + self.fs_access = CollectionFsAccess(kwargs["basedir"]) kwargs["fs_access"] = self.fs_access - kwargs["enable_reuse"] = args.enable_reuse + kwargs["enable_reuse"] = kwargs.get("enable_reuse") kwargs["outdir"] = "$(task.outdir)" kwargs["tmpdir"] = "$(task.tmpdir)" if kwargs.get("conformance_test"): - return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs) + return cwltool.main.single_job_executor(tool, job_order, **kwargs) else: - if args.submit: + if kwargs.get("submit"): jobiter = iter((runnerjob,)) else: components = {} @@ -692,7 +698,6 @@ class ArvCwlRunner(object): logger.info("Pipeline instance %s", self.pipeline["uuid"]) jobiter = tool.job(job_order, - input_basedir, self.output_callback, docker_outdir="$(task.outdir)", **kwargs) @@ -717,11 +722,6 @@ class ArvCwlRunner(object): self.cond.wait(1) events.close() - - if self.final_output is None: - raise cwltool.workflow.WorkflowException("Workflow did not return a result.") - - # create final output collection except: if sys.exc_info()[0] is KeyboardInterrupt: logger.error("Interrupted, marking pipeline as failed") @@ -733,6 +733,9 @@ class ArvCwlRunner(object): finally: self.cond.release() + if self.final_output is None: + raise cwltool.workflow.WorkflowException("Workflow did not return a result.") + return self.final_output def versionstring(): @@ -746,9 +749,27 @@ def versionstring(): "arvados-python-client", arvpkg[0].version, "cwltool", cwlpkg[0].version) -def main(args, stdout, stderr, api_client=None): - args.insert(0, "--leave-outputs") - parser = cwltool.main.arg_parser() +def arg_parser(): # type: () -> argparse.ArgumentParser + parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language') + + parser.add_argument("--conformance-test", action="store_true") + parser.add_argument("--basedir", type=str, + help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).") + parser.add_argument("--outdir", type=str, default=os.path.abspath('.'), + help="Output directory, default current directory") + + parser.add_argument("--eval-timeout", + help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.", + type=float, + default=20) + parser.add_argument("--version", action="store_true", help="Print version and exit") + + exgroup = parser.add_mutually_exclusive_group() + exgroup.add_argument("--verbose", action="store_true", help="Default logging") + exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.") + exgroup.add_argument("--debug", action="store_true", help="Print even more logging") + + parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool") exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--enable-reuse", action="store_true", @@ -758,7 +779,7 @@ def main(args, stdout, stderr, api_client=None): default=True, dest="enable_reuse", help="") - parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs") + parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.") parser.add_argument("--ignore-docker-for-reuse", action="store_true", help="Ignore Docker image version when deciding whether to reuse past jobs.", default=False) @@ -776,6 +797,19 @@ def main(args, stdout, stderr, api_client=None): exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.", default=True, dest="wait") + parser.add_argument("workflow", type=str, nargs="?", default=None) + parser.add_argument("job_order", nargs=argparse.REMAINDER) + + return parser + +def main(args, stdout, stderr, api_client=None): + parser = arg_parser() + + job_order_object = None + arvargs = parser.parse_args(args) + if arvargs.create_template and not arvargs.job_order: + job_order_object = ({}, "") + try: if api_client is None: api_client=arvados.api('v1', model=OrderedJsonModel()) @@ -784,10 +818,10 @@ def main(args, stdout, stderr, api_client=None): logger.error(e) return 1 - return cwltool.main.main(args, + return cwltool.main.main(args=arvargs, stdout=stdout, stderr=stderr, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, - parser=parser, - versionfunc=versionstring) + versionfunc=versionstring, + job_order_object=job_order_object)