Merge branch '13330-cwl-intermediate-collections-cleanup'
author Fuad Muhic <fmuhic@capeannenterprises.com>
Tue, 10 Jul 2018 14:20:09 +0000 (16:20 +0200)
committer Fuad Muhic <fmuhic@capeannenterprises.com>
Tue, 10 Jul 2018 14:20:09 +0000 (16:20 +0200)
closes #13330

Arvados-DCO-1.1-Signed-off-by: Fuad Muhic <fmuhic@capeannenterprises.com>

sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/context.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/util.py [new file with mode: 0644]
sdk/cwl/tests/makes_intermediates/echo.cwl [new file with mode: 0644]
sdk/cwl/tests/makes_intermediates/hello1.txt [new file with mode: 0644]
sdk/cwl/tests/makes_intermediates/run_in_single.cwl [new file with mode: 0644]
sdk/cwl/tests/makes_intermediates/subwf.cwl [new file with mode: 0644]
sdk/cwl/tests/test_container.py
sdk/cwl/tests/test_util.py [new file with mode: 0644]

index c0c6f77221a32d0e14201a5a5561ac055dc5de56..948a9a46feab30bf3f8759fee94d81d14205e42d 100644 (file)
@@ -11,6 +11,7 @@ import datetime
 import ciso8601
 import uuid
 
+from arvados_cwl.util import get_current_container, get_intermediate_collection_info
 import ruamel.yaml as yaml
 
 from cwltool.errors import WorkflowException
@@ -165,8 +166,14 @@ class ArvadosContainer(JobBase):
 
                 keepemptydirs(vwd)
 
-                with Perf(metrics, "generatefiles.save_new %s" % self.name):
-                    vwd.save_new()
+                if not runtimeContext.current_container:
+                    runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+                info = get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
+                vwd.save_new(name=info["name"],
+                             owner_uuid=self.arvrunner.project_uuid,
+                             ensure_unique_name=True,
+                             trash_at=info["trash_at"],
+                             properties=info["properties"])
 
                 prev = None
                 for f, p in sorteditems:
@@ -242,6 +249,7 @@ class ArvadosContainer(JobBase):
         if self.timelimit is not None:
             scheduling_parameters["max_run_time"] = self.timelimit
 
+        container_request["output_name"] = "Output for step %s" % (self.name)
         container_request["output_ttl"] = self.output_ttl
         container_request["mounts"] = mounts
         container_request["secret_mounts"] = secret_mounts
index 70c2173db9fa2f7ff5054ff4be7252bd64156b67..1287fbb6eaf7b8387ca3fe700c7c97cf0678b867 100644 (file)
@@ -18,6 +18,7 @@ from cwltool.job import JobBase
 
 from schema_salad.sourceline import SourceLine
 
+from arvados_cwl.util import get_current_container, get_intermediate_collection_info
 import ruamel.yaml as yaml
 
 import arvados.collection
@@ -76,7 +77,14 @@ class ArvadosJob(JobBase):
 
                 if vwd:
                     with Perf(metrics, "generatefiles.save_new %s" % self.name):
-                        vwd.save_new()
+                        if not runtimeContext.current_container:
+                            runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+                        info = get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
+                        vwd.save_new(name=info["name"],
+                                     owner_uuid=self.arvrunner.project_uuid,
+                                     ensure_unique_name=True,
+                                     trash_at=info["trash_at"],
+                                     properties=info["properties"])
 
                 for f, p in generatemapper.items():
                     if p.type == "File":
index cf0c1fb7e4576f9ef9f6d0f809e05b47e1186586..81e256ed545adbdf5e3be00eccdd108f65be26d4 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 from cwltool.context import LoadingContext, RuntimeContext
 
 class ArvLoadingContext(LoadingContext):
@@ -24,5 +28,6 @@ class ArvRuntimeContext(RuntimeContext):
         self.wait = True
         self.cwl_runner_job = None
         self.storage_classes = "default"
+        self.current_container = None
 
         super(ArvRuntimeContext, self).__init__(kwargs)
index 05a358e0d57a44f26cf6a4ec26c9d1bd35200163..d083b78f5a061906164a5978530af9230e767473 100644 (file)
@@ -8,11 +8,13 @@ import uuid
 import os
 import urllib
 
+from arvados_cwl.util import get_current_container, get_intermediate_collection_info
 import arvados.commands.run
 import arvados.collection
 
 from schema_salad.sourceline import SourceLine
 
+from arvados.errors import ApiError
 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
 from cwltool.workflow import WorkflowException
 
@@ -153,9 +155,14 @@ class ArvPathMapper(PathMapper):
                 for l in srcobj.get("listing", []):
                     self.addentry(l, c, ".", remap)
 
