X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a82231ef8bdf251a5f243461a4fafaf3b3ad5579..49043f14cb72a6eb5825aea529e3477b73e297c7:/sdk/cwl/arvados_cwl/arvjob.py

diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 88a8eeb3d5..397b6d58c0 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -1,11 +1,19 @@
 import logging
 import re
-from . import done
-from .arvdocker import arv_docker_get_image
-from cwltool.process import get_feature
+import copy
+
+from cwltool.process import get_feature, shortname
 from cwltool.errors import WorkflowException
+from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
+from cwltool.load_tool import fetch_document
+from cwltool.builder import Builder
+
 import arvados.collection
+from .arvdocker import arv_docker_get_image
+from .runner import Runner
+from . import done
+
 
 logger = logging.getLogger('arvados.cwl-runner')
 
 tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
@@ -164,3 +172,145 @@ class ArvadosJob(object):
             self.output_callback(outputs, processStatus)
         finally:
             del self.arvrunner.jobs[record["uuid"]]
+
+
+class RunnerJob(Runner):
+    """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
+
+    def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+        """Create an Arvados job specification for this workflow.
+
+        The returned dict can be used to create a job (i.e., passed as
+        the +body+ argument to jobs().create()), or as a component in
+        a pipeline template or pipeline instance.
+        """
+
+        workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+
+        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
+        return {
+            "script": "cwl-runner",
+            "script_version": "master",
+            "repository": "arvados",
+            "script_parameters": self.job_order,
+            "runtime_constraints": {
+                "docker_image": "arvados/jobs"
+            }
+        }
+
+    def run(self, *args, **kwargs):
+        job_spec = self.arvados_job_spec(*args, **kwargs)
+        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
+
+        response = self.arvrunner.api.jobs().create(
+            body=job_spec,
+            find_or_create=self.enable_reuse
+        ).execute(num_retries=self.arvrunner.num_retries)
+
+        self.uuid = response["uuid"]
+        self.arvrunner.jobs[self.uuid] = self
+
+        logger.info("Submitted job %s", response["uuid"])
+
+        if kwargs.get("submit"):
+            self.pipeline = self.arvrunner.api.pipeline_instances().create(
+                body={
+                    "owner_uuid": self.arvrunner.project_uuid,
+                    "name": shortname(self.tool.tool["id"]),
+                    "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
+                    "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
+
+        if response["state"] in ("Complete", "Failed", "Cancelled"):
+            self.done(response)
+
+
+class RunnerTemplate(object):
+    """An Arvados pipeline template that invokes a CWL workflow."""
+
+    type_to_dataclass = {
+        'boolean': 'boolean',
+        'File': 'File',
+        'float': 'number',
+        'int': 'number',
+        'string': 'text',
+    }
+
+    def __init__(self, runner, tool, job_order, enable_reuse):
+        self.runner = runner
+        self.tool = tool
+        self.job = RunnerJob(
+            runner=runner,
+            tool=tool,
+            job_order=job_order,
+            enable_reuse=enable_reuse)
+
+    def pipeline_component_spec(self):
+        """Return a component that Workbench and a-r-p-i will understand.
+
+        Specifically, translate CWL input specs to Arvados pipeline
+        format, like {"dataclass":"File","value":"xyz"}.
+        """
+        spec = self.job.arvados_job_spec()
+
+        # Most of the component spec is exactly the same as the job
+        # spec (script, script_version, etc.).
+        # spec['script_parameters'] isn't right, though. A component
+        # spec's script_parameters hash is a translation of
+        # self.tool.tool['inputs'] with defaults/overrides taken from
+        # the job order. So we move the job parameters out of the way
+        # and build a new spec['script_parameters'].
+        job_params = spec['script_parameters']
+        spec['script_parameters'] = {}
+
+        for param in self.tool.tool['inputs']:
+            param = copy.deepcopy(param)
+
+            # Data type and "required" flag...
+            types = param['type']
+            if not isinstance(types, list):
+                types = [types]
+            param['required'] = 'null' not in types
+            non_null_types = set(types) - set(['null'])
+            if len(non_null_types) == 1:
+                the_type = [c for c in non_null_types][0]
+                dataclass = self.type_to_dataclass.get(the_type)
+                if dataclass:
+                    param['dataclass'] = dataclass
+            # Note: If we didn't figure out a single appropriate
+            # dataclass, we just left that attribute out. We leave
+            # the "type" attribute there in any case, which might help
+            # downstream.
+
+            # Title and description...
+            title = param.pop('label', '')
+            descr = param.pop('description', '').rstrip('\n')
+            if title:
+                param['title'] = title
+            if descr:
+                param['description'] = descr
+
+            # Fill in the value from the current job order, if any.
+            param_id = shortname(param.pop('id'))
+            value = job_params.get(param_id)
+            if value is None:
+                pass
+            elif not isinstance(value, dict):
+                param['value'] = value
+            elif param.get('dataclass') == 'File' and value.get('path'):
+                param['value'] = value['path']
+
+            spec['script_parameters'][param_id] = param
+        spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
+        return spec
+
+    def save(self):
+        job_spec = self.pipeline_component_spec()
+        response = self.runner.api.pipeline_templates().create(body={
+            "components": {
+                self.job.name: job_spec,
+            },
+            "name": self.job.name,
+            "owner_uuid": self.runner.project_uuid,
+        }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries)
+        self.uuid = response["uuid"]
+        logger.info("Created template %s", self.uuid)