Update intermediate collection name, properties and trash_at attribute
author Fuad Muhic <fmuhic@capeannenterprises.com>
Mon, 18 Jun 2018 14:37:42 +0000 (16:37 +0200)
committer Fuad Muhic <fmuhic@capeannenterprises.com>
Mon, 18 Jun 2018 14:37:42 +0000 (16:37 +0200)
Arvados-DCO-1.1-Signed-off-by: Fuad Muhic <fmuhic@capeannenterprises.com>

sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/pathmapper.py

index 0bec692643ad805c02d6b8358fae8a65841c1367..1ad276e67a6810575ec570e4cd1b771098687d78 100644 (file)
@@ -20,6 +20,7 @@ from cwltool.utils import aslist
 
 import arvados.collection
 
+from arvados.errors import ApiError
 from .arvdocker import arv_docker_get_image
 from . import done
 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
@@ -156,8 +157,26 @@ class ArvadosContainer(object):
 
                 keepemptydirs(vwd)
 
+                trash_time = None 
+                if self.arvrunner.intermediate_output_ttl > 0: 
+                    trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl) 
+                current_container_uuid = None 
+                try: 
+                    current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries) 
+                    current_container_uuid = current_container['uuid'] 
+                except ApiError as e: 
+                    # Status code 404 just means we're not running in a container. 
+                    if e.resp.status != 404: 
+                        logger.info("Getting current container: %s", e) 
+                props = {"type": "Intermediate", 
+                         "container": current_container_uuid}
+
                 with Perf(metrics, "generatefiles.save_new %s" % self.name):
-                    vwd.save_new()
+                    vwd.save_new(name="Intermediate collection", 
+                                 ensure_unique_name=True, 
+                                 trash_at=trash_time, 
+                                 properties=props)
 
                 prev = None
                 for f, p in sorteditems:
index 04256c68f8b10f47ede2fefcabb0172948c2ff00..2d98a53b33f3f2a7835a4f73392f9c2cb7e5ab1d 100644 (file)
@@ -6,6 +6,7 @@ import logging
 import re
 import copy
 import json
+import datetime
 import time
 
 from cwltool.process import get_feature, shortname, UnsupportedRequirement
@@ -66,8 +67,26 @@ class ArvadosJob(object):
                                 n.write(p.resolved.encode("utf-8"))
 
                 if vwd:
+                    trash_time = None 
+                    if self.arvrunner.intermediate_output_ttl > 0: 
+                        trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl) 
+                    current_container_uuid = None 
+                    try: 
+                        current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries) 
+                        current_container_uuid = current_container['uuid'] 
+                    except ApiError as e: 
+                        # Status code 404 just means we're not running in a container. 
+                        if e.resp.status != 404: 
+                            logger.info("Getting current container: %s", e) 
+                    props = {"type": "Intermediate", 
+                             "container": current_container_uuid}
                     with Perf(metrics, "generatefiles.save_new %s" % self.name):
-                        vwd.save_new()
+                        vwd.save_new(name="Intermediate collection", 
+                                     ensure_unique_name=True, 
+                                     trash_at=trash_time, 
+                                     properties=props)
 
                 for f, p in generatemapper.items():
                     if p.type == "File":
index 27e48f1f4408e33630985f2060ba738af720111f..a749b51625b3af008e18ac4a727ca461c78a8712 100644 (file)
@@ -7,12 +7,14 @@ import logging
 import uuid
 import os
 import urllib
+import datetime
 
 import arvados.commands.run
 import arvados.collection
 
 from schema_salad.sourceline import SourceLine
 
+from arvados.errors import ApiError
 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
 from cwltool.workflow import WorkflowException
 
@@ -153,9 +155,13 @@ class ArvPathMapper(PathMapper):
                 for l in srcobj.get("listing", []):
                     self.addentry(l, c, ".", remap)
 
-                check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
-                if not check["items"]:
-                    c.save_new(owner_uuid=self.arvrunner.project_uuid)
+                trash_time, props = self.__get_collection_attributes()
+
+                c.save_new(name="Intermediate collection", 
+                           owner_uuid=self.arvrunner.project_uuid, 
+                           ensure_unique_name=True, 
+                           trash_at=trash_time, 
+                           properties=props)
 
                 ab = self.collection_pattern % c.portable_data_hash()
                 self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
@@ -167,9 +173,13 @@ class ArvPathMapper(PathMapper):
                                                   num_retries=self.arvrunner.num_retries                                                  )
                 self.addentry(srcobj, c, ".", remap)
 
-                check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
-                if not check["items"]:
-                    c.save_new(owner_uuid=self.arvrunner.project_uuid)
+                trash_time, props = self.__get_collection_attributes()
+
+                c.save_new(name="Intermediate collection", 
+                           owner_uuid=self.arvrunner.project_uuid, 
+                           ensure_unique_name=True, 
+                           trash_at=trash_time, 
+                           properties=props)
 
                 ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                 self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
@@ -202,6 +212,25 @@ class ArvPathMapper(PathMapper):
         else:
             return None
 
+    def __get_collection_attributes(self):
+            trash_time = None 
+            if self.arvrunner.intermediate_output_ttl > 0: 
+                trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl) 
+
+            current_container_uuid = None 
+            try: 
+                current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries) 
+                current_container_uuid = current_container['uuid'] 
+            except ApiError as e: 
+                # Status code 404 just means we're not running in a container. 
+                if e.resp.status != 404: 
+                    logger.info("Getting current container: %s", e)
+            properties = {"type": "Intermediate", 
+                          "container": current_container_uuid}
+
+            return (trash_time, properties)
+
+
 class StagingPathMapper(PathMapper):
     _follow_dirs = True