From 67fbf00f0d868384f1585f2473b5f89455001638 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Fri, 31 Mar 2017 17:49:38 -0400 Subject: [PATCH] 11100: a-c-r sets output_ttl and deletes intermediate collections on success. --- sdk/cwl/arvados_cwl/__init__.py | 26 ++++++++++++++++++++++++++ sdk/cwl/arvados_cwl/arvcontainer.py | 5 ++++- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 46436b54dc..e0d103ce8c 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -74,6 +74,8 @@ class ArvCwlRunner(object): self.output_name = output_name self.output_tags = output_tags self.project_uuid = None + self.output_ttl = 0 + self.intermediate_output_collections = [] if keep_client is not None: self.keep_client = keep_client @@ -202,6 +204,20 @@ class ArvCwlRunner(object): def add_uploaded(self, src, pair): self.uploaded[src] = pair + def add_intermediate_output(self, uuid): + if uuid: + self.intermediate_output_collections.append(uuid) + + def trash_intermediate_output(self): + logger.info("Cleaning up intermediate output collections") + for i in self.intermediate_output_collections: + try: + self.api_client.collections().delete(uuid=i).execute(num_retries=self.num_retries) + except: + logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False)) + if sys.exc_info()[0] is KeyboardInterrupt: + break + def check_features(self, obj): if isinstance(obj, dict): if obj.get("writable"): @@ -329,6 +345,10 @@ class ArvCwlRunner(object): collection_cache=self.collection_cache) self.fs_access = make_fs_access(kwargs["basedir"]) + self.output_ttl = kwargs["intermediate_output_ttl"] + if self.output_ttl and self.work_api != "containers": + raise Exception("--intermediate-output-ttl is only supported when using the containers api.") + if not kwargs.get("name"): kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"]) @@ -512,6 +532,9 @@ class ArvCwlRunner(object): adjustDirObjs(self.final_output, partial(get_listing, self.fs_access)) adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access)) + if self.output_ttl and self.final_status == "success": + self.trash_intermediate_output() + return (self.final_output, self.final_status) @@ -619,6 +642,9 @@ def arg_parser(): # type: () -> argparse.ArgumentParser parser.add_argument("--enable-dev", action="store_true", help="Enable loading and running development versions " "of CWL spec.", default=False) + parser.add_argument("--intermediate-output-ttl", type=int, metavar="N", + help="If N > 0, intermediate output collections will be trashed N seconds after creation, or on successful completion of workflow (whichever comes first).", + default=0) parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute") parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.") diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index 0b302b6280..fe3efccfbf 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -42,7 +42,8 @@ class ArvadosContainer(object): "cwd": self.outdir, "priority": 1, "state": "Committed", - "properties": {} + "properties": {}, + "output_ttl", self.arvrunner.output_ttl } runtime_constraints = {} @@ -200,6 +201,8 @@ class ArvadosContainer(object): def done(self, record): try: + self.arvrunner.add_intermediate_output(record["output_uuid"]) + container = self.arvrunner.api.containers().get( uuid=record["container_uuid"] ).execute(num_retries=self.arvrunner.num_retries) -- 2.30.2