outputs are scheduled for trash.
h3. Check Docker access
-In order to pull and upload Docker images, @arvados-cwl-runner@ requires access to Docker. You do not need Docker if the Docker images you intend to use are already available in Aravdos.
+In order to pull and upload Docker images, @arvados-cwl-runner@ requires access to Docker. You do not need Docker if the Docker images you intend to use are already available in Arvados.
You can determine if you have access to Docker by running @docker version@:
collection_cache=self.collection_cache)
self.fs_access = make_fs_access(kwargs["basedir"])
- self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
+
self.trash_intermediate = kwargs["trash_intermediate"]
+ if self.trash_intermediate and self.work_api != "containers":
+ raise Exception("--trash-intermediate is only supported with --api=containers.")
+
+ self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
if self.intermediate_output_ttl and self.work_api != "containers":
- raise Exception("--intermediate-output-ttl is only supported when using the containers api.")
+ raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
+ if self.intermediate_output_ttl < 0:
+ raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
if not kwargs.get("name"):
kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
import json
import os
import urllib
+import time
+import datetime
+import ciso8601
import ruamel.yaml as yaml
def done(self, record):
outputs = {}
try:
- self.arvrunner.add_intermediate_output(record["output_uuid"])
-
container = self.arvrunner.api.containers().get(
uuid=record["container_uuid"]
).execute(num_retries=self.arvrunner.num_retries)
num_retries=self.arvrunner.num_retries)
done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self))
+ if record["output_uuid"]:
+ if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
+ # Compute the trash time to avoid requesting the collection record.
+ trash_at = ciso8601.parse_datetime_unaware(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl)
+ aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else ""
+ orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else ""
+ oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else ""
+ logger.info("%s Intermediate output %s (%s) will be trashed%s%s%s." % (
+ self.arvrunner.label(self), record["output_uuid"], container["output"], aftertime, orpart, oncomplete))
+ self.arvrunner.add_intermediate_output(record["output_uuid"])
+
if container["output"]:
outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
except WorkflowException as e:
if self.intermediate_output_ttl:
command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)
+ if self.arvrunner.trash_intermediate:
+ command.append("--trash-intermediate")
+
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
+ @stubs
+ def test_submit_container_trash_intermediate(self, stubs):
+ capture_stdout = cStringIO.StringIO()
+ try:
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug", "--trash-intermediate",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 0)
+ except:
+ logging.exception("")
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ '--enable-reuse', '--on-error=continue',
+ "--trash-intermediate",
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+ stubs.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher(expect_container))
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
@stubs
def test_submit_container_output_tags(self, stubs):
output_tags = "tag0,tag1,tag2"