Merge branch 'master' of git.curoverse.com:arvados into 13330-cwl-intermediate-collec...
author Fuad Muhic <fmuhic@capeannenterprises.com>
Thu, 5 Jul 2018 15:36:24 +0000 (17:36 +0200)
committer Fuad Muhic <fmuhic@capeannenterprises.com>
Thu, 5 Jul 2018 15:36:24 +0000 (17:36 +0200)
Arvados-DCO-1.1-Signed-off-by: Fuad Muhic <fmuhic@capeannenterprises.com>

sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/tests/makes_intermediates/echo.cwl [new file with mode: 0644]
sdk/cwl/tests/makes_intermediates/hello1.txt [new file with mode: 0644]
sdk/cwl/tests/makes_intermediates/run_in_single.cwl [new file with mode: 0644]
sdk/cwl/tests/makes_intermediates/subwf.cwl [new file with mode: 0644]
sdk/cwl/tests/test_container.py
sdk/cwl/tests/test_job.py
sdk/cwl/tests/test_pathmapper.py

index 4ebcefb1365029a9f70665cbddedc71ffa8172bc..03f5a5eb678201f9610cf5fd76e085260695b742 100644 (file)
@@ -21,6 +21,7 @@ from cwltool.job import JobBase
 
 import arvados.collection
 
+from arvados.errors import ApiError
 from .arvdocker import arv_docker_get_image
 from . import done
 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
@@ -165,8 +166,11 @@ class ArvadosContainer(JobBase):
 
                 keepemptydirs(vwd)
 
-                with Perf(metrics, "generatefiles.save_new %s" % self.name):
-                    vwd.save_new()
+                info = self._get_intermediate_collection_info()
+                vwd.save_new(name=info["name"],
+                             ensure_unique_name=True,
+                             trash_at=info["trash_at"],
+                             properties=info["properties"])
 
                 prev = None
                 for f, p in sorteditems:
@@ -337,6 +341,26 @@ class ArvadosContainer(JobBase):
         finally:
             self.output_callback(outputs, processStatus)
 
+    def _get_intermediate_collection_info(self):
+            """Build the keyword values passed to save_new() for an intermediate collection.
+
+            Returns a dict with three keys:
+              "name"       -- the fixed label "Intermediate collection"
+              "trash_at"   -- now + intermediate_output_ttl seconds, or None
+                              when the runner's TTL is not greater than zero
+              "properties" -- {"type": "Intermediate", "container": uuid},
+                              where uuid is None if the current container
+                              could not be looked up
+            """
+            # NOTE(review): this body appears verbatim in ArvadosContainer,
+            # ArvadosJob and ArvPathMapper in this commit; a shared helper
+            # would keep the three copies from drifting apart.
+            # NOTE(review): this diff adds no `import datetime` to
+            # arvcontainer.py -- confirm the module already imports it.
+            trash_time = None
+            if self.arvrunner.intermediate_output_ttl > 0:
+                # NOTE(review): datetime.now() yields a naive local-time
+                # value; confirm whether trash_at is expected to be UTC.
+                trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl)
+
+            current_container_uuid = None
+            try:
+                current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries)
+                current_container_uuid = current_container['uuid']
+            except ApiError as e:
+                # Status code 404 just means we're not running in a container.
+                if e.resp.status != 404:
+                    # Any other API error is only logged; the uuid stays None.
+                    logger.info("Getting current container: %s", e)
+            props = {"type": "Intermediate",
+                          "container": current_container_uuid}
+
+            return {"name" : "Intermediate collection",
+                    "trash_at" : trash_time,
+                    "properties" : props}
+
 
 class RunnerContainer(Runner):
     """Submit and manage a container that runs arvados-cwl-runner."""
index 70c2173db9fa2f7ff5054ff4be7252bd64156b67..2d112c87a7c5affa2b6d490e526ae1e2d2aea28b 100644 (file)
@@ -6,6 +6,7 @@ import logging
 import re
 import copy
 import json
