14510: Estimate collection cache size wip
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
index b4d01019fc099179dc0cae6fcb36821bfeab0471..4c49a449b2a68fdf1eaaa5cd674129ae257dfc6e 100644 (file)
@@ -12,7 +12,7 @@ import ciso8601
 import uuid
 import math
 
-from arvados_cwl.util import get_current_container, get_intermediate_collection_info
+import arvados_cwl.util
 import ruamel.yaml as yaml
 
 from cwltool.errors import WorkflowException
@@ -36,7 +36,7 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics')
 class ArvadosContainer(JobBase):
     """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
 
-    def __init__(self, runner,
+    def __init__(self, runner, job_runtime,
                  builder,   # type: Builder
                  joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                  make_path_mapper,  # type: Callable[..., PathMapper]
@@ -46,6 +46,7 @@ class ArvadosContainer(JobBase):
     ):
         super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
         self.arvrunner = runner
+        self.job_runtime = job_runtime
         self.running = False
         self.uuid = None
 
@@ -60,6 +61,8 @@ class ArvadosContainer(JobBase):
         # ArvadosContainer object by CommandLineTool.job() before
         # run() is called.
 
+        runtimeContext = self.job_runtime
+
         container_request = {
             "command": self.command_line,
             "name": self.name,
@@ -71,8 +74,8 @@ class ArvadosContainer(JobBase):
         }
         runtime_constraints = {}
 
-        if self.arvrunner.project_uuid:
-            container_request["owner_uuid"] = self.arvrunner.project_uuid
+        if runtimeContext.project_uuid:
+            container_request["owner_uuid"] = runtimeContext.project_uuid
 
         if self.arvrunner.secret_store.has_secret(self.command_line):
             raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")
@@ -168,10 +171,10 @@ class ArvadosContainer(JobBase):
                 keepemptydirs(vwd)
 
                 if not runtimeContext.current_container:
-                    runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
-                info = get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
+                    runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+                info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                 vwd.save_new(name=info["name"],
-                             owner_uuid=self.arvrunner.project_uuid,
+                             owner_uuid=runtimeContext.project_uuid,
                              ensure_unique_name=True,
                              trash_at=info["trash_at"],
                              properties=info["properties"])
@@ -212,9 +215,9 @@ class ArvadosContainer(JobBase):
             docker_req = {"dockerImageId": "arvados/jobs"}
 
         container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
-                                                                     docker_req,
-                                                                     runtimeContext.pull_image,
-                                                                     self.arvrunner.project_uuid)
+                                                                    docker_req,
+                                                                    runtimeContext.pull_image,
+                                                                    runtimeContext.project_uuid)
 
         api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
         if api_req:
@@ -250,6 +253,10 @@ class ArvadosContainer(JobBase):
         if self.timelimit is not None:
             scheduling_parameters["max_run_time"] = self.timelimit
 
+        extra_submit_params = {}
+        if runtimeContext.submit_runner_cluster:
+            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
+
         container_request["output_name"] = "Output for step %s" % (self.name)
         container_request["output_ttl"] = self.output_ttl
         container_request["mounts"] = mounts
@@ -277,11 +284,13 @@ class ArvadosContainer(JobBase):
             if runtimeContext.submit_request_uuid:
                 response = self.arvrunner.api.container_requests().update(
                     uuid=runtimeContext.submit_request_uuid,
-                    body=container_request
+                    body=container_request,
+                    **extra_submit_params
                 ).execute(num_retries=self.arvrunner.num_retries)
             else:
                 response = self.arvrunner.api.container_requests().create(
-                    body=container_request
+                    body=container_request,
+                    **extra_submit_params
                 ).execute(num_retries=self.arvrunner.num_retries)
 
             self.uuid = response["uuid"]
@@ -398,7 +407,7 @@ class RunnerContainer(Runner):
             "secret_mounts": secret_mounts,
             "runtime_constraints": {
                 "vcpus": math.ceil(self.submit_runner_cores),
-                "ram": math.ceil(1024*1024 * self.submit_runner_ram),
+                "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                 "API": True
             },
             "use_existing": self.enable_reuse,
@@ -432,6 +441,7 @@ class RunnerContainer(Runner):
         # --eval-timeout is the timeout for javascript invocation
         # --parallel-task-count is the number of threads to use for job submission
         # --enable/disable-reuse sets desired job reuse
+        # --collection-cache-size sets aside memory (in MiB) for caching collections
         command = ["arvados-cwl-runner",
                    "--local",
                    "--api=containers",
@@ -439,7 +449,8 @@ class RunnerContainer(Runner):
                    "--disable-validate",
                    "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                    "--thread-count=%s" % self.arvrunner.thread_count,
-                   "--enable-reuse" if self.enable_reuse else "--disable-reuse"]
+                   "--enable-reuse" if self.enable_reuse else "--disable-reuse",
+                   "--collection-cache-size=%s" % self.collection_cache_size]
 
         if self.output_name:
             command.append("--output-name=" + self.output_name)
@@ -479,14 +490,20 @@ class RunnerContainer(Runner):
         if self.arvrunner.project_uuid:
             job_spec["owner_uuid"] = self.arvrunner.project_uuid
 
+        extra_submit_params = {}
+        if runtimeContext.submit_runner_cluster:
+            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
+
         if runtimeContext.submit_request_uuid:
             response = self.arvrunner.api.container_requests().update(
                 uuid=runtimeContext.submit_request_uuid,
-                body=job_spec
+                body=job_spec,
+                **extra_submit_params
             ).execute(num_retries=self.arvrunner.num_retries)
         else:
             response = self.arvrunner.api.container_requests().create(
-                body=job_spec
+                body=job_spec,
+                **extra_submit_params
             ).execute(num_retries=self.arvrunner.num_retries)
 
         self.uuid = response["uuid"]