-                check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
-                if not check["items"]:
-                    c.save_new(owner_uuid=self.arvrunner.project_uuid)
+                container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+                info = get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
+
+                c.save_new(name=info["name"],
+                           owner_uuid=self.arvrunner.project_uuid,
+                           ensure_unique_name=True,
+                           trash_at=info["trash_at"],
+                           properties=info["properties"])
 
                 ab = self.collection_pattern % c.portable_data_hash()
                 self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
@@ -167,9 +174,14 @@ class ArvPathMapper(PathMapper):
                                                   num_retries=self.arvrunner.num_retries                                                  )
                 self.addentry(srcobj, c, ".", remap)
 
-                check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
-                if not check["items"]:
-                    c.save_new(owner_uuid=self.arvrunner.project_uuid)
+                container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+                info = get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
+
+                c.save_new(name=info["name"],
+                           owner_uuid=self.arvrunner.project_uuid,
+                           ensure_unique_name=True,
+                           trash_at=info["trash_at"],
+                           properties=info["properties"])
 
                 ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                 self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
@@ -202,6 +214,7 @@ class ArvPathMapper(PathMapper):
         else:
             return None
 
+
 class StagingPathMapper(PathMapper):
     _follow_dirs = True
 
diff --git a/sdk/cwl/arvados_cwl/util.py b/sdk/cwl/arvados_cwl/util.py
new file mode 100644 (file)
index 0000000..98a2a89
--- /dev/null
@@ -0,0 +1,31 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import datetime
+from arvados.errors import ApiError
+
+def get_intermediate_collection_info(workflow_step_name, current_container, intermediate_output_ttl):
+    """Build the name, trash_at and properties used to save an intermediate collection.
+
+    workflow_step_name and current_container may be None; a ttl that is None or <= 0
+    leaves trash_at as None.  Returns a dict with "name", "trash_at", "properties".
+    """
+    name = "Intermediate collection"
+    if workflow_step_name:
+        name = "Intermediate collection for step %s" % (workflow_step_name)
+    trash_time = None
+    if intermediate_output_ttl and intermediate_output_ttl > 0:
+        trash_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=intermediate_output_ttl)
+    props = {"type": "intermediate", "container": current_container["uuid"] if current_container else None}
+    return {"name" : name, "trash_at" : trash_time, "properties" : props}
+
+def get_current_container(api, num_retries=0, logger=None):
+    """Return the API's current container record, or None if the request raises ApiError."""
+    try:
+        return api.containers().current().execute(num_retries=num_retries)
+    except ApiError as err:
+        # A 404 only means we are not running inside a container, so it is not logged.
+        if err.resp.status != 404 and logger:
+            logger.info("Getting current container: %s", err)
+    return None
diff --git a/sdk/cwl/tests/makes_intermediates/echo.cwl b/sdk/cwl/tests/makes_intermediates/echo.cwl
new file mode 100644 (file)
index 0000000..5449bc3
--- /dev/null
@@ -0,0 +1,14 @@
+class: CommandLineTool
+cwlVersion: v1.0
+requirements:
+  InitialWorkDirRequirement:
+    listing:
+      - $(inputs.inp1)
+      - $(inputs.inp2)
+      - $(inputs.inp3)
+inputs:
+  inp1: File
+  inp2: [File, Directory]
+  inp3: Directory
+outputs: []
+arguments: [echo, $(inputs.inp1), $(inputs.inp2), $(inputs.inp3)]
diff --git a/sdk/cwl/tests/makes_intermediates/hello1.txt b/sdk/cwl/tests/makes_intermediates/hello1.txt
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/sdk/cwl/tests/makes_intermediates/run_in_single.cwl b/sdk/cwl/tests/makes_intermediates/run_in_single.cwl
new file mode 100644 (file)
index 0000000..bb596b2
--- /dev/null
@@ -0,0 +1,38 @@
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+requirements:
+  SubworkflowFeatureRequirement: {}
+inputs:
+  inp1:
+    type: File
+    default:
+      class: File
+      location: hello1.txt
+  inp2:
+    type: [File, Directory]
+    default:
+      class: File
+      basename: "hello2.txt"
+      contents: "Hello world"
+  inp3:
+    type: [File, Directory]
+    default:
+      class: Directory
+      basename: inp3
+      listing:
+        - class: File
+          basename: "hello3.txt"
+          contents: "hello world"
+outputs: []
+steps:
+  step1:
+    requirements:
+      arv:RunInSingleContainer: {}
+    in:
+      inp1: inp1
+      inp2: inp2
+      inp3: inp3
+    out: []
+    run: subwf.cwl
diff --git a/sdk/cwl/tests/makes_intermediates/subwf.cwl b/sdk/cwl/tests/makes_intermediates/subwf.cwl
new file mode 100644 (file)
index 0000000..1852ab4
--- /dev/null
@@ -0,0 +1,15 @@
+cwlVersion: v1.0
+class: Workflow
+inputs:
+  inp1: File
+  inp2: File
+  inp3: Directory
+outputs: []
+steps:
+  step1:
+    in:
+      inp1: inp1
+      inp2: inp2
+      inp3: inp3
+    out: []
+    run: echo.cwl
index dd484690c14ebb11caeae501da978f81b0e24abd..ae234414a3df90888cfbe9028c06aa5efbba9f55 100644 (file)
@@ -21,7 +21,6 @@ if not os.getenv('ARVADOS_DEBUG'):
     logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
     logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
