X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5603cefb38fbd2fc6493e8ccee587629e7af089e..0ff7b94edaaaa07932ae757c0a2b7ba3fde026cb:/sdk/cwl/arvados_cwl/arvjob.py

diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 15cafad7bd..78bca6e0e1 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -1,18 +1,20 @@
 import logging
 import re
 import copy
+import json
 
 from cwltool.process import get_feature, shortname
 from cwltool.errors import WorkflowException
 from cwltool.draft2tool import revmap_file, CommandLineTool
 from cwltool.load_tool import fetch_document
 from cwltool.builder import Builder
-from cwltool.pathmapper import PathMapper
 
 import arvados.collection
 
 from .arvdocker import arv_docker_get_image
 from .runner import Runner
+from .pathmapper import InitialWorkDirPathMapper
+from .perf import Perf
 from . import done
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -38,8 +40,8 @@ class ArvadosJob(object):
         if self.generatefiles["listing"]:
             vwd = arvados.collection.Collection()
             script_parameters["task.vwd"] = {}
-            generatemapper = PathMapper([self.generatefiles], self.outdir,
-                                        ".", separateDirs=False)
+            generatemapper = InitialWorkDirPathMapper([self.generatefiles], "", "",
+                                                      separateDirs=False)
             for f, p in generatemapper.items():
                 if p.type == "CreateFile":
                     with vwd.open(p.target, "w") as n:
@@ -47,11 +49,11 @@ class ArvadosJob(object):
             vwd.save_new()
             for f, p in generatemapper.items():
                 if p.type == "File":
-                    script_parameters["task.vwd"][p.target] = self.pathmapper.mapper(f).target
+                    script_parameters["task.vwd"][p.target] = p.resolved
                 if p.type == "CreateFile":
                     script_parameters["task.vwd"][p.target] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), p.target)
 
-        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
+        script_parameters["task.env"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
         if self.environment:
             script_parameters["task.env"].update(self.environment)
 
@@ -90,19 +92,20 @@ class ArvadosJob(object):
             filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
 
         try:
-            response = self.arvrunner.api.jobs().create(
-                body={
-                    "owner_uuid": self.arvrunner.project_uuid,
-                    "script": "crunchrunner",
-                    "repository": "arvados",
-                    "script_version": "master",
-                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
-                    "script_parameters": {"tasks": [script_parameters]},
-                    "runtime_constraints": runtime_constraints
-                },
-                filters=filters,
-                find_or_create=kwargs.get("enable_reuse", True)
-            ).execute(num_retries=self.arvrunner.num_retries)
+            with Perf(logger, "create %s" % self.name):
+                response = self.arvrunner.api.jobs().create(
+                    body={
+                        "owner_uuid": self.arvrunner.project_uuid,
+                        "script": "crunchrunner",
+                        "repository": "arvados",
+                        "script_version": "master",
+                        "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
+                        "script_parameters": {"tasks": [script_parameters]},
+                        "runtime_constraints": runtime_constraints
+                    },
+                    filters=filters,
+                    find_or_create=kwargs.get("enable_reuse", True)
+                ).execute(num_retries=self.arvrunner.num_retries)
 
             self.arvrunner.processes[response["uuid"]] = self
 
@@ -111,7 +114,8 @@ class ArvadosJob(object):
             logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
 
             if response["state"] in ("Complete", "Failed", "Cancelled"):
-                self.done(response)
+                with Perf(logger, "done %s" % self.name):
+                    self.done(response)
         except Exception as e:
             logger.error("Got error %s" % str(e))
             self.output_callback({}, "permanentFail")
@@ -119,7 +123,8 @@ class ArvadosJob(object):
     def update_pipeline_component(self, record):
         if self.arvrunner.pipeline:
             self.arvrunner.pipeline["components"][self.name] = {"job": record}
-            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
+            with Perf(logger, "update_pipeline_component %s" % self.name):
+                self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                   body={
                                                                                      "components": self.arvrunner.pipeline["components"]
                                                                                   }).execute(num_retries=self.arvrunner.num_retries)
@@ -151,37 +156,41 @@ class ArvadosJob(object):
             outputs = {}
             try:
                 if record["output"]:
-                    logc = arvados.collection.Collection(record["log"])
-                    log = logc.open(logc.keys()[0])
-                    tmpdir = None
-                    outdir = None
-                    keepdir = None
-                    for l in log:
-                        # Determine the tmpdir, outdir and keepdir paths from
-                        # the job run.  Unfortunately, we can't take the first
-                        # values we find (which are expected to be near the
-                        # top) and stop scanning because if the node fails and
-                        # the job restarts on a different node these values
-                        # will different runs, and we need to know about the
-                        # final run that actually produced output.
-
-                        g = tmpdirre.match(l)
-                        if g:
-                            tmpdir = g.group(1)
-                        g = outdirre.match(l)
-                        if g:
-                            outdir = g.group(1)
-                        g = keepre.match(l)
-                        if g:
-                            keepdir = g.group(1)
-
-                    outputs = done.done(self, record, tmpdir, outdir, keepdir)
+                    with Perf(logger, "inspect log %s" % self.name):
+                        logc = arvados.collection.Collection(record["log"])
+                        log = logc.open(logc.keys()[0])
+                        tmpdir = None
+                        outdir = None
+                        keepdir = None
+                        for l in log:
+                            # Determine the tmpdir, outdir and keepdir paths from
+                            # the job run.  Unfortunately, we can't take the first
+                            # values we find (which are expected to be near the
+                            # top) and stop scanning, because if the node fails and
+                            # the job restarts on a different node these values
+                            # will differ between runs, and we need to know about
+                            # the final run that actually produced output.
+
+                            g = tmpdirre.match(l)
+                            if g:
+                                tmpdir = g.group(1)
+                            g = outdirre.match(l)
+                            if g:
+                                outdir = g.group(1)
+                            g = keepre.match(l)
+                            if g:
+                                keepdir = g.group(1)
+
+                    with Perf(logger, "output collection %s" % self.name):
+                        outputs = done.done(self, record, tmpdir, outdir, keepdir)
             except WorkflowException as e:
                 logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                 processStatus = "permanentFail"
+                outputs = None
             except Exception as e:
                 logger.exception("Got unknown exception while collecting job outputs:")
                 processStatus = "permanentFail"
+                outputs = None
 
             self.output_callback(outputs, processStatus)
         finally:
@@ -201,7 +210,7 @@ class RunnerJob(Runner):
 
         workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
 
-        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
+        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"]).target[5:]
         return {
             "script": "cwl-runner",
             "script_version": "master",
@@ -227,7 +236,7 @@ class RunnerJob(Runner):
         logger.info("Submitted job %s", response["uuid"])
 
         if kwargs.get("submit"):
-            self.pipeline = self.arvrunner.api.pipeline_instances().create(
+            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
                 body={
                     "owner_uuid": self.arvrunner.project_uuid,
                     "name": shortname(self.tool.tool["id"]),
@@ -244,6 +253,7 @@ class RunnerTemplate(object):
     type_to_dataclass = {
         'boolean': 'boolean',
         'File': 'File',
+        'Directory': 'Collection',
         'float': 'number',
         'int': 'number',
         'string': 'text',
@@ -310,8 +320,8 @@ class RunnerTemplate(object):
                 pass
             elif not isinstance(value, dict):
                 param['value'] = value
-            elif param.get('dataclass') == 'File' and value.get('location'):
-                param['value'] = value['location']
+            elif param.get('dataclass') in ('File', 'Collection') and value.get('location'):
+                param['value'] = value['location'][5:]
             spec['script_parameters'][param_id] = param
 
         spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
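The Perf context manager imported from .perf is used throughout this change to time the expensive steps: job creation, pipeline updates, log inspection and output collection. Its implementation is not part of this diff; a minimal sketch of a compatible timer, assuming it only needs a logger and a label, might look like this:

    import time

    class Perf(object):
        # Minimal sketch, not the actual .perf module: log the wall-clock
        # duration of the enclosed block at debug level.
        def __init__(self, logger, name):
            self.logger = logger
            self.name = name

        def __enter__(self):
            self.start = time.time()

        def __exit__(self, exc_type, exc_value, tb):
            self.logger.debug("%s took %.3fs", self.name, time.time() - self.start)

In this sketch the timings are logged at debug level, so they only show up when verbose logging is enabled and normal runs are unaffected.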
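The tmpdirre, outdirre and keepre regular expressions used in the done() hunk above are module-level patterns defined earlier in arvjob.py and are not shown in this diff. As the comment explains, the loop scans the whole log rather than stopping at the first match, so that when a job is retried on another node the values from the final run, the one that actually produced output, win. A self-contained illustration with hypothetical patterns and log lines (the real log format may differ):

    import re

    # Hypothetical stand-ins for the module-level patterns; the real log
    # format may differ.
    tmpdirre = re.compile(r".*\$\(task\.tmpdir\)=(.*)")
    outdirre = re.compile(r".*\$\(task\.outdir\)=(.*)")
    keepre = re.compile(r".*\$\(task\.keep\)=(.*)")

    # Two runs of the same job: the first node failed, the second run
    # produced the output, so the last values seen must win.
    log = [
        "run 1: $(task.tmpdir)=/tmp/node1",
        "run 2: $(task.tmpdir)=/tmp/node2",
        "run 2: $(task.outdir)=/tmp/node2/out",
        "run 2: $(task.keep)=/keep",
    ]

    tmpdir = outdir = keepdir = None
    for l in log:
        g = tmpdirre.match(l)
        if g:
            tmpdir = g.group(1)  # later matches overwrite earlier ones
        g = outdirre.match(l)
        if g:
            outdir = g.group(1)
        g = keepre.match(l)
        if g:
            keepdir = g.group(1)

    assert (tmpdir, outdir, keepdir) == ("/tmp/node2", "/tmp/node2/out", "/keep")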
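Both new [5:] slices above, workflowmapper.mapper(self.tool.tool["id"]).target[5:] and value['location'][5:], drop a five-character prefix from a mapped location before storing it as a script parameter. The prefix is established by the path mapper outside this diff, but it is presumably the "keep:" URI scheme, so the slice reduces a location to a bare Keep locator. A small illustration under that assumption:

    # Assuming path-mapped locations carry a "keep:" scheme prefix
    # (len("keep:") == 5); the locator below is illustrative only.
    location = "keep:99999999999999999999999999999999+99/workflow.cwl"
    assert location[5:] == "99999999999999999999999999999999+99/workflow.cwl"

    # A more defensive spelling of the same idea (hypothetical helper,
    # not part of the diff):
    def strip_keep_scheme(loc):
        return loc[len("keep:"):] if loc.startswith("keep:") else loc

    assert strip_keep_scheme(location) == location[5:]

The new 'Directory': 'Collection' entry in type_to_dataclass is what makes the second slice reachable for directories: CWL Directory inputs surface as Collection-dataclass template parameters, whose values get the same location handling as File parameters.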