+import datetime
 import time
 
 from cwltool.process import shortname, UnsupportedRequirement
@@ -76,7 +77,11 @@ class ArvadosJob(JobBase):
 
                 if vwd:
                     with Perf(metrics, "generatefiles.save_new %s" % self.name):
-                        vwd.save_new()
+                        info = self._get_intermediate_collection_info()
+                        vwd.save_new(name=info["name"],
+                                     ensure_unique_name=True,
+                                     trash_at=info["trash_at"],
+                                     properties=info["properties"])
 
                 for f, p in generatemapper.items():
                     if p.type == "File":
@@ -277,6 +282,26 @@ class ArvadosJob(JobBase):
         finally:
             self.output_callback(outputs, processStatus)
 
+    def _get_intermediate_collection_info(self):
+            """Build the keyword values passed to save_new() for an intermediate collection.
+
+            Returns a dict with three keys:
+              "name"       -- the fixed label "Intermediate collection"
+              "trash_at"   -- now + intermediate_output_ttl seconds, or None
+                              when the runner's TTL is not greater than zero
+              "properties" -- {"type": "Intermediate", "container": uuid},
+                              where uuid is None if the current container
+                              could not be looked up
+            """
+            # NOTE(review): this body appears verbatim in ArvadosContainer,
+            # ArvadosJob and ArvPathMapper in this commit; a shared helper
+            # would keep the three copies from drifting apart.
+            trash_time = None
+            if self.arvrunner.intermediate_output_ttl > 0:
+                # NOTE(review): datetime.now() yields a naive local-time
+                # value; confirm whether trash_at is expected to be UTC.
+                trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl)
+
+            current_container_uuid = None
+            try:
+                current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries)
+                current_container_uuid = current_container['uuid']
+            # NOTE(review): this diff adds `import datetime` to arvjob.py but
+            # no ApiError import -- confirm ApiError is already in scope.
+            except ApiError as e:
+                # Status code 404 just means we're not running in a container.
+                if e.resp.status != 404:
+                    # Any other API error is only logged; the uuid stays None.
+                    logger.info("Getting current container: %s", e)
+            props = {"type": "Intermediate",
+                          "container": current_container_uuid}
+
+            return {"name" : "Intermediate collection",
+                    "trash_at" : trash_time,
+                    "properties" : props}
+
 
 class RunnerJob(Runner):
     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
index 05a358e0d57a44f26cf6a4ec26c9d1bd35200163..9a76f81da54bbd1131ade82be8adda9e5d21d2a5 100644 (file)
@@ -7,12 +7,14 @@ import logging
 import uuid
 import os
 import urllib
+import datetime
 
 import arvados.commands.run
 import arvados.collection
 
 from schema_salad.sourceline import SourceLine
 
+from arvados.errors import ApiError
 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
 from cwltool.workflow import WorkflowException
 
@@ -153,9 +155,13 @@ class ArvPathMapper(PathMapper):
                 for l in srcobj.get("listing", []):
                     self.addentry(l, c, ".", remap)
 
-                check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
-                if not check["items"]:
-                    c.save_new(owner_uuid=self.arvrunner.project_uuid)
+                info = self._get_intermediate_collection_info()
+
+                c.save_new(name=info["name"],
+                           owner_uuid=self.arvrunner.project_uuid,
+                           ensure_unique_name=True,
+                           trash_at=info["trash_at"],
+                           properties=info["properties"])
 
                 ab = self.collection_pattern % c.portable_data_hash()
                 self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
@@ -167,9 +173,13 @@ class ArvPathMapper(PathMapper):
                                                   num_retries=self.arvrunner.num_retries                                                  )
                 self.addentry(srcobj, c, ".", remap)
 
-                check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
-                if not check["items"]:
-                    c.save_new(owner_uuid=self.arvrunner.project_uuid)
+                info = self._get_intermediate_collection_info()
+
+                c.save_new(name=info["name"],
+                           owner_uuid=self.arvrunner.project_uuid,
+                           ensure_unique_name=True,
+                           trash_at=info["trash_at"],
+                           properties=info["properties"])
 
                 ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                 self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
@@ -202,6 +212,27 @@ class ArvPathMapper(PathMapper):
         else:
             return None
 
+    def _get_intermediate_collection_info(self):
+            """Build the keyword values passed to save_new() for an intermediate collection.
+
+            Returns a dict with three keys:
+              "name"       -- the fixed label "Intermediate collection"
+              "trash_at"   -- now + intermediate_output_ttl seconds, or None
+                              when the runner's TTL is not greater than zero
+              "properties" -- {"type": "Intermediate", "container": uuid},
+                              where uuid is None if the current container
+                              could not be looked up
+            """
+            # NOTE(review): this body appears verbatim in ArvadosContainer,
+            # ArvadosJob and ArvPathMapper in this commit; a shared helper
+            # would keep the three copies from drifting apart.
+            trash_time = None
+            if self.arvrunner.intermediate_output_ttl > 0:
+                # NOTE(review): datetime.now() yields a naive local-time
+                # value; confirm whether trash_at is expected to be UTC.
+                trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl)
+
+            current_container_uuid = None
+            try:
+                current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries)
+                current_container_uuid = current_container['uuid']
+            except ApiError as e:
+                # Status code 404 just means we're not running in a container.
+                if e.resp.status != 404:
+                    # Any other API error is only logged; the uuid stays None.
+                    logger.info("Getting current container: %s", e)
+            props = {"type": "Intermediate",
+                          "container": current_container_uuid}
+
+            return {"name" : "Intermediate collection",
+                    "trash_at" : trash_time,
+                    "properties" : props}
+
+
 class StagingPathMapper(PathMapper):
     _follow_dirs = True
 
diff --git a/sdk/cwl/tests/makes_intermediates/echo.cwl b/sdk/cwl/tests/makes_intermediates/echo.cwl
new file mode 100644 (file)
index 0000000..5449bc3
--- /dev/null
@@ -0,0 +1,14 @@
+# Test tool: lists inp1, inp2 and inp3 under InitialWorkDirRequirement and
+# passes the same three inputs as arguments to `echo`; it declares no outputs.
+class: CommandLineTool
+cwlVersion: v1.0
+requirements:
+  InitialWorkDirRequirement:
+    listing:
+      - $(inputs.inp1)
+      - $(inputs.inp2)
+      - $(inputs.inp3)
+inputs:
+  inp1: File
+  inp2: [File, Directory]
+  inp3: Directory
+outputs: []
+arguments: [echo, $(inputs.inp1), $(inputs.inp2), $(inputs.inp3)]
diff --git a/sdk/cwl/tests/makes_intermediates/hello1.txt b/sdk/cwl/tests/makes_intermediates/hello1.txt
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/sdk/cwl/tests/makes_intermediates/run_in_single.cwl b/sdk/cwl/tests/makes_intermediates/run_in_single.cwl
new file mode 100644 (file)
index 0000000..bb596b2
--- /dev/null
@@ -0,0 +1,38 @@
+# Test workflow: runs subwf.cwl as step1 under arv:RunInSingleContainer.
+# The defaults cover three input forms: a File given by location (hello1.txt),
+# a File given by basename + contents, and a Directory whose listing holds a
+# File given by basename + contents.
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+requirements:
+  SubworkflowFeatureRequirement: {}
+inputs:
+  inp1:
+    type: File
+    default:
+      class: File
+      location: hello1.txt
+  inp2:
+    type: [File, Directory]
+    default:
+      class: File
+      basename: "hello2.txt"
+      contents: "Hello world"
+  inp3:
+    type: [File, Directory]
+    default:
+      class: Directory
+      basename: inp3
+      listing:
+        - class: File
+          basename: "hello3.txt"
+          contents: "hello world"
+outputs: []
+steps:
+  step1:
+    requirements:
+      arv:RunInSingleContainer: {}
+    in:
+      inp1: inp1
+      inp2: inp2
+      inp3: inp3
+    out: []
+    run: subwf.cwl
diff --git a/sdk/cwl/tests/makes_intermediates/subwf.cwl b/sdk/cwl/tests/makes_intermediates/subwf.cwl
new file mode 100644 (file)
index 0000000..1852ab4
--- /dev/null
@@ -0,0 +1,15 @@
+# Sub-workflow: hands inp1, inp2 and inp3 unchanged to a single step that
+# runs echo.cwl; it declares no outputs.
+cwlVersion: v1.0
+class: Workflow
+inputs:
+  inp1: File
+  inp2: File
+  inp3: Directory
+outputs: []
+steps:
+  step1:
+    in:
+      inp1: inp1
+      inp2: inp2
+      inp3: inp3
+    out: []
+    run: echo.cwl
index dd484690c14ebb11caeae501da978f81b0e24abd..bd5b1a1b613850995ecdc36cb741d81c8b13766e 100644 (file)
@@ -9,6 +9,7 @@ import logging
 import mock
 import unittest
 import os
+import datetime
 import functools
 import cwltool.process
 import cwltool.secrets
@@ -21,6 +22,12 @@ if not os.getenv('ARVADOS_DEBUG'):
     logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
     logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
+class MockDateTime(datetime.datetime):
+    """datetime subclass whose now() always returns 2018-01-01 00:00:00."""
+    # NOTE(review): this override takes no tz argument, unlike
+    # datetime.datetime.now(tz=None); a caller passing tz would fail.
+    @classmethod
+    def now(cls):
+        return datetime.datetime(2018, 1, 1, 0, 0, 0, 0)
+
+# Rebinds the class on the datetime module itself, so any code that looks up
+# datetime.datetime at call time sees the frozen now().
+# NOTE(review): nothing in the visible hunks restores the original class, and
+# two other test modules in this commit do the same rebind; a mock.patch
+# scoped to the test may be safer -- confirm.
+datetime.datetime = MockDateTime
 
 class TestContainer(unittest.TestCase):
 
@@ -691,3 +698,17 @@ class TestContainer(unittest.TestCase):
 
         _, kwargs = runner.api.container_requests().create.call_args
         self.assertEqual(42, kwargs['body']['scheduling_parameters'].get('max_run_time'))
+
+
+    def test_get_intermediate_collection_info(self):
+        """Check the name, trash_at and properties built for a 60-second TTL."""
+        arvrunner = mock.MagicMock()
+        arvrunner.intermediate_output_ttl = 60
+        # The mocked containers().current() call reports a fixed container uuid.
+        arvrunner.api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}
+
+        container = arvados_cwl.ArvadosContainer(arvrunner)
+
+        info = container._get_intermediate_collection_info()
+
+        self.assertEqual(info["name"], "Intermediate collection")
+        # The module-level MockDateTime pins now() to 2018-01-01 00:00:00,
+        # so a 60 s TTL gives 00:01:00.
+        self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 1))
+        self.assertEqual(info["properties"], {"type" : "Intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})
index c110bc5d53cd4634656d93fab2937954be973d07..b9f1c396172b6d32e0b22423878ed2b0c78696cf 100644 (file)
@@ -10,6 +10,7 @@ import os
 import unittest
 import copy
 import StringIO
+import datetime
 
 import arvados
 import arvados_cwl
@@ -24,6 +25,13 @@ if not os.getenv('ARVADOS_DEBUG'):
     logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
     logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
+class MockDateTime(datetime.datetime):
+    """datetime subclass whose now() always returns 2018-01-01 00:00:00."""
+    # NOTE(review): this override takes no tz argument, unlike
+    # datetime.datetime.now(tz=None); a caller passing tz would fail.
+    @classmethod
+    def now(cls):
+        return datetime.datetime(2018, 1, 1, 0, 0, 0, 0)
+
+# Rebinds the class on the datetime module itself, so any code that looks up
+# datetime.datetime at call time sees the frozen now().
+# NOTE(review): nothing in the visible hunks restores the original class, and
+# two other test modules in this commit do the same rebind; a mock.patch
+# scoped to the test may be safer -- confirm.
+datetime.datetime = MockDateTime
+
 class TestJob(unittest.TestCase):
 
     def helper(self, runner, enable_reuse=True):
@@ -331,6 +339,20 @@ class TestJob(unittest.TestCase):
 
         arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
 
+    def test_get_intermediate_collection_info(self):
+        """Check the name, trash_at and properties built for a 60-second TTL."""
+        arvrunner = mock.MagicMock()
+        arvrunner.intermediate_output_ttl = 60
+        # The mocked containers().current() call reports a fixed container uuid.
+        arvrunner.api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}
+
+        job = arvados_cwl.ArvadosJob(arvrunner)
+
+        info = job._get_intermediate_collection_info()
+
+        self.assertEqual(info["name"], "Intermediate collection")
+        # The module-level MockDateTime pins now() to 2018-01-01 00:00:00,
+        # so a 60 s TTL gives 00:01:00.
+        self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 1))
+        self.assertEqual(info["properties"], {"type" : "Intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})
+
+
 
 class TestWorkflow(unittest.TestCase):
     def helper(self, runner, enable_reuse=True):
index eaa57114222233d6bcbd02ff2674c89f5169b168..e5326147d7737d9f0f0d2e52fda8120a54770ae0 100644 (file)
@@ -9,6 +9,7 @@ import unittest
 import json
 import logging
 import os
+import datetime
 
 import arvados
 import arvados.keep
@@ -26,6 +27,13 @@ def upload_mock(files, api, dry_run=False, num_retries=0, project=None, fnPatter
         c.keepref = "%s/%s" % (pdh, os.path.basename(c.fn))
         c.fn = fnPattern % (pdh, os.path.basename(c.fn))
 
+class MockDateTime(datetime.datetime):
+    """datetime subclass whose now() always returns 2018-01-01 00:00:00."""
+    # NOTE(review): this override takes no tz argument, unlike
+    # datetime.datetime.now(tz=None); a caller passing tz would fail.
+    @classmethod
+    def now(cls):
+        return datetime.datetime(2018, 1, 1, 0, 0, 0, 0)
+
+# Rebinds the class on the datetime module itself, so any code that looks up
+# datetime.datetime at call time sees the frozen now().
+# NOTE(review): nothing in the visible hunks restores the original class, and
+# two other test modules in this commit do the same rebind; a mock.patch
+# scoped to the test may be safer -- confirm.
+datetime.datetime = MockDateTime
+
 class TestPathmap(unittest.TestCase):
     def setUp(self):
         self.api = mock.MagicMock()
@@ -101,3 +109,19 @@ class TestPathmap(unittest.TestCase):
                 "class": "File",
                 "location": "file:tests/hw.py"
             }], "", "/test/%s", "/test/%s/%s")
+
+    def test_get_intermediate_collection_info(self):
+        """Check the name, trash_at and properties built for a 60-second TTL."""
+        # self.api is the MagicMock created in setUp; its containers().current()
+        # call is made to report a fixed container uuid.
+        self.api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}
+        arvrunner = arvados_cwl.ArvCwlRunner(self.api)
+        arvrunner.intermediate_output_ttl = 60
+
+        path_mapper = ArvPathMapper(arvrunner, [{
+            "class": "File",
+            "location": "keep:99999999999999999999999999999991+99/hw.py"
+        }], "", "/test/%s", "/test/%s/%s")
+
+        info = path_mapper._get_intermediate_collection_info()
+
+        self.assertEqual(info["name"], "Intermediate collection")
+        # The module-level MockDateTime pins now() to 2018-01-01 00:00:00,
+        # so a 60 s TTL gives 00:01:00.
+        self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 1))
+        self.assertEqual(info["properties"], {"type" : "Intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})