X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6e098e8fde75fcec7e5be1a4873029f394bd055a..f159fab8f9d6bc4254192ce43432defd5bd400aa:/sdk/cwl/arvados_cwl/arvcontainer.py diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index 9588803e73..49c40b1dae 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -11,12 +11,14 @@ import datetime import ciso8601 import uuid +from arvados_cwl.util import get_current_container, get_intermediate_collection_info import ruamel.yaml as yaml from cwltool.errors import WorkflowException -from cwltool.process import get_feature, UnsupportedRequirement, shortname +from cwltool.process import UnsupportedRequirement, shortname from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class from cwltool.utils import aslist +from cwltool.job import JobBase import arvados.collection @@ -30,10 +32,18 @@ from .perf import Perf logger = logging.getLogger('arvados.cwl-runner') metrics = logging.getLogger('arvados.cwl-runner.metrics') -class ArvadosContainer(object): +class ArvadosContainer(JobBase): """Submit and manage a Crunch container request for executing a CWL CommandLineTool.""" - def __init__(self, runner): + def __init__(self, runner, + builder, # type: Builder + joborder, # type: Dict[Text, Union[Dict[Text, Any], List, Text]] + make_path_mapper, # type: Callable[..., PathMapper] + requirements, # type: List[Dict[Text, Text]] + hints, # type: List[Dict[Text, Text]] + name # type: Text + ): + super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name) self.arvrunner = runner self.running = False self.uuid = None @@ -41,7 +51,7 @@ class ArvadosContainer(object): def update_pipeline_component(self, r): pass - def run(self, dry_run=False, pull_image=True, **kwargs): + def run(self, runtimeContext): # ArvadosCommandTool subclasses from cwltool.CommandLineTool, # which calls makeJobRunner() to get a new ArvadosContainer # object. The fields that define execution such as @@ -51,16 +61,18 @@ class ArvadosContainer(object): container_request = { "command": self.command_line, - "owner_uuid": self.arvrunner.project_uuid, "name": self.name, "output_path": self.outdir, "cwd": self.outdir, - "priority": kwargs.get("priority"), + "priority": runtimeContext.priority, "state": "Committed", "properties": {}, } runtime_constraints = {} + if self.arvrunner.project_uuid: + container_request["owner_uuid"] = self.arvrunner.project_uuid + if self.arvrunner.secret_store.has_secret(self.command_line): raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets") @@ -154,8 +166,14 @@ class ArvadosContainer(object): keepemptydirs(vwd) - with Perf(metrics, "generatefiles.save_new %s" % self.name): - vwd.save_new() + if not runtimeContext.current_container: + runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger) + info = get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl) + vwd.save_new(name=info["name"], + owner_uuid=self.arvrunner.project_uuid, + ensure_unique_name=True, + trash_at=info["trash_at"], + properties=info["properties"]) prev = None for f, p in sorteditems: @@ -188,20 +206,20 @@ class ArvadosContainer(object): mounts["stdout"] = {"kind": "file", "path": "%s/%s" % (self.outdir, self.stdout)} - (docker_req, docker_is_req) = get_feature(self, "DockerRequirement") + (docker_req, docker_is_req) = self.get_requirement("DockerRequirement") if not docker_req: docker_req = {"dockerImageId": "arvados/jobs"} container_request["container_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, - pull_image, + runtimeContext.pull_image, self.arvrunner.project_uuid) - api_req, _ = get_feature(self, "http://arvados.org/cwl#APIRequirement") + api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement") if api_req: runtime_constraints["API"] = True - runtime_req, _ = get_feature(self, "http://arvados.org/cwl#RuntimeConstraints") + runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints") if runtime_req: if "keep_cache" in runtime_req: runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"] * 2**20 @@ -215,11 +233,11 @@ class ArvadosContainer(object): "writable": True } - partition_req, _ = get_feature(self, "http://arvados.org/cwl#PartitionRequirement") + partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement") if partition_req: scheduling_parameters["partitions"] = aslist(partition_req["partition"]) - intermediate_output_req, _ = get_feature(self, "http://arvados.org/cwl#IntermediateOutput") + intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput") if intermediate_output_req: self.output_ttl = intermediate_output_req["outputTTL"] else: @@ -228,21 +246,25 @@ class ArvadosContainer(object): if self.output_ttl < 0: raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"]) + if self.timelimit is not None: + scheduling_parameters["max_run_time"] = self.timelimit + + container_request["output_name"] = "Output for step %s" % (self.name) container_request["output_ttl"] = self.output_ttl container_request["mounts"] = mounts container_request["secret_mounts"] = secret_mounts container_request["runtime_constraints"] = runtime_constraints container_request["scheduling_parameters"] = scheduling_parameters - enable_reuse = kwargs.get("enable_reuse", True) + enable_reuse = runtimeContext.enable_reuse if enable_reuse: - reuse_req, _ = get_feature(self, "http://arvados.org/cwl#ReuseRequirement") + reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement") if reuse_req: enable_reuse = reuse_req["enableReuse"] container_request["use_existing"] = enable_reuse - if kwargs.get("runnerjob", "").startswith("arvwf:"): - wfuuid = kwargs["runnerjob"][6:kwargs["runnerjob"].index("#")] + if runtimeContext.runnerjob.startswith("arvwf:"): + wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")] wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries) if container_request["name"] == "main": container_request["name"] = wfrecord["name"] @@ -251,9 +273,15 @@ class ArvadosContainer(object): self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback) try: - response = self.arvrunner.api.container_requests().create( - body=container_request - ).execute(num_retries=self.arvrunner.num_retries) + if runtimeContext.submit_request_uuid: + response = self.arvrunner.api.container_requests().update( + uuid=runtimeContext.submit_request_uuid, + body=container_request + ).execute(num_retries=self.arvrunner.num_retries) + else: + response = self.arvrunner.api.container_requests().create( + body=container_request + ).execute(num_retries=self.arvrunner.num_retries) self.uuid = response["uuid"] self.arvrunner.process_submitted(self) @@ -292,7 +320,7 @@ class ArvadosContainer(object): api_client=self.arvrunner.api, keep_client=self.arvrunner.keep_client, num_retries=self.arvrunner.num_retries) - done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self)) + done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40) if record["output_uuid"]: if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl: @@ -316,13 +344,12 @@ class ArvadosContainer(object): processStatus = "permanentFail" finally: self.output_callback(outputs, processStatus) - self.arvrunner.process_done(record["uuid"]) class RunnerContainer(Runner): """Submit and manage a container that runs arvados-cwl-runner.""" - def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs): + def arvados_job_spec(self, runtimeContext): """Create an Arvados container request for this workflow. The returned dict can be used to create a container passed as @@ -344,7 +371,6 @@ class RunnerContainer(Runner): self.job_order[param] = {"$include": mnt} container_req = { - "owner_uuid": self.arvrunner.project_uuid, "name": self.name, "output_path": "/var/spool/cwl", "cwd": "/var/spool/cwl", @@ -367,7 +393,7 @@ class RunnerContainer(Runner): }, "secret_mounts": secret_mounts, "runtime_constraints": { - "vcpus": 1, + "vcpus": self.submit_runner_cores, "ram": 1024*1024 * self.submit_runner_ram, "API": True }, @@ -418,9 +444,12 @@ class RunnerContainer(Runner): if self.output_tags: command.append("--output-tags=" + self.output_tags) - if kwargs.get("debug"): + if runtimeContext.debug: command.append("--debug") + if runtimeContext.storage_classes != "default": + command.append("--storage-classes=" + runtimeContext.storage_classes) + if self.on_error: command.append("--on-error=" + self.on_error) @@ -440,19 +469,26 @@ class RunnerContainer(Runner): return container_req - def run(self, **kwargs): - kwargs["keepprefix"] = "keep:" - job_spec = self.arvados_job_spec(**kwargs) - job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid) + def run(self, runtimeContext): + runtimeContext.keepprefix = "keep:" + job_spec = self.arvados_job_spec(runtimeContext) + if self.arvrunner.project_uuid: + job_spec["owner_uuid"] = self.arvrunner.project_uuid - response = self.arvrunner.api.container_requests().create( - body=job_spec - ).execute(num_retries=self.arvrunner.num_retries) + if runtimeContext.submit_request_uuid: + response = self.arvrunner.api.container_requests().update( + uuid=runtimeContext.submit_request_uuid, + body=job_spec + ).execute(num_retries=self.arvrunner.num_retries) + else: + response = self.arvrunner.api.container_requests().create( + body=job_spec + ).execute(num_retries=self.arvrunner.num_retries) self.uuid = response["uuid"] self.arvrunner.process_submitted(self) - logger.info("%s submitted container %s", self.arvrunner.label(self), response["uuid"]) + logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"]) def done(self, record): try: @@ -464,5 +500,3 @@ class RunnerContainer(Runner): self.arvrunner.output_callback({}, "permanentFail") else: super(RunnerContainer, self).done(container) - finally: - self.arvrunner.process_done(record["uuid"])