From 6c43be47cb3756a0e6ffc924572259d1a1c8f2c3 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Mon, 13 Jun 2016 15:45:29 -0400 Subject: [PATCH] 8442: Adding --submit support with --crunch2. General refactoring into more/smaller files. --- apps/workbench/Gemfile.lock | 3 - sdk/cwl/arvados_cwl/__init__.py | 492 ++++------------------------ sdk/cwl/arvados_cwl/arvcontainer.py | 88 ++++- sdk/cwl/arvados_cwl/arvjob.py | 156 ++++++++- sdk/cwl/arvados_cwl/arvtool.py | 30 ++ sdk/cwl/arvados_cwl/fsaccess.py | 59 ++++ sdk/cwl/arvados_cwl/pathmapper.py | 55 ++++ sdk/cwl/arvados_cwl/runner.py | 98 ++++++ sdk/cwl/tests/test_container.py | 185 +++++++++++ sdk/cwl/tests/test_job.py | 4 +- sdk/cwl/tests/test_submit.py | 82 ++++- services/api/Gemfile.lock | 3 - 12 files changed, 810 insertions(+), 445 deletions(-) create mode 100644 sdk/cwl/arvados_cwl/arvtool.py create mode 100644 sdk/cwl/arvados_cwl/fsaccess.py create mode 100644 sdk/cwl/arvados_cwl/pathmapper.py create mode 100644 sdk/cwl/arvados_cwl/runner.py create mode 100644 sdk/cwl/tests/test_container.py diff --git a/apps/workbench/Gemfile.lock b/apps/workbench/Gemfile.lock index 1b177ec65d..2618e47cbf 100644 --- a/apps/workbench/Gemfile.lock +++ b/apps/workbench/Gemfile.lock @@ -309,6 +309,3 @@ DEPENDENCIES therubyracer uglifier (>= 1.0.3) wiselinks - -BUNDLED WITH - 1.12.1 diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index af74808e1d..136d5aec15 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -3,397 +3,28 @@ # Implement cwl-runner interface for submitting and running jobs on Arvados. import argparse -import arvados -import arvados.collection -import arvados.commands.keepdocker -import arvados.commands.run -import arvados.events -import arvados.util -import copy -import cwltool.docker -from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool -from cwltool.errors import WorkflowException -import cwltool.main -import cwltool.workflow -import fnmatch -from functools import partial -import json import logging import os -import pkg_resources # part of setuptools -import re import sys import threading -from cwltool.load_tool import fetch_document -from cwltool.builder import Builder -import urlparse -from .arvcontainer import ArvadosContainer -from .arvjob import ArvadosJob -from .arvdocker import arv_docker_get_image - -from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps, UnsupportedRequirement -from arvados.api import OrderedJsonModel - -logger = logging.getLogger('arvados.cwl-runner') -logger.setLevel(logging.INFO) - -class CollectionFsAccess(cwltool.process.StdFsAccess): - """Implement the cwltool FsAccess interface for Arvados Collections.""" - - def __init__(self, basedir): - super(CollectionFsAccess, self).__init__(basedir) - self.collections = {} - - def get_collection(self, path): - p = path.split("/") - if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]): - pdh = p[0][5:] - if pdh not in self.collections: - self.collections[pdh] = arvados.collection.CollectionReader(pdh) - return (self.collections[pdh], "/".join(p[1:])) - else: - return (None, path) - - def _match(self, collection, patternsegments, parent): - if not patternsegments: - return [] - - if not isinstance(collection, arvados.collection.RichCollectionBase): - return [] - - ret = [] - # iterate over the files and subcollections in 'collection' - for filename in collection: - if patternsegments[0] == '.': - # Pattern contains something like "./foo" so just shift - # past the "./" - ret.extend(self._match(collection, patternsegments[1:], parent)) - elif fnmatch.fnmatch(filename, patternsegments[0]): - cur = os.path.join(parent, filename) - if len(patternsegments) == 1: - ret.append(cur) - else: - ret.extend(self._match(collection[filename], patternsegments[1:], cur)) - return ret - - def glob(self, pattern): - collection, rest = self.get_collection(pattern) - patternsegments = rest.split("/") - return self._match(collection, patternsegments, "keep:" + collection.manifest_locator()) - - def open(self, fn, mode): - collection, rest = self.get_collection(fn) - if collection: - return collection.open(rest, mode) - else: - return open(self._abs(fn), mode) - - def exists(self, fn): - collection, rest = self.get_collection(fn) - if collection: - return collection.exists(rest) - else: - return os.path.exists(self._abs(fn)) - - - -class RunnerJob(object): - """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner.""" - - def __init__(self, runner, tool, job_order, enable_reuse): - self.arvrunner = runner - self.tool = tool - self.job_order = job_order - self.running = False - self.enable_reuse = enable_reuse - - def update_pipeline_component(self, record): - pass - - def upload_docker(self, tool): - if isinstance(tool, CommandLineTool): - (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement") - if docker_req: - arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid) - elif isinstance(tool, cwltool.workflow.Workflow): - for s in tool.steps: - self.upload_docker(s.embedded_tool) - - def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs): - """Create an Arvados job specification for this workflow. - - The returned dict can be used to create a job (i.e., passed as - the +body+ argument to jobs().create()), or as a component in - a pipeline template or pipeline instance. - """ - self.upload_docker(self.tool) - - workflowfiles = set() - jobfiles = set() - workflowfiles.add(self.tool.tool["id"]) - - self.name = os.path.basename(self.tool.tool["id"]) - - def visitFiles(files, path): - files.add(path) - return path - - document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"]) - def loadref(b, u): - return document_loader.fetch(urlparse.urljoin(b, u)) - - sc = scandeps(uri, workflowobj, - set(("$import", "run")), - set(("$include", "$schemas", "path")), - loadref) - adjustFiles(sc, partial(visitFiles, workflowfiles)) - adjustFiles(self.job_order, partial(visitFiles, jobfiles)) - - workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "", - "%s", - "%s/%s", - name=self.name, - **kwargs) - - jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "", - "%s", - "%s/%s", - name=os.path.basename(self.job_order.get("id", "#")), - **kwargs) - - adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1]) - - if "id" in self.job_order: - del self.job_order["id"] - - self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1] - return { - "script": "cwl-runner", - "script_version": "master", - "repository": "arvados", - "script_parameters": self.job_order, - "runtime_constraints": { - "docker_image": "arvados/jobs" - } - } - - def run(self, *args, **kwargs): - job_spec = self.arvados_job_spec(*args, **kwargs) - job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid) - - response = self.arvrunner.api.jobs().create( - body=job_spec, - find_or_create=self.enable_reuse - ).execute(num_retries=self.arvrunner.num_retries) - - self.uuid = response["uuid"] - self.arvrunner.jobs[self.uuid] = self - - logger.info("Submitted job %s", response["uuid"]) - - if kwargs.get("submit"): - self.pipeline = self.arvrunner.api.pipeline_instances().create( - body={ - "owner_uuid": self.arvrunner.project_uuid, - "name": shortname(self.tool.tool["id"]), - "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } }, - "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries) - - if response["state"] in ("Complete", "Failed", "Cancelled"): - self.done(response) - - def done(self, record): - if record["state"] == "Complete": - processStatus = "success" - else: - processStatus = "permanentFail" - - outputs = None - try: - try: - outc = arvados.collection.Collection(record["output"]) - with outc.open("cwl.output.json") as f: - outputs = json.load(f) - def keepify(path): - if not path.startswith("keep:"): - return "keep:%s/%s" % (record["output"], path) - adjustFiles(outputs, keepify) - except Exception as e: - logger.error("While getting final output object: %s", e) - self.arvrunner.output_callback(outputs, processStatus) - finally: - del self.arvrunner.jobs[record["uuid"]] - - -class RunnerTemplate(object): - """An Arvados pipeline template that invokes a CWL workflow.""" - - type_to_dataclass = { - 'boolean': 'boolean', - 'File': 'File', - 'float': 'number', - 'int': 'number', - 'string': 'text', - } - - def __init__(self, runner, tool, job_order, enable_reuse): - self.runner = runner - self.tool = tool - self.job = RunnerJob( - runner=runner, - tool=tool, - job_order=job_order, - enable_reuse=enable_reuse) - - def pipeline_component_spec(self): - """Return a component that Workbench and a-r-p-i will understand. - - Specifically, translate CWL input specs to Arvados pipeline - format, like {"dataclass":"File","value":"xyz"}. - """ - spec = self.job.arvados_job_spec() - - # Most of the component spec is exactly the same as the job - # spec (script, script_version, etc.). - # spec['script_parameters'] isn't right, though. A component - # spec's script_parameters hash is a translation of - # self.tool.tool['inputs'] with defaults/overrides taken from - # the job order. So we move the job parameters out of the way - # and build a new spec['script_parameters']. - job_params = spec['script_parameters'] - spec['script_parameters'] = {} - - for param in self.tool.tool['inputs']: - param = copy.deepcopy(param) - - # Data type and "required" flag... - types = param['type'] - if not isinstance(types, list): - types = [types] - param['required'] = 'null' not in types - non_null_types = set(types) - set(['null']) - if len(non_null_types) == 1: - the_type = [c for c in non_null_types][0] - dataclass = self.type_to_dataclass.get(the_type) - if dataclass: - param['dataclass'] = dataclass - # Note: If we didn't figure out a single appropriate - # dataclass, we just left that attribute out. We leave - # the "type" attribute there in any case, which might help - # downstream. - - # Title and description... - title = param.pop('label', '') - descr = param.pop('description', '').rstrip('\n') - if title: - param['title'] = title - if descr: - param['description'] = descr - - # Fill in the value from the current job order, if any. - param_id = shortname(param.pop('id')) - value = job_params.get(param_id) - if value is None: - pass - elif not isinstance(value, dict): - param['value'] = value - elif param.get('dataclass') == 'File' and value.get('path'): - param['value'] = value['path'] - - spec['script_parameters'][param_id] = param - spec['script_parameters']['cwl:tool'] = job_params['cwl:tool'] - return spec - - def save(self): - job_spec = self.pipeline_component_spec() - response = self.runner.api.pipeline_templates().create(body={ - "components": { - self.job.name: job_spec, - }, - "name": self.job.name, - "owner_uuid": self.runner.project_uuid, - }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries) - self.uuid = response["uuid"] - logger.info("Created template %s", self.uuid) - - -class ArvPathMapper(cwltool.pathmapper.PathMapper): - """Convert container-local paths to and from Keep collection ids.""" - - def __init__(self, arvrunner, referenced_files, input_basedir, - collection_pattern, file_pattern, name=None, **kwargs): - self._pathmap = arvrunner.get_uploaded() - uploadfiles = set() - - pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+') - - for src in referenced_files: - if isinstance(src, basestring) and pdh_path.match(src): - self._pathmap[src] = (src, collection_pattern % src[5:]) - if "#" in src: - src = src[:src.index("#")] - if src not in self._pathmap: - ab = cwltool.pathmapper.abspath(src, input_basedir) - st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern) - if kwargs.get("conformance_test"): - self._pathmap[src] = (src, ab) - elif isinstance(st, arvados.commands.run.UploadFile): - uploadfiles.add((src, ab, st)) - elif isinstance(st, arvados.commands.run.ArvFile): - self._pathmap[src] = (ab, st.fn) - else: - raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st) - - if uploadfiles: - arvados.commands.run.uploadfiles([u[2] for u in uploadfiles], - arvrunner.api, - dry_run=kwargs.get("dry_run"), - num_retries=3, - fnPattern=file_pattern, - name=name, - project=arvrunner.project_uuid) - - for src, ab, st in uploadfiles: - arvrunner.add_uploaded(src, (ab, st.fn)) - self._pathmap[src] = (ab, st.fn) - - self.keepdir = None - - def reversemap(self, target): - if target.startswith("keep:"): - return (target, target) - elif self.keepdir and target.startswith(self.keepdir): - return (target, "keep:" + target[len(self.keepdir)+1:]) - else: - return super(ArvPathMapper, self).reversemap(target) - +import pkg_resources # part of setuptools -class ArvadosCommandTool(CommandLineTool): - """Wrap cwltool CommandLineTool to override selected methods.""" +from cwltool.errors import WorkflowException +import cwltool.main +import cwltool.workflow - def __init__(self, arvrunner, toolpath_object, crunch2, **kwargs): - super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs) - self.arvrunner = arvrunner - self.crunch2 = crunch2 +import arvados +import arvados.events - def makeJobRunner(self): - if self.crunch2: - return ArvadosContainer(self.arvrunner) - else: - return ArvadosJob(self.arvrunner) +from .arvcontainer import ArvadosContainer, RunnerContainer +from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate +from .arvtool import ArvadosCommandTool - def makePathMapper(self, reffiles, **kwargs): - if self.crunch2: - return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"], - "/keep/%s", - "/keep/%s/%s", - **kwargs) - else: - return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"], - "$(task.keep)/%s", - "$(task.keep)/%s/%s", - **kwargs) +from cwltool.process import shortname, UnsupportedRequirement +from arvados.api import OrderedJsonModel +logger = logging.getLogger('arvados.cwl-runner') +logger.setLevel(logging.INFO) class ArvCwlRunner(object): """Execute a CWL tool or workflow, submit crunch jobs, wait for them to @@ -475,7 +106,10 @@ class ArvCwlRunner(object): return tmpl.uuid if kwargs.get("submit"): - runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse")) + if self.crunch2: + runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse")) + else: + runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse")) if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.crunch2: # Create pipeline for local run @@ -510,56 +144,54 @@ class ArvCwlRunner(object): kwargs["outdir"] = "$(task.outdir)" kwargs["tmpdir"] = "$(task.tmpdir)" - if kwargs.get("conformance_test"): - return cwltool.main.single_job_executor(tool, job_order, **kwargs) + if kwargs.get("submit"): + jobiter = iter((runnerjob,)) else: - if kwargs.get("submit"): - jobiter = iter((runnerjob,)) - else: - if "cwl_runner_job" in kwargs: - self.uuid = kwargs.get("cwl_runner_job").get('uuid') - jobiter = tool.job(job_order, - self.output_callback, - docker_outdir="$(task.outdir)", - **kwargs) - - try: - self.cond.acquire() - # Will continue to hold the lock for the duration of this code - # except when in cond.wait(), at which point on_message can update - # job state and process output callbacks. - - for runnable in jobiter: - if runnable: - runnable.run(**kwargs) - else: - if self.jobs: - self.cond.wait(1) - else: - logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.") - break - - while self.jobs: - self.cond.wait(1) - - events.close() - except UnsupportedRequirement: - raise - except: - if sys.exc_info()[0] is KeyboardInterrupt: - logger.error("Interrupted, marking pipeline as failed") + if "cwl_runner_job" in kwargs: + self.uuid = kwargs.get("cwl_runner_job").get('uuid') + jobiter = tool.job(job_order, + self.output_callback, + docker_outdir="$(task.outdir)", + **kwargs) + + try: + self.cond.acquire() + # Will continue to hold the lock for the duration of this code + # except when in cond.wait(), at which point on_message can update + # job state and process output callbacks. + + for runnable in jobiter: + if runnable: + runnable.run(**kwargs) else: - logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False)) - if self.pipeline: - self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], - body={"state": "Failed"}).execute(num_retries=self.num_retries) - finally: - self.cond.release() + if self.jobs: + self.cond.wait(1) + else: + logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.") + break + + while self.jobs: + self.cond.wait(1) + + events.close() + except UnsupportedRequirement: + raise + except: + if sys.exc_info()[0] is KeyboardInterrupt: + logger.error("Interrupted, marking pipeline as failed") + else: + logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False)) + if self.pipeline: + self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], + body={"state": "Failed"}).execute(num_retries=self.num_retries) + finally: + self.cond.release() - if self.final_output is None: - raise cwltool.workflow.WorkflowException("Workflow did not return a result.") + if self.final_output is None: + raise cwltool.workflow.WorkflowException("Workflow did not return a result.") + + return self.final_output - return self.final_output def versionstring(): """Print version string of key packages for provenance and debugging.""" @@ -572,6 +204,7 @@ def versionstring(): "arvados-python-client", arvpkg[0].version, "cwltool", cwlpkg[0].version) + def arg_parser(): # type: () -> argparse.ArgumentParser parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language') @@ -634,6 +267,7 @@ def arg_parser(): # type: () -> argparse.ArgumentParser return parser + def main(args, stdout, stderr, api_client=None): parser = arg_parser() diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index b0e2c1f067..be1140469a 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -1,10 +1,15 @@ import logging +import json +import os + +from cwltool.errors import WorkflowException +from cwltool.process import get_feature, adjustFiles, UnsupportedRequirement, shortname + import arvados.collection -from cwltool.process import get_feature, adjustFiles + from .arvdocker import arv_docker_get_image from . import done -from cwltool.errors import WorkflowException -from cwltool.process import UnsupportedRequirement +from .runner import Runner logger = logging.getLogger('arvados.cwl-runner') @@ -45,7 +50,7 @@ class ArvadosContainer(object): if self.generatefiles: raise UnsupportedRequirement("Generate files not supported") - vwd = arvados.collection.Collection() + vwd = arvados.collection.Collection(api_client=self.arvrunner.api_client) container_request["task.vwd"] = {} for t in self.generatefiles: if isinstance(self.generatefiles[t], dict): @@ -124,3 +129,78 @@ class ArvadosContainer(object): self.output_callback(outputs, processStatus) finally: del self.arvrunner.jobs[record["uuid"]] + + +class RunnerContainer(Runner): + """Submit and manage a container that runs arvados-cwl-runner.""" + + def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs): + """Create an Arvados job specification for this workflow. + + The returned dict can be used to create a job (i.e., passed as + the +body+ argument to jobs().create()), or as a component in + a pipeline template or pipeline instance. + """ + + workflowmapper = super(RunnerContainer, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs) + + with arvados.collection.Collection(api_client=self.arvrunner.api) as jobobj: + with jobobj.open("cwl.input.json", "w") as f: + json.dump(self.job_order, f, sort_keys=True, indent=4) + jobobj.save_new(owner_uuid=self.arvrunner.project_uuid) + + workflowname = os.path.basename(self.tool.tool["id"]) + workflowpath = "/var/lib/cwl/workflow/%s" % workflowname + workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1] + workflowcollection = workflowcollection[5:workflowcollection.index('/')] + jobpath = "/var/lib/cwl/job/cwl.input.json" + + container_image = arv_docker_get_image(self.arvrunner.api, + {"dockerImageId": "arvados/jobs"}, + pull_image, + self.arvrunner.project_uuid) + + return { + "command": ["arvados-cwl-runner", "--local", "--crunch2", workflowpath, jobpath], + "owner_uuid": self.arvrunner.project_uuid, + "name": self.name, + "output_path": "/var/spool/cwl", + "cwd": "/var/spool/cwl", + "priority": 1, + "state": "Committed", + "container_image": container_image, + "mounts": { + workflowpath: { + "kind": "collection", + "portable_data_hash": "%s" % workflowcollection + }, + jobpath: { + "kind": "collection", + "portable_data_hash": "%s/cwl.input.json" % jobobj.portable_data_hash() + }, + "stdout": { + "kind": "file", + "path": "/var/spool/cwl/cwl.output.json" + } + }, + "runtime_constraints": { + "vcpus": 1, + "ram": 1024*1024*256 + } + } + + def run(self, *args, **kwargs): + job_spec = self.arvados_job_spec(*args, **kwargs) + job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid) + + response = self.arvrunner.api.container_requests().create( + body=job_spec + ).execute(num_retries=self.arvrunner.num_retries) + + self.uuid = response["uuid"] + self.arvrunner.jobs[response["container_uuid"]] = self + + logger.info("Submitted container %s", response["uuid"]) + + if response["state"] in ("Complete", "Failed", "Cancelled"): + self.done(response) diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py index 88a8eeb3d5..397b6d58c0 100644 --- a/sdk/cwl/arvados_cwl/arvjob.py +++ b/sdk/cwl/arvados_cwl/arvjob.py @@ -1,11 +1,19 @@ import logging import re -from . import done -from .arvdocker import arv_docker_get_image -from cwltool.process import get_feature +import copy + +from cwltool.process import get_feature, shortname from cwltool.errors import WorkflowException +from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool +from cwltool.load_tool import fetch_document +from cwltool.builder import Builder + import arvados.collection +from .arvdocker import arv_docker_get_image +from .runner import Runner +from . import done + logger = logging.getLogger('arvados.cwl-runner') tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)") @@ -164,3 +172,145 @@ class ArvadosJob(object): self.output_callback(outputs, processStatus) finally: del self.arvrunner.jobs[record["uuid"]] + + +class RunnerJob(Runner): + """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner.""" + + def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs): + """Create an Arvados job specification for this workflow. + + The returned dict can be used to create a job (i.e., passed as + the +body+ argument to jobs().create()), or as a component in + a pipeline template or pipeline instance. + """ + + workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs) + + self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1] + return { + "script": "cwl-runner", + "script_version": "master", + "repository": "arvados", + "script_parameters": self.job_order, + "runtime_constraints": { + "docker_image": "arvados/jobs" + } + } + + def run(self, *args, **kwargs): + job_spec = self.arvados_job_spec(*args, **kwargs) + job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid) + + response = self.arvrunner.api.jobs().create( + body=job_spec, + find_or_create=self.enable_reuse + ).execute(num_retries=self.arvrunner.num_retries) + + self.uuid = response["uuid"] + self.arvrunner.jobs[self.uuid] = self + + logger.info("Submitted job %s", response["uuid"]) + + if kwargs.get("submit"): + self.pipeline = self.arvrunner.api.pipeline_instances().create( + body={ + "owner_uuid": self.arvrunner.project_uuid, + "name": shortname(self.tool.tool["id"]), + "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } }, + "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries) + + if response["state"] in ("Complete", "Failed", "Cancelled"): + self.done(response) + + +class RunnerTemplate(object): + """An Arvados pipeline template that invokes a CWL workflow.""" + + type_to_dataclass = { + 'boolean': 'boolean', + 'File': 'File', + 'float': 'number', + 'int': 'number', + 'string': 'text', + } + + def __init__(self, runner, tool, job_order, enable_reuse): + self.runner = runner + self.tool = tool + self.job = RunnerJob( + runner=runner, + tool=tool, + job_order=job_order, + enable_reuse=enable_reuse) + + def pipeline_component_spec(self): + """Return a component that Workbench and a-r-p-i will understand. + + Specifically, translate CWL input specs to Arvados pipeline + format, like {"dataclass":"File","value":"xyz"}. + """ + spec = self.job.arvados_job_spec() + + # Most of the component spec is exactly the same as the job + # spec (script, script_version, etc.). + # spec['script_parameters'] isn't right, though. A component + # spec's script_parameters hash is a translation of + # self.tool.tool['inputs'] with defaults/overrides taken from + # the job order. So we move the job parameters out of the way + # and build a new spec['script_parameters']. + job_params = spec['script_parameters'] + spec['script_parameters'] = {} + + for param in self.tool.tool['inputs']: + param = copy.deepcopy(param) + + # Data type and "required" flag... + types = param['type'] + if not isinstance(types, list): + types = [types] + param['required'] = 'null' not in types + non_null_types = set(types) - set(['null']) + if len(non_null_types) == 1: + the_type = [c for c in non_null_types][0] + dataclass = self.type_to_dataclass.get(the_type) + if dataclass: + param['dataclass'] = dataclass + # Note: If we didn't figure out a single appropriate + # dataclass, we just left that attribute out. We leave + # the "type" attribute there in any case, which might help + # downstream. + + # Title and description... + title = param.pop('label', '') + descr = param.pop('description', '').rstrip('\n') + if title: + param['title'] = title + if descr: + param['description'] = descr + + # Fill in the value from the current job order, if any. + param_id = shortname(param.pop('id')) + value = job_params.get(param_id) + if value is None: + pass + elif not isinstance(value, dict): + param['value'] = value + elif param.get('dataclass') == 'File' and value.get('path'): + param['value'] = value['path'] + + spec['script_parameters'][param_id] = param + spec['script_parameters']['cwl:tool'] = job_params['cwl:tool'] + return spec + + def save(self): + job_spec = self.pipeline_component_spec() + response = self.runner.api.pipeline_templates().create(body={ + "components": { + self.job.name: job_spec, + }, + "name": self.job.name, + "owner_uuid": self.runner.project_uuid, + }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries) + self.uuid = response["uuid"] + logger.info("Created template %s", self.uuid) diff --git a/sdk/cwl/arvados_cwl/arvtool.py b/sdk/cwl/arvados_cwl/arvtool.py new file mode 100644 index 0000000000..a2c5c9e498 --- /dev/null +++ b/sdk/cwl/arvados_cwl/arvtool.py @@ -0,0 +1,30 @@ +from cwltool.draft2tool import CommandLineTool +from .arvjob import ArvadosJob +from .arvcontainer import ArvadosContainer +from .pathmapper import ArvPathMapper + +class ArvadosCommandTool(CommandLineTool): + """Wrap cwltool CommandLineTool to override selected methods.""" + + def __init__(self, arvrunner, toolpath_object, crunch2, **kwargs): + super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs) + self.arvrunner = arvrunner + self.crunch2 = crunch2 + + def makeJobRunner(self): + if self.crunch2: + return ArvadosContainer(self.arvrunner) + else: + return ArvadosJob(self.arvrunner) + + def makePathMapper(self, reffiles, **kwargs): + if self.crunch2: + return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"], + "/keep/%s", + "/keep/%s/%s", + **kwargs) + else: + return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"], + "$(task.keep)/%s", + "$(task.keep)/%s/%s", + **kwargs) diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py new file mode 100644 index 0000000000..c911895fa2 --- /dev/null +++ b/sdk/cwl/arvados_cwl/fsaccess.py @@ -0,0 +1,59 @@ +import fnmatch + +class CollectionFsAccess(cwltool.process.StdFsAccess): + """Implement the cwltool FsAccess interface for Arvados Collections.""" + + def __init__(self, basedir): + super(CollectionFsAccess, self).__init__(basedir) + self.collections = {} + + def get_collection(self, path): + p = path.split("/") + if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]): + pdh = p[0][5:] + if pdh not in self.collections: + self.collections[pdh] = arvados.collection.CollectionReader(pdh) + return (self.collections[pdh], "/".join(p[1:])) + else: + return (None, path) + + def _match(self, collection, patternsegments, parent): + if not patternsegments: + return [] + + if not isinstance(collection, arvados.collection.RichCollectionBase): + return [] + + ret = [] + # iterate over the files and subcollections in 'collection' + for filename in collection: + if patternsegments[0] == '.': + # Pattern contains something like "./foo" so just shift + # past the "./" + ret.extend(self._match(collection, patternsegments[1:], parent)) + elif fnmatch.fnmatch(filename, patternsegments[0]): + cur = os.path.join(parent, filename) + if len(patternsegments) == 1: + ret.append(cur) + else: + ret.extend(self._match(collection[filename], patternsegments[1:], cur)) + return ret + + def glob(self, pattern): + collection, rest = self.get_collection(pattern) + patternsegments = rest.split("/") + return self._match(collection, patternsegments, "keep:" + collection.manifest_locator()) + + def open(self, fn, mode): + collection, rest = self.get_collection(fn) + if collection: + return collection.open(rest, mode) + else: + return open(self._abs(fn), mode) + + def exists(self, fn): + collection, rest = self.get_collection(fn) + if collection: + return collection.exists(rest) + else: + return os.path.exists(self._abs(fn)) diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py new file mode 100644 index 0000000000..9538a9176f --- /dev/null +++ b/sdk/cwl/arvados_cwl/pathmapper.py @@ -0,0 +1,55 @@ +import re + +import arvados.commands.run +import arvados.collection +import cwltool.pathmapper + +class ArvPathMapper(cwltool.pathmapper.PathMapper): + """Convert container-local paths to and from Keep collection ids.""" + + def __init__(self, arvrunner, referenced_files, input_basedir, + collection_pattern, file_pattern, name=None, **kwargs): + self._pathmap = arvrunner.get_uploaded() + uploadfiles = set() + + pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+') + + for src in referenced_files: + if isinstance(src, basestring) and pdh_path.match(src): + self._pathmap[src] = (src, collection_pattern % src[5:]) + if "#" in src: + src = src[:src.index("#")] + if src not in self._pathmap: + ab = cwltool.pathmapper.abspath(src, input_basedir) + st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern) + if kwargs.get("conformance_test"): + self._pathmap[src] = (src, ab) + elif isinstance(st, arvados.commands.run.UploadFile): + uploadfiles.add((src, ab, st)) + elif isinstance(st, arvados.commands.run.ArvFile): + self._pathmap[src] = (ab, st.fn) + else: + raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st) + + if uploadfiles: + arvados.commands.run.uploadfiles([u[2] for u in uploadfiles], + arvrunner.api, + dry_run=kwargs.get("dry_run"), + num_retries=3, + fnPattern=file_pattern, + name=name, + project=arvrunner.project_uuid) + + for src, ab, st in uploadfiles: + arvrunner.add_uploaded(src, (ab, st.fn)) + self._pathmap[src] = (ab, st.fn) + + self.keepdir = None + + def reversemap(self, target): + if target.startswith("keep:"): + return (target, target) + elif self.keepdir and target.startswith(self.keepdir): + return (target, "keep:" + target[len(self.keepdir)+1:]) + else: + return super(ArvPathMapper, self).reversemap(target) diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py new file mode 100644 index 0000000000..0cc23ab459 --- /dev/null +++ b/sdk/cwl/arvados_cwl/runner.py @@ -0,0 +1,98 @@ +import os +import urlparse +from functools import partial + +from cwltool.draft2tool import CommandLineTool +import cwltool.workflow +from cwltool.process import get_feature, scandeps, adjustFiles +from cwltool.load_tool import fetch_document + +from .arvdocker import arv_docker_get_image +from .pathmapper import ArvPathMapper + +class Runner(object): + def __init__(self, runner, tool, job_order, enable_reuse): + self.arvrunner = runner + self.tool = tool + self.job_order = job_order + self.running = False + self.enable_reuse = enable_reuse + + def update_pipeline_component(self, record): + pass + + def upload_docker(self, tool): + if isinstance(tool, CommandLineTool): + (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement") + if docker_req: + arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid) + elif isinstance(tool, cwltool.workflow.Workflow): + for s in tool.steps: + self.upload_docker(s.embedded_tool) + + + def arvados_job_spec(self, *args, **kwargs): + self.upload_docker(self.tool) + + workflowfiles = set() + jobfiles = set() + workflowfiles.add(self.tool.tool["id"]) + + self.name = os.path.basename(self.tool.tool["id"]) + + def visitFiles(files, path): + files.add(path) + return path + + document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"]) + def loadref(b, u): + return document_loader.fetch(urlparse.urljoin(b, u)) + + sc = scandeps(uri, workflowobj, + set(("$import", "run")), + set(("$include", "$schemas", "path")), + loadref) + adjustFiles(sc, partial(visitFiles, workflowfiles)) + adjustFiles(self.job_order, partial(visitFiles, jobfiles)) + + workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "", + "%s", + "%s/%s", + name=self.name, + **kwargs) + + jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "", + "%s", + "%s/%s", + name=os.path.basename(self.job_order.get("id", "#")), + **kwargs) + + adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1]) + + if "id" in self.job_order: + del self.job_order["id"] + + return workflowmapper + + + def done(self, record): + if record["state"] == "Complete": + processStatus = "success" + else: + processStatus = "permanentFail" + + outputs = None + try: + try: + outc = arvados.collection.Collection(record["output"]) + with outc.open("cwl.output.json") as f: + outputs = json.load(f) + def keepify(path): + if not path.startswith("keep:"): + return "keep:%s/%s" % (record["output"], path) + adjustFiles(outputs, keepify) + except Exception as e: + logger.error("While getting final output object: %s", e) + self.arvrunner.output_callback(outputs, processStatus) + finally: + del self.arvrunner.jobs[record["uuid"]] diff --git a/sdk/cwl/tests/test_container.py b/sdk/cwl/tests/test_container.py new file mode 100644 index 0000000000..0237e801a9 --- /dev/null +++ b/sdk/cwl/tests/test_container.py @@ -0,0 +1,185 @@ +import arvados_cwl +import logging +import mock +import unittest +import os +import cwltool.process + +if not os.getenv('ARVADOS_DEBUG'): + logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN) + logging.getLogger('arvados.arv-run').setLevel(logging.WARN) + + +class TestContainer(unittest.TestCase): + + # The test passes no builder.resources + # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024} + @mock.patch("arvados.commands.keepdocker.list_images_in_arv") + def test_run(self, keepdocker): + runner = mock.MagicMock() + runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz" + runner.ignore_docker_for_reuse = False + + keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")] + runner.api.collections().get().execute.return_value = { + "portable_data_hash": "99999999999999999999999999999993+99"} + + document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("draft-3") + + tool = { + "inputs": [], + "outputs": [], + "baseCommand": "ls" + } + arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, True, avsc_names=avsc_names, basedir="") + arvtool.formatgraph = None + for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run"): + j.run() + runner.api.container_requests().create.assert_called_with( + body={ + 'environment': { + 'TMPDIR': '/tmp' + }, + 'name': 'test_run', + 'runtime_constraints': { + 'vcpus': 1, + 'ram': 1073741824 + }, 'priority': 1, + 'mounts': { + '/var/spool/cwl': {'kind': 'tmp'} + }, + 'state': 'Committed', + 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz', + 'output_path': '/var/spool/cwl', + 'container_image': '99999999999999999999999999999993+99', + 'command': ['ls'], + 'cwd': '/var/spool/cwl' + }) + + # The test passes some fields in builder.resources + # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024} + @mock.patch("arvados.commands.keepdocker.list_images_in_arv") + def test_resource_requirements(self, keepdocker): + runner = mock.MagicMock() + runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz" + runner.ignore_docker_for_reuse = False + document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("draft-3") + + keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")] + runner.api.collections().get().execute.return_value = { + "portable_data_hash": "99999999999999999999999999999993+99"} + + tool = { + "inputs": [], + "outputs": [], + "hints": [{ + "class": "ResourceRequirement", + "coresMin": 3, + "ramMin": 3000, + "tmpdirMin": 4000 + }], + "baseCommand": "ls" + } + arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, True, avsc_names=avsc_names) + arvtool.formatgraph = None + for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_resource_requirements"): + j.run() + + runner.api.container_requests().create.assert_called_with( + body={ + 'environment': { + 'TMPDIR': '/tmp' + }, + 'name': 'test_resource_requirements', + 'runtime_constraints': { + 'vcpus': 3, + 'ram': 3145728000 + }, 'priority': 1, + 'mounts': { + '/var/spool/cwl': {'kind': 'tmp'} + }, + 'state': 'Committed', + 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz', + 'output_path': '/var/spool/cwl', + 'container_image': '99999999999999999999999999999993+99', + 'command': ['ls'], + 'cwd': '/var/spool/cwl' + }) + + @mock.patch("arvados.collection.Collection") + def test_done(self, col): + api = mock.MagicMock() + + runner = mock.MagicMock() + runner.api = api + runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz" + runner.num_retries = 0 + runner.ignore_docker_for_reuse = False + + col().open.return_value = [] + api.collections().list().execute.side_effect = ({"items": []}, + {"items": [{"manifest_text": "XYZ"}]}) + + arvjob = arvados_cwl.ArvadosContainer(runner) + arvjob.name = "testjob" + arvjob.builder = mock.MagicMock() + arvjob.output_callback = mock.MagicMock() + arvjob.collect_outputs = mock.MagicMock() + + arvjob.done({ + "state": "Complete", + "output": "99999999999999999999999999999993+99", + "log": "99999999999999999999999999999994+99", + "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz" + }) + + api.collections().list.assert_has_calls([ + mock.call(), + mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'], + ['portable_data_hash', '=', '99999999999999999999999999999993+99'], + ['name', '=', 'Output 9999999 of testjob']]), + mock.call().execute(num_retries=0), + mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999993+99']], + select=['manifest_text']), + mock.call().execute(num_retries=0)]) + + api.collections().create.assert_called_with( + ensure_unique_name=True, + body={'portable_data_hash': '99999999999999999999999999999993+99', + 'manifest_text': 'XYZ', + 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz', + 'name': 'Output 9999999 of testjob'}) + + @mock.patch("arvados.collection.Collection") + def test_done_use_existing_collection(self, col): + api = mock.MagicMock() + + runner = mock.MagicMock() + runner.api = api + runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz" + runner.num_retries = 0 + + col().open.return_value = [] + api.collections().list().execute.side_effect = ({"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},) + + arvjob = arvados_cwl.ArvadosContainer(runner) + arvjob.name = "testjob" + arvjob.builder = mock.MagicMock() + arvjob.output_callback = mock.MagicMock() + arvjob.collect_outputs = mock.MagicMock() + + arvjob.done({ + "state": "Complete", + "output": "99999999999999999999999999999993+99", + "log": "99999999999999999999999999999994+99", + "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz" + }) + + api.collections().list.assert_has_calls([ + mock.call(), + mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'], + ['portable_data_hash', '=', '99999999999999999999999999999993+99'], + ['name', '=', 'Output 9999999 of testjob']]), + mock.call().execute(num_retries=0)]) + + self.assertFalse(api.collections().create.called) diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_job.py index dba65b0f8a..701afcb6f4 100644 --- a/sdk/cwl/tests/test_job.py +++ b/sdk/cwl/tests/test_job.py @@ -25,7 +25,7 @@ class TestJob(unittest.TestCase): "outputs": [], "baseCommand": "ls" } - arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, avsc_names=avsc_names, basedir="") + arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, False, avsc_names=avsc_names, basedir="") arvtool.formatgraph = None for j in arvtool.job({}, mock.MagicMock(), basedir=""): j.run() @@ -76,7 +76,7 @@ class TestJob(unittest.TestCase): }], "baseCommand": "ls" } - arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, avsc_names=avsc_names) + arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, False, avsc_names=avsc_names) arvtool.formatgraph = None for j in arvtool.job({}, mock.MagicMock(), basedir=""): j.run() diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py index 38741ebf02..48e6ed2f41 100644 --- a/sdk/cwl/tests/test_submit.py +++ b/sdk/cwl/tests/test_submit.py @@ -30,7 +30,7 @@ def stubs(func): return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p)) stubs.KeepClient().put.side_effect = putstub - stubs.keepdocker.return_value = True + stubs.keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")] stubs.fake_user_uuid = "zzzzz-tpzed-zzzzzzzzzzzzzzz" stubs.api = mock.MagicMock() @@ -44,12 +44,28 @@ def stubs(func): }, { "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2", "portable_data_hash": "99999999999999999999999999999992+99", + }, + { + "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz4", + "portable_data_hash": "99999999999999999999999999999994+99", + "manifest_text": "" }) + stubs.api.collections().get().execute.return_value = { + "portable_data_hash": "99999999999999999999999999999993+99"} + stubs.expect_job_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz" stubs.api.jobs().create().execute.return_value = { "uuid": stubs.expect_job_uuid, "state": "Queued", } + + stubs.expect_container_request_uuid = "zzzzz-xvhdp-zzzzzzzzzzzzzzz" + stubs.api.container_requests().create().execute.return_value = { + "uuid": stubs.expect_container_request_uuid, + "container_uuid": "zzzzz-dz642-zzzzzzzzzzzzzzz", + "state": "Queued" + } + stubs.expect_pipeline_template_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz" stubs.api.pipeline_templates().create().execute.return_value = { "uuid": stubs.expect_pipeline_template_uuid, @@ -70,6 +86,35 @@ def stubs(func): 'script_version': 'master', 'script': 'cwl-runner' } + + stubs.expect_container_spec = { + 'priority': 1, + 'mounts': { + 'stdout': { + 'path': '/var/spool/cwl/cwl.output.json', + 'kind': 'file' + }, + '/var/lib/cwl/workflow/submit_wf.cwl': { + 'portable_data_hash': '999999999999999999999999991+99', + 'kind': 'collection' + }, + '/var/lib/cwl/job/cwl.input.json': { + 'portable_data_hash': '102435082199e5229f99b01165b67096+60/cwl.input.json', + 'kind': 'collection' + } + }, + 'state': 'Committed', + 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz', + 'command': ['arvados-cwl-runner', '--local', '--crunch2', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json'], + 'name': 'submit_wf.cwl', + 'container_image': '99999999999999999999999999999993+99', + 'output_path': '/var/spool/cwl', + 'cwd': '/var/spool/cwl', + 'runtime_constraints': { + 'vcpus': 1, + 'ram': 268435456 + } + } return func(self, stubs, *args, **kwargs) return wrapped @@ -128,6 +173,41 @@ class TestSubmit(unittest.TestCase): body=expect_body, find_or_create=True) + @stubs + def test_submit_container(self, stubs): + capture_stdout = cStringIO.StringIO() + exited = arvados_cwl.main( + ["--submit", "--no-wait", "--crunch2", "--debug", + "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"], + capture_stdout, sys.stderr, api_client=stubs.api) + self.assertEqual(exited, 0) + + stubs.api.collections().create.assert_has_calls([ + mock.call(), + mock.call(body={ + 'manifest_text': + './tool a3954c369b8924d40547ec8cf5f6a7f4+449 ' + '0:16:blub.txt 16:433:submit_tool.cwl\n./wf ' + 'e046cace0b1a0a6ee645f6ea8688f7e2+364 0:364:submit_wf.cwl\n', + 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz', + 'name': 'submit_wf.cwl', + }, ensure_unique_name=True), + mock.call().execute(), + mock.call(body={ + 'manifest_text': + '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n', + 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz', + 'name': '#', + }, ensure_unique_name=True), + mock.call().execute()]) + + expect_container = copy.deepcopy(stubs.expect_container_spec) + expect_container["owner_uuid"] = stubs.fake_user_uuid + stubs.api.container_requests().create.assert_called_with( + body=expect_container) + self.assertEqual(capture_stdout.getvalue(), + stubs.expect_container_request_uuid + '\n') + class TestCreateTemplate(unittest.TestCase): @stubs diff --git a/services/api/Gemfile.lock b/services/api/Gemfile.lock index 7be4e0f39d..3715718717 100644 --- a/services/api/Gemfile.lock +++ b/services/api/Gemfile.lock @@ -257,6 +257,3 @@ DEPENDENCIES therubyracer trollop uglifier (>= 1.0.3) - -BUNDLED WITH - 1.12.1 -- 2.30.2