-
 class TestContainer(unittest.TestCase):
 
     def helper(self, runner, enable_reuse=True):
@@ -100,6 +99,7 @@ class TestContainer(unittest.TestCase):
                                                "capacity": 1073741824 }
                         },
                         'state': 'Committed',
+                        'output_name': 'Output for step test_run_'+str(enable_reuse),
                         'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                         'output_path': '/var/spool/cwl',
                         'output_ttl': 0,
@@ -186,6 +186,7 @@ class TestContainer(unittest.TestCase):
                                    "capacity": 5242880000 }
             },
             'state': 'Committed',
+            'output_name': 'Output for step test_resource_requirements',
             'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
             'output_path': '/var/spool/cwl',
             'output_ttl': 7200,
@@ -318,6 +319,7 @@ class TestContainer(unittest.TestCase):
                 }
             },
             'state': 'Committed',
+            'output_name': 'Output for step test_initial_work_dir',
             'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
             'output_path': '/var/spool/cwl',
             'output_ttl': 0,
@@ -405,6 +407,7 @@ class TestContainer(unittest.TestCase):
                         },
                     },
                     'state': 'Committed',
+                    "output_name": "Output for step test_run_redirect",
                     'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                     'output_path': '/var/spool/cwl',
                     'output_ttl': 0,
@@ -541,6 +544,7 @@ class TestContainer(unittest.TestCase):
                                            "capacity": 1073741824 }
                     },
                     'state': 'Committed',
+                    'output_name': 'Output for step test_run_mounts',
                     'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                     'output_path': '/var/spool/cwl',
                     'output_ttl': 0,
@@ -633,6 +637,7 @@ class TestContainer(unittest.TestCase):
                                            "capacity": 1073741824 }
                     },
                     'state': 'Committed',
+                    'output_name': 'Output for step test_secrets',
                     'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                     'output_path': '/var/spool/cwl',
                     'output_ttl': 0,
diff --git a/sdk/cwl/tests/test_util.py b/sdk/cwl/tests/test_util.py
new file mode 100644 (file)
index 0000000..2532bd5
--- /dev/null
@@ -0,0 +1,45 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import unittest
+import mock
+import datetime
+import httplib2
+
+from arvados_cwl.util import *
+from arvados.errors import ApiError
+
+class MockDateTime(datetime.datetime):
+    """datetime.datetime stand-in whose utcnow() always returns 2018-01-01 00:00:00."""
+    @classmethod
+    def utcnow(cls):
+        return datetime.datetime(2018, 1, 1, 0, 0, 0, 0)
+
+class TestUtil(unittest.TestCase):
+    # Freeze the clock for this test only: rebinding datetime.datetime at import time
+    # would leave the fake class installed for every other test in the same process.
+    @mock.patch("datetime.datetime", MockDateTime)
+    def test_get_intermediate_collection_info(self):
+        name = "one"
+        current_container = {"uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}
+        intermediate_output_ttl = 120
+        info = get_intermediate_collection_info(name, current_container, intermediate_output_ttl)
+
+        self.assertEqual(info["name"], "Intermediate collection for step one")
+        self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 2, 0, 0))
+        self.assertEqual(info["properties"], {"type" : "intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})
+
+    def test_get_current_container_success(self):
+        api = mock.MagicMock()
+        api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}
+
+        self.assertEqual(get_current_container(api), {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})
+
+    def test_get_current_container_error(self):
+        api = mock.MagicMock()
+        api.containers().current().execute.side_effect = ApiError(httplib2.Response({"status": 300}), "")
+        logger = mock.MagicMock()
+        # A non-404 ApiError is logged and swallowed, not raised: expect None plus a log call.
+        self.assertIsNone(get_current_container(api, num_retries=0, logger=logger))
+        self.assertTrue(logger.info.called)