Update cwltool refs #14510
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
index 948a9a46feab30bf3f8759fee94d81d14205e42d..6a91d6ff3c5076a28e06a685ed8c73bf45a84218 100644 (file)
@@ -10,8 +10,9 @@ import time
 import datetime
 import ciso8601
 import uuid
+import math
 
-from arvados_cwl.util import get_current_container, get_intermediate_collection_info
+import arvados_cwl.util
 import ruamel.yaml as yaml
 
 from cwltool.errors import WorkflowException
@@ -35,7 +36,7 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics')
 class ArvadosContainer(JobBase):
     """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
 
-    def __init__(self, runner,
+    def __init__(self, runner, job_runtime,
                  builder,   # type: Builder
                  joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                  make_path_mapper,  # type: Callable[..., PathMapper]
@@ -45,6 +46,7 @@ class ArvadosContainer(JobBase):
     ):
         super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
         self.arvrunner = runner
+        self.job_runtime = job_runtime
         self.running = False
         self.uuid = None
 
@@ -59,6 +61,8 @@ class ArvadosContainer(JobBase):
         # ArvadosContainer object by CommandLineTool.job() before
         # run() is called.
 
+        runtimeContext = self.job_runtime
+
         container_request = {
             "command": self.command_line,
             "name": self.name,
@@ -70,8 +74,8 @@ class ArvadosContainer(JobBase):
         }
         runtime_constraints = {}
 
-        if self.arvrunner.project_uuid:
-            container_request["owner_uuid"] = self.arvrunner.project_uuid
+        if runtimeContext.project_uuid:
+            container_request["owner_uuid"] = runtimeContext.project_uuid
 
         if self.arvrunner.secret_store.has_secret(self.command_line):
             raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")
@@ -81,17 +85,17 @@ class ArvadosContainer(JobBase):
 
         resources = self.builder.resources
         if resources is not None:
-            runtime_constraints["vcpus"] = resources.get("cores", 1)
-            runtime_constraints["ram"] = resources.get("ram") * 2**20
+            runtime_constraints["vcpus"] = math.ceil(resources.get("cores", 1))
+            runtime_constraints["ram"] = math.ceil(resources.get("ram") * 2**20)
 
         mounts = {
             self.outdir: {
                 "kind": "tmp",
-                "capacity": resources.get("outdirSize", 0) * 2**20
+                "capacity": math.ceil(resources.get("outdirSize", 0) * 2**20)
             },
             self.tmpdir: {
                 "kind": "tmp",
-                "capacity": resources.get("tmpdirSize", 0) * 2**20
+                "capacity": math.ceil(resources.get("tmpdirSize", 0) * 2**20)
             }
         }
         secret_mounts = {}
@@ -129,7 +133,7 @@ class ArvadosContainer(JobBase):
                 vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                     keep_client=self.arvrunner.keep_client,
                                                     num_retries=self.arvrunner.num_retries)
-                generatemapper = NoFollowPathMapper([self.generatefiles], "", "",
+                generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "",
                                                     separateDirs=False)
 
                 sorteditems = sorted(generatemapper.items(), None, key=lambda n: n[1].target)
@@ -167,10 +171,10 @@ class ArvadosContainer(JobBase):
                 keepemptydirs(vwd)
 
                 if not runtimeContext.current_container:
-                    runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
-                info = get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
+                    runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+                info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                 vwd.save_new(name=info["name"],
-                             owner_uuid=self.arvrunner.project_uuid,
+                             owner_uuid=runtimeContext.project_uuid,
                              ensure_unique_name=True,
                              trash_at=info["trash_at"],
                              properties=info["properties"])
@@ -211,9 +215,9 @@ class ArvadosContainer(JobBase):
             docker_req = {"dockerImageId": "arvados/jobs"}
 
         container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
-                                                                     docker_req,
-                                                                     runtimeContext.pull_image,
-                                                                     self.arvrunner.project_uuid)
+                                                                    docker_req,
+                                                                    runtimeContext.pull_image,
+                                                                    runtimeContext.project_uuid)
 
         api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
         if api_req:
@@ -222,7 +226,7 @@ class ArvadosContainer(JobBase):
         runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
         if runtime_req:
             if "keep_cache" in runtime_req:
-                runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"] * 2**20
+                runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
             if "outputDirType" in runtime_req:
                 if runtime_req["outputDirType"] == "local_output_dir":
                     # Currently the default behavior.
@@ -249,6 +253,10 @@ class ArvadosContainer(JobBase):
         if self.timelimit is not None:
             scheduling_parameters["max_run_time"] = self.timelimit
 
+        extra_submit_params = {}
+        if runtimeContext.submit_runner_cluster:
+            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
+
         container_request["output_name"] = "Output for step %s" % (self.name)
         container_request["output_ttl"] = self.output_ttl
         container_request["mounts"] = mounts
@@ -276,11 +284,13 @@ class ArvadosContainer(JobBase):
             if runtimeContext.submit_request_uuid:
                 response = self.arvrunner.api.container_requests().update(
                     uuid=runtimeContext.submit_request_uuid,
-                    body=container_request
+                    body=container_request,
+                    **extra_submit_params
                 ).execute(num_retries=self.arvrunner.num_retries)
             else:
                 response = self.arvrunner.api.container_requests().create(
-                    body=container_request
+                    body=container_request,
+                    **extra_submit_params
                 ).execute(num_retries=self.arvrunner.num_retries)
 
             self.uuid = response["uuid"]
@@ -320,7 +330,10 @@ class ArvadosContainer(JobBase):
                                                            api_client=self.arvrunner.api,
                                                            keep_client=self.arvrunner.keep_client,
                                                            num_retries=self.arvrunner.num_retries)
-                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
+                label = self.arvrunner.label(self)
+                done.logtail(
+                    logc, logger.error,
+                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)
 
             if record["output_uuid"]:
                 if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
@@ -393,16 +406,16 @@ class RunnerContainer(Runner):
             },
             "secret_mounts": secret_mounts,
             "runtime_constraints": {
-                "vcpus": self.submit_runner_cores,
-                "ram": 1024*1024 * self.submit_runner_ram,
+                "vcpus": math.ceil(self.submit_runner_cores),
+                "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                 "API": True
             },
             "use_existing": self.enable_reuse,
             "properties": {}
         }
 
-        if self.tool.tool.get("id", "").startswith("keep:"):
-            sp = self.tool.tool["id"].split('/')
+        if self.embedded_tool.tool.get("id", "").startswith("keep:"):
+            sp = self.embedded_tool.tool["id"].split('/')
             workflowcollection = sp[0][5:]
             workflowname = "/".join(sp[1:])
             workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
@@ -411,14 +424,14 @@ class RunnerContainer(Runner):
                 "portable_data_hash": "%s" % workflowcollection
             }
         else:
-            packed = packed_workflow(self.arvrunner, self.tool, self.merged_map)
+            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
             workflowpath = "/var/lib/cwl/workflow.json#main"
             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                 "kind": "json",
                 "content": packed
             }
-            if self.tool.tool.get("id", "").startswith("arvwf:"):
-                container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
+            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
+                container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
 
 
         # --local means execute the workflow instead of submitting a container request
@@ -428,6 +441,7 @@ class RunnerContainer(Runner):
         # --eval-timeout is the timeout for javascript invocation
         # --parallel-task-count is the number of threads to use for job submission
         # --enable/disable-reuse sets desired job reuse
+        # --collection-cache-size sets aside memory to store collections
         command = ["arvados-cwl-runner",
                    "--local",
                    "--api=containers",
@@ -435,7 +449,8 @@ class RunnerContainer(Runner):
                    "--disable-validate",
                    "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                    "--thread-count=%s" % self.arvrunner.thread_count,
-                   "--enable-reuse" if self.enable_reuse else "--disable-reuse"]
+                   "--enable-reuse" if self.enable_reuse else "--disable-reuse",
+                   "--collection-cache-size=%s" % self.collection_cache_size]
 
         if self.output_name:
             command.append("--output-name=" + self.output_name)
@@ -475,20 +490,26 @@ class RunnerContainer(Runner):
         if self.arvrunner.project_uuid:
             job_spec["owner_uuid"] = self.arvrunner.project_uuid
 
+        extra_submit_params = {}
+        if runtimeContext.submit_runner_cluster:
+            extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
+
         if runtimeContext.submit_request_uuid:
             response = self.arvrunner.api.container_requests().update(
                 uuid=runtimeContext.submit_request_uuid,
-                body=job_spec
+                body=job_spec,
+                **extra_submit_params
             ).execute(num_retries=self.arvrunner.num_retries)
         else:
             response = self.arvrunner.api.container_requests().create(
-                body=job_spec
+                body=job_spec,
+                **extra_submit_params
             ).execute(num_retries=self.arvrunner.num_retries)
 
         self.uuid = response["uuid"]
         self.arvrunner.process_submitted(self)
 
-        logger.info("%s submitted container %s", self.arvrunner.label(self), response["uuid"])
+        logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])
 
     def done(self, record):
         try: