X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/64e905e02fae8d63112be8a83b8f9f5f158fb2a2..8d8847e070f588b5d85ac2d7123fd929b4d417cd:/sdk/cwl/arvados_cwl/arvcontainer.py?ds=sidebyside diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index 09cad7e41e..4c49a449b2 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -12,7 +12,7 @@ import ciso8601 import uuid import math -from arvados_cwl.util import get_current_container, get_intermediate_collection_info +import arvados_cwl.util import ruamel.yaml as yaml from cwltool.errors import WorkflowException @@ -36,7 +36,7 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics') class ArvadosContainer(JobBase): """Submit and manage a Crunch container request for executing a CWL CommandLineTool.""" - def __init__(self, runner, + def __init__(self, runner, job_runtime, builder, # type: Builder joborder, # type: Dict[Text, Union[Dict[Text, Any], List, Text]] make_path_mapper, # type: Callable[..., PathMapper] @@ -46,6 +46,7 @@ class ArvadosContainer(JobBase): ): super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name) self.arvrunner = runner + self.job_runtime = job_runtime self.running = False self.uuid = None @@ -60,6 +61,8 @@ class ArvadosContainer(JobBase): # ArvadosContainer object by CommandLineTool.job() before # run() is called. + runtimeContext = self.job_runtime + container_request = { "command": self.command_line, "name": self.name, @@ -71,8 +74,8 @@ class ArvadosContainer(JobBase): } runtime_constraints = {} - if self.arvrunner.project_uuid: - container_request["owner_uuid"] = self.arvrunner.project_uuid + if runtimeContext.project_uuid: + container_request["owner_uuid"] = runtimeContext.project_uuid if self.arvrunner.secret_store.has_secret(self.command_line): raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets") @@ -168,10 +171,10 @@ class ArvadosContainer(JobBase): keepemptydirs(vwd) if not runtimeContext.current_container: - runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger) - info = get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl) + runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger) + info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl) vwd.save_new(name=info["name"], - owner_uuid=self.arvrunner.project_uuid, + owner_uuid=runtimeContext.project_uuid, ensure_unique_name=True, trash_at=info["trash_at"], properties=info["properties"]) @@ -212,9 +215,9 @@ class ArvadosContainer(JobBase): docker_req = {"dockerImageId": "arvados/jobs"} container_request["container_image"] = arv_docker_get_image(self.arvrunner.api, - docker_req, - runtimeContext.pull_image, - self.arvrunner.project_uuid) + docker_req, + runtimeContext.pull_image, + runtimeContext.project_uuid) api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement") if api_req: @@ -250,6 +253,10 @@ class ArvadosContainer(JobBase): if self.timelimit is not None: scheduling_parameters["max_run_time"] = self.timelimit + extra_submit_params = {} + if runtimeContext.submit_runner_cluster: + extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster + container_request["output_name"] = "Output for step %s" % (self.name) container_request["output_ttl"] = self.output_ttl container_request["mounts"] = mounts @@ -277,11 +284,13 @@ class ArvadosContainer(JobBase): if runtimeContext.submit_request_uuid: response = self.arvrunner.api.container_requests().update( uuid=runtimeContext.submit_request_uuid, - body=container_request + body=container_request, + **extra_submit_params ).execute(num_retries=self.arvrunner.num_retries) else: response = self.arvrunner.api.container_requests().create( - body=container_request + body=container_request, + **extra_submit_params ).execute(num_retries=self.arvrunner.num_retries) self.uuid = response["uuid"] @@ -321,7 +330,10 @@ class ArvadosContainer(JobBase): api_client=self.arvrunner.api, keep_client=self.arvrunner.keep_client, num_retries=self.arvrunner.num_retries) - done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40) + label = self.arvrunner.label(self) + done.logtail( + logc, logger.error, + "%s (%s) error log:" % (label, record["uuid"]), maxlen=40) if record["output_uuid"]: if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl: @@ -395,7 +407,7 @@ class RunnerContainer(Runner): "secret_mounts": secret_mounts, "runtime_constraints": { "vcpus": math.ceil(self.submit_runner_cores), - "ram": math.ceil(1024*1024 * self.submit_runner_ram), + "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)), "API": True }, "use_existing": self.enable_reuse, @@ -429,6 +441,7 @@ class RunnerContainer(Runner): # --eval-timeout is the timeout for javascript invocation # --parallel-task-count is the number of threads to use for job submission # --enable/disable-reuse sets desired job reuse + # --collection-cache-size sets aside memory to store collections command = ["arvados-cwl-runner", "--local", "--api=containers", @@ -436,7 +449,8 @@ class RunnerContainer(Runner): "--disable-validate", "--eval-timeout=%s" % self.arvrunner.eval_timeout, "--thread-count=%s" % self.arvrunner.thread_count, - "--enable-reuse" if self.enable_reuse else "--disable-reuse"] + "--enable-reuse" if self.enable_reuse else "--disable-reuse", + "--collection-cache-size=%s" % self.collection_cache_size] if self.output_name: command.append("--output-name=" + self.output_name) @@ -476,14 +490,20 @@ class RunnerContainer(Runner): if self.arvrunner.project_uuid: job_spec["owner_uuid"] = self.arvrunner.project_uuid + extra_submit_params = {} + if runtimeContext.submit_runner_cluster: + extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster + if runtimeContext.submit_request_uuid: response = self.arvrunner.api.container_requests().update( uuid=runtimeContext.submit_request_uuid, - body=job_spec + body=job_spec, + **extra_submit_params ).execute(num_retries=self.arvrunner.num_retries) else: response = self.arvrunner.api.container_requests().create( - body=job_spec + body=job_spec, + **extra_submit_params ).execute(num_retries=self.arvrunner.num_retries) self.uuid = response["uuid"]