logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
+def cleanup_name_for_collection(name):
+ return name.replace("/", " ")
+
class ArvadosContainer(JobBase):
"""Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
container_request["state"] = "Committed"
container_request.setdefault("properties", {})
+ container_request["properties"]["cwl_input"] = self.joborder
+
runtime_constraints = {}
if runtimeContext.project_uuid:
mounts[targetdir]["path"] = path
prevdir = targetdir + "/"
+ intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
+
with Perf(metrics, "generatefiles %s" % self.name):
if self.generatefiles["listing"]:
vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
if not runtimeContext.current_container:
runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
- info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
- vwd.save_new(name=info["name"],
+ vwd.save_new(name=intermediate_collection_info["name"],
owner_uuid=runtimeContext.project_uuid,
ensure_unique_name=True,
- trash_at=info["trash_at"],
- properties=info["properties"])
+ trash_at=intermediate_collection_info["trash_at"],
+ properties=intermediate_collection_info["properties"])
prev = None
for f, p in sorteditems:
runtimeContext.project_uuid,
runtimeContext.force_docker_pull,
runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker)
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
network_req, _ = self.get_requirement("NetworkAccess")
if network_req:
runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
if runtime_req:
if "keep_cache" in runtime_req:
- runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
+ if self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheDisk", 0) > 0:
+ # If DefaultKeepCacheDisk is non-zero it means we should use disk cache.
+ runtime_constraints["keep_cache_disk"] = math.ceil(runtime_req["keep_cache"] * 2**20)
+ else:
+ runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
if "outputDirType" in runtime_req:
if runtime_req["outputDirType"] == "local_output_dir":
# Currently the default behavior.
if runtimeContext.submit_runner_cluster:
extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
- container_request["output_name"] = "Output for step %s" % (self.name)
+ container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name))
container_request["output_ttl"] = self.output_ttl
container_request["mounts"] = mounts
container_request["secret_mounts"] = secret_mounts
for pr in properties_req["processProperties"]:
container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
+ output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
+ if output_properties_req:
+ if self.arvrunner.api._rootDesc["revision"] >= "20220510":
+ container_request["output_properties"] = {}
+ for pr in output_properties_req["outputProperties"]:
+ container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
+ else:
+ logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
+ self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
+
if runtimeContext.runnerjob.startswith("arvwf:"):
wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
processStatus = "permanentFail"
if rcode == 137:
- logger.warning("%s job was killed on the compute instance. The most common reason is that it attempted to allocate too much RAM and was targeted by the Out Of Memory (OOM) killer. Try resubmitting with a higher 'ramMin'.",
+ logger.warning("%s Container may have been killed for using too much RAM. Try resubmitting with a higher 'ramMin'.",
self.arvrunner.label(self))
else:
processStatus = "permanentFail"
if container["output"]:
outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
+
+ properties = record["properties"].copy()
+ properties["cwl_output"] = outputs
+ self.arvrunner.api.container_requests().update(
+ uuid=self.uuid,
+ body={"container_request": {"properties": properties}}
+ ).execute(num_retries=self.arvrunner.num_retries)
except WorkflowException as e:
# Only include a stack trace if in debug mode.
# A stack trace may obfuscate more useful output about the workflow.
class RunnerContainer(Runner):
"""Submit and manage a container that runs arvados-cwl-runner."""
- def arvados_job_spec(self, runtimeContext):
+ def arvados_job_spec(self, runtimeContext, git_info):
"""Create an Arvados container request for this workflow.
The returned dict can be used to create a container passed as
"cwd": "/var/spool/cwl",
"priority": self.priority,
"state": "Committed",
- "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
+ "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
"mounts": {
"/var/lib/cwl/cwl.input.json": {
"kind": "json",
"kind": "collection",
"portable_data_hash": "%s" % workflowcollection
}
+ elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
+ workflowpath = "/var/lib/cwl/workflow.json#main"
+ record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
+ packed = yaml.safe_load(record["definition"])
+ container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+ "kind": "json",
+ "content": packed
+ }
+ container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
else:
- packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
+ packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
workflowpath = "/var/lib/cwl/workflow.json#main"
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
"content": packed
}
- if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
- container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
+
+ container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
if properties_req:
if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
- if self.on_error:
+ if runtimeContext.on_error:
command.append("--on-error=" + self.on_error)
- if self.intermediate_output_ttl:
- command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)
+ if runtimeContext.intermediate_output_ttl:
+ command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)
- if self.arvrunner.trash_intermediate:
+ if runtimeContext.trash_intermediate:
command.append("--trash-intermediate")
- if self.arvrunner.project_uuid:
- command.append("--project-uuid="+self.arvrunner.project_uuid)
+ if runtimeContext.project_uuid:
+ command.append("--project-uuid="+runtimeContext.project_uuid)
if self.enable_dev:
command.append("--enable-dev")
if runtimeContext.enable_preemptible is False:
command.append("--disable-preemptible")
+ if runtimeContext.varying_url_params:
+ command.append("--varying-url-params="+runtimeContext.varying_url_params)
+
+ if runtimeContext.prefer_cached_downloads:
+ command.append("--prefer-cached-downloads")
+
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command
def run(self, runtimeContext):
runtimeContext.keepprefix = "keep:"
- job_spec = self.arvados_job_spec(runtimeContext)
- if self.arvrunner.project_uuid:
- job_spec["owner_uuid"] = self.arvrunner.project_uuid
+ job_spec = self.arvados_job_spec(runtimeContext, self.git_info)
+ if runtimeContext.project_uuid:
+ job_spec["owner_uuid"] = runtimeContext.project_uuid
extra_submit_params = {}
if runtimeContext.submit_runner_cluster: