X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/aace3aa52c1b9fe3b2e907f43e8d6801ceeb28f9..a93ef946eb1e73ee190ea4ff19c4f9278235530c:/sdk/cwl/arvados_cwl/arvjob.py diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py index 4973c8a8c6..1287fbb6ea 100644 --- a/sdk/cwl/arvados_cwl/arvjob.py +++ b/sdk/cwl/arvados_cwl/arvjob.py @@ -14,9 +14,11 @@ from cwltool.command_line_tool import revmap_file, CommandLineTool from cwltool.load_tool import fetch_document from cwltool.builder import Builder from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class +from cwltool.job import JobBase from schema_salad.sourceline import SourceLine +from arvados_cwl.util import get_current_container, get_intermediate_collection_info import ruamel.yaml as yaml import arvados.collection @@ -36,10 +38,18 @@ crunchrunner_re = re.compile(r"^.*crunchrunner: \$\(task\.(tmpdir|outdir|keep)\) crunchrunner_git_commit = 'a3f2cb186e437bfce0031b024b2157b73ed2717d' -class ArvadosJob(object): +class ArvadosJob(JobBase): """Submit and manage a Crunch job for executing a CWL CommandLineTool.""" - def __init__(self, runner): + def __init__(self, runner, + builder, # type: Builder + joborder, # type: Dict[Text, Union[Dict[Text, Any], List, Text]] + make_path_mapper, # type: Callable[..., PathMapper] + requirements, # type: List[Dict[Text, Text]] + hints, # type: List[Dict[Text, Text]] + name # type: Text + ): + super(ArvadosJob, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name) self.arvrunner = runner self.running = False self.uuid = None @@ -67,7 +77,14 @@ class ArvadosJob(object): if vwd: with Perf(metrics, "generatefiles.save_new %s" % self.name): - vwd.save_new() + if not runtimeContext.current_container: + runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger) + info = get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl) + vwd.save_new(name=info["name"], + owner_uuid=self.arvrunner.project_uuid, + ensure_unique_name=True, + trash_at=info["trash_at"], + properties=info["properties"]) for f, p in generatemapper.items(): if p.type == "File": @@ -97,11 +114,14 @@ class ArvadosJob(object): with Perf(metrics, "arv_docker_get_image %s" % self.name): (docker_req, docker_is_req) = self.get_requirement("DockerRequirement") - if docker_req and runtimeContextuse_container is not False: + if docker_req and runtimeContext.use_container is not False: if docker_req.get("dockerOutputDirectory"): raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError( "Option 'dockerOutputDirectory' of DockerRequirement not supported.") - runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid) + runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, + docker_req, + runtimeContext.pull_image, + self.arvrunner.project_uuid) else: runtime_constraints["docker_image"] = "arvados/jobs" @@ -243,7 +263,7 @@ class ArvadosJob(object): dirs[g.group(1)] = g.group(2) if processStatus == "permanentFail": - done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self)) + done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40) with Perf(metrics, "output collection %s" % self.name): outputs = done.done(self, record, dirs["tmpdir"], @@ -269,7 +289,7 @@ class ArvadosJob(object): class RunnerJob(Runner): """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner.""" - def arvados_job_spec(self, runtimeContext): + def arvados_job_spec(self, debug=False): """Create an Arvados job specification for this workflow. The returned dict can be used to create a job (i.e., passed as @@ -299,7 +319,7 @@ class RunnerJob(Runner): if self.on_error: self.job_order["arv:on_error"] = self.on_error - if runtimeContext.debug: + if debug: self.job_order["arv:debug"] = True return { @@ -315,7 +335,7 @@ class RunnerJob(Runner): } def run(self, runtimeContext): - job_spec = self.arvados_job_spec(runtimeContext) + job_spec = self.arvados_job_spec(runtimeContext.debug) job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)