logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
+def cleanup_name_for_collection(name):
+ return name.replace("/", " ")
+
class ArvadosContainer(JobBase):
"""Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
mounts[targetdir]["path"] = path
prevdir = targetdir + "/"
+ intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
+
with Perf(metrics, "generatefiles %s" % self.name):
if self.generatefiles["listing"]:
vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
if not runtimeContext.current_container:
runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
- info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
- vwd.save_new(name=info["name"],
+ vwd.save_new(name=intermediate_collection_info["name"],
owner_uuid=runtimeContext.project_uuid,
ensure_unique_name=True,
- trash_at=info["trash_at"],
- properties=info["properties"])
+ trash_at=intermediate_collection_info["trash_at"],
+ properties=intermediate_collection_info["properties"])
prev = None
for f, p in sorteditems:
runtimeContext.project_uuid,
runtimeContext.force_docker_pull,
runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker)
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
network_req, _ = self.get_requirement("NetworkAccess")
if network_req:
if runtimeContext.submit_runner_cluster:
extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
- container_request["output_name"] = "Output for step %s" % (self.name)
+ container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name))
container_request["output_ttl"] = self.output_ttl
container_request["mounts"] = mounts
container_request["secret_mounts"] = secret_mounts
for pr in properties_req["processProperties"]:
container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
+ output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
+ if output_properties_req:
+ if self.arvrunner.api._rootDesc["revision"] >= "20220510":
+ container_request["output_properties"] = {}
+ for pr in output_properties_req["outputProperties"]:
+ container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
+ else:
+ logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
+ self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
+
if runtimeContext.runnerjob.startswith("arvwf:"):
wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
processStatus = "permanentFail"
if rcode == 137:
- logger.warning("%s job was killed on the compute instance. The most common reason is that it attempted to allocate too much RAM and was targeted by the Out Of Memory (OOM) killer. Try resubmitting with a higher 'ramMin'.",
+ logger.warning("%s Container may have been killed for using too much RAM. Try resubmitting with a higher 'ramMin'.",
self.arvrunner.label(self))
else:
processStatus = "permanentFail"
"cwd": "/var/spool/cwl",
"priority": self.priority,
"state": "Committed",
- "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
+ "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
"mounts": {
"/var/lib/cwl/cwl.input.json": {
"kind": "json",
"portable_data_hash": "%s" % workflowcollection
}
else:
- packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
+ packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext)
workflowpath = "/var/lib/cwl/workflow.json#main"
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
- if self.on_error:
+ if runtimeContext.on_error:
- command.append("--on-error=" + self.on_error)
+ command.append("--on-error=" + runtimeContext.on_error)
- if self.intermediate_output_ttl:
- command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)
+ if runtimeContext.intermediate_output_ttl:
+ command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)
- if self.arvrunner.trash_intermediate:
+ if runtimeContext.trash_intermediate:
command.append("--trash-intermediate")
- if self.arvrunner.project_uuid:
- command.append("--project-uuid="+self.arvrunner.project_uuid)
+ if runtimeContext.project_uuid:
+ command.append("--project-uuid="+runtimeContext.project_uuid)
if self.enable_dev:
command.append("--enable-dev")
def run(self, runtimeContext):
runtimeContext.keepprefix = "keep:"
job_spec = self.arvados_job_spec(runtimeContext)
- if self.arvrunner.project_uuid:
- job_spec["owner_uuid"] = self.arvrunner.project_uuid
+ if runtimeContext.project_uuid:
+ job_spec["owner_uuid"] = runtimeContext.project_uuid
extra_submit_params = {}
if runtimeContext.submit_runner_cluster: