19699: Pass through --varying-url-params to runner container
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
index b04fb190e963e965524679bb4473767820e8b849..b0ef4a22cc1f7c55bcfa0d1746fcb7cb5fe082f5 100644 (file)
@@ -37,6 +37,9 @@ from ._version import __version__
 logger = logging.getLogger('arvados.cwl-runner')
 metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
+def cleanup_name_for_collection(name):  # map each "/" to " "; presumably "/" is not allowed in collection names (TODO confirm)
+    return " ".join(name.split("/"))
+
 class ArvadosContainer(JobBase):
     """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
 
@@ -88,6 +91,8 @@ class ArvadosContainer(JobBase):
         container_request["state"] = "Committed"
         container_request.setdefault("properties", {})
 
+        container_request["properties"]["cwl_input"] = self.joborder
+
         runtime_constraints = {}
 
         if runtimeContext.project_uuid:
@@ -146,6 +151,8 @@ class ArvadosContainer(JobBase):
                     mounts[targetdir]["path"] = path
             prevdir = targetdir + "/"
 
+        intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
+
         with Perf(metrics, "generatefiles %s" % self.name):
             if self.generatefiles["listing"]:
                 vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
@@ -197,12 +204,11 @@ class ArvadosContainer(JobBase):
 
                 if not runtimeContext.current_container:
                     runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
-                info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
-                vwd.save_new(name=info["name"],
+                vwd.save_new(name=intermediate_collection_info["name"],
                              owner_uuid=runtimeContext.project_uuid,
                              ensure_unique_name=True,
-                             trash_at=info["trash_at"],
-                             properties=info["properties"])
+                             trash_at=intermediate_collection_info["trash_at"],
+                             properties=intermediate_collection_info["properties"])
 
                 prev = None
                 for f, p in sorteditems:
@@ -247,7 +253,8 @@ class ArvadosContainer(JobBase):
                                                                     runtimeContext.project_uuid,
                                                                     runtimeContext.force_docker_pull,
                                                                     runtimeContext.tmp_outdir_prefix,
-                                                                    runtimeContext.match_local_docker)
+                                                                    runtimeContext.match_local_docker,
+                                                                    runtimeContext.copy_deps)
 
         network_req, _ = self.get_requirement("NetworkAccess")
         if network_req:
@@ -318,7 +325,7 @@ class ArvadosContainer(JobBase):
         if runtimeContext.submit_runner_cluster:
             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
 
-        container_request["output_name"] = "Output for step %s" % (self.name)
+        container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name))
         container_request["output_ttl"] = self.output_ttl
         container_request["mounts"] = mounts
         container_request["secret_mounts"] = secret_mounts
@@ -340,6 +347,16 @@ class ArvadosContainer(JobBase):
             for pr in properties_req["processProperties"]:
                 container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
 
+        output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
+        if output_properties_req:
+            if self.arvrunner.api._rootDesc["revision"] >= "20220510":
+                container_request["output_properties"] = {}
+                for pr in output_properties_req["outputProperties"]:
+                    container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
+            else:
+                logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
+                               self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
+
         if runtimeContext.runnerjob.startswith("arvwf:"):
             wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
             wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
@@ -394,7 +411,7 @@ class ArvadosContainer(JobBase):
                     processStatus = "permanentFail"
 
                 if rcode == 137:
-                    logger.warning("%s This container was killed on the compute instance.  The most common reason is that it attempted to allocate too much RAM and was targeted by the Out Of Memory (OOM) killer.  Try resubmitting with a higher 'ramMin'.",
+                    logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin'.",
                                  self.arvrunner.label(self))
             else:
                 processStatus = "permanentFail"
@@ -422,6 +439,13 @@ class ArvadosContainer(JobBase):
 
             if container["output"]:
                 outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
+
+            properties = record["properties"].copy()
+            properties["cwl_output"] = outputs
+            self.arvrunner.api.container_requests().update(
+                uuid=self.uuid,
+                body={"container_request": {"properties": properties}}
+            ).execute(num_retries=self.arvrunner.num_retries)
         except WorkflowException as e:
             # Only include a stack trace if in debug mode.
             # A stack trace may obfuscate more useful output about the workflow.
@@ -438,7 +462,7 @@ class ArvadosContainer(JobBase):
 class RunnerContainer(Runner):
     """Submit and manage a container that runs arvados-cwl-runner."""
 
-    def arvados_job_spec(self, runtimeContext):
+    def arvados_job_spec(self, runtimeContext, git_info):
         """Create an Arvados container request for this workflow.
 
         The returned dict can be used to create a container passed as
@@ -465,7 +489,7 @@ class RunnerContainer(Runner):
             "cwd": "/var/spool/cwl",
             "priority": self.priority,
             "state": "Committed",
-            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
+            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
             "mounts": {
                 "/var/lib/cwl/cwl.input.json": {
                     "kind": "json",
@@ -499,15 +523,24 @@ class RunnerContainer(Runner):
                 "kind": "collection",
                 "portable_data_hash": "%s" % workflowcollection
             }
+        elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
+            workflowpath = "/var/lib/cwl/workflow.json#main"
+            record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
+            packed = yaml.safe_load(record["definition"])
+            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+                "kind": "json",
+                "content": packed
+            }
+            container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
         else:
-            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
+            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
             workflowpath = "/var/lib/cwl/workflow.json#main"
             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                 "kind": "json",
                 "content": packed
             }
-            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
-                container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
+
+        container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
 
         properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
         if properties_req:
@@ -550,17 +583,17 @@ class RunnerContainer(Runner):
         if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
             command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
 
-        if self.on_error:
-            command.append("--on-error=" + self.on_error)
+        if runtimeContext.on_error:  # test and forward the same setting, like the options below
+            command.append("--on-error=" + runtimeContext.on_error)
 
-        if self.intermediate_output_ttl:
-            command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)
+        if runtimeContext.intermediate_output_ttl:
+            command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)
 
-        if self.arvrunner.trash_intermediate:
+        if runtimeContext.trash_intermediate:
             command.append("--trash-intermediate")
 
-        if self.arvrunner.project_uuid:
-            command.append("--project-uuid="+self.arvrunner.project_uuid)
+        if runtimeContext.project_uuid:
+            command.append("--project-uuid="+runtimeContext.project_uuid)
 
         if self.enable_dev:
             command.append("--enable-dev")
@@ -571,6 +604,9 @@ class RunnerContainer(Runner):
         if runtimeContext.enable_preemptible is False:
             command.append("--disable-preemptible")
 
+        if runtimeContext.varying_url_params:
+            command.append("--varying-url-params="+runtimeContext.varying_url_params)
+
         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
 
         container_req["command"] = command
@@ -580,9 +616,9 @@ class RunnerContainer(Runner):
 
     def run(self, runtimeContext):
         runtimeContext.keepprefix = "keep:"
-        job_spec = self.arvados_job_spec(runtimeContext)
-        if self.arvrunner.project_uuid:
-            job_spec["owner_uuid"] = self.arvrunner.project_uuid
+        job_spec = self.arvados_job_spec(runtimeContext, self.git_info)
+        if runtimeContext.project_uuid:
+            job_spec["owner_uuid"] = runtimeContext.project_uuid
 
         extra_submit_params = {}
         if runtimeContext.submit_runner_cluster: