Merge branch '18002-update-python-deps' into main refs #18002
[arvados.git] / sdk / cwl / tests / test_pathmapper.py
index fb3c257d93e1be9cac211defc97d3282100ccdbc..b78e89012ad62c5f952476da0553b2d26dac5fd3 100644 (file)
@@ -102,3 +102,132 @@ class TestPathmap(unittest.TestCase):
                 "class": "File",
                 "location": "file:tests/hw.py"
             }], "", "/test/%s", "/test/%s/%s")
+
+    def test_needs_new_collection(self):
+        arvrunner = arvados_cwl.executor.ArvCwlExecutor(self.api)
+
+        # Plain file.  Don't need a new collection.
+        a = {
+            "class": "File",
+            "location": "keep:99999999999999999999999999999991+99/hw.py",
+            "basename": "hw.py"
+        }
+        p = ArvPathMapper(arvrunner, [], "", "%s", "%s/%s")
+        p._pathmap["keep:99999999999999999999999999999991+99/hw.py"] = True
+        self.assertFalse(p.needs_new_collection(a))
+
+        # A file that isn't in the pathmap (for some reason).  Need a new collection.
+        p = ArvPathMapper(arvrunner, [], "", "%s", "%s/%s")
+        self.assertTrue(p.needs_new_collection(a))
+
+        # A file with a secondary file in the same collection.  Don't need
+        # a new collection.
+        a = {
+            "class": "File",
+            "location": "keep:99999999999999999999999999999991+99/hw.py",
+            "basename": "hw.py",
+            "secondaryFiles": [{
+                "class": "File",
+                "location": "keep:99999999999999999999999999999991+99/hw.pyc",
+                "basename": "hw.pyc"
+            }]
+        }
+        p = ArvPathMapper(arvrunner, [], "", "%s", "%s/%s")
+        p._pathmap["keep:99999999999999999999999999999991+99/hw.py"] = True
+        p._pathmap["keep:99999999999999999999999999999991+99/hw.pyc"] = True
+        self.assertFalse(p.needs_new_collection(a))
+
+        # Secondary file is in a different collection from the
+        # a new collectionprimary.  Need a new collection.
+        a = {
+            "class": "File",
+            "location": "keep:99999999999999999999999999999991+99/hw.py",
+            "basename": "hw.py",
+            "secondaryFiles": [{
+                "class": "File",
+                "location": "keep:99999999999999999999999999999992+99/hw.pyc",
+                "basename": "hw.pyc"
+            }]
+        }
+        p = ArvPathMapper(arvrunner, [], "", "%s", "%s/%s")
+        p._pathmap["keep:99999999999999999999999999999991+99/hw.py"] = True
+        p._pathmap["keep:99999999999999999999999999999992+99/hw.pyc"] = True
+        self.assertTrue(p.needs_new_collection(a))
+
+        # Secondary file should be staged to a different name than
+        # path in location.  Need a new collection.
+        a = {
+            "class": "File",
+            "location": "keep:99999999999999999999999999999991+99/hw.py",
+            "basename": "hw.py",
+            "secondaryFiles": [{
+                "class": "File",
+                "location": "keep:99999999999999999999999999999991+99/hw.pyc",
+                "basename": "hw.other"
+            }]
+        }
+        p = ArvPathMapper(arvrunner, [], "", "%s", "%s/%s")
+        p._pathmap["keep:99999999999999999999999999999991+99/hw.py"] = True
+        p._pathmap["keep:99999999999999999999999999999991+99/hw.pyc"] = True
+        self.assertTrue(p.needs_new_collection(a))
+
+        # Secondary file is a directory.  Do not need a new collection.
+        a = {
+            "class": "File",
+            "location": "keep:99999999999999999999999999999991+99/hw.py",
+            "basename": "hw.py",
+            "secondaryFiles": [{
+                "class": "Directory",
+                "location": "keep:99999999999999999999999999999991+99/hw",
+                "basename": "hw",
+                "listing": [{
+                    "class": "File",
+                    "location": "keep:99999999999999999999999999999991+99/hw/h2",
+                    "basename": "h2"
+                }]
+            }]
+        }
+        p = ArvPathMapper(arvrunner, [], "", "%s", "%s/%s")
+        p._pathmap["keep:99999999999999999999999999999991+99/hw.py"] = True
+        p._pathmap["keep:99999999999999999999999999999991+99/hw"] = True
+        p._pathmap["keep:99999999999999999999999999999991+99/hw/h2"] = True
+        self.assertFalse(p.needs_new_collection(a))
+
+        # Secondary file is a renamed directory.  Need a new collection.
+        a = {
+            "class": "File",
+            "location": "keep:99999999999999999999999999999991+99/hw.py",
+            "basename": "hw.py",
+            "secondaryFiles": [{
+                "class": "Directory",
+                "location": "keep:99999999999999999999999999999991+99/hw",
+                "basename": "wh",
+                "listing": [{
+                    "class": "File",
+                    "location": "keep:99999999999999999999999999999991+99/hw/h2",
+                    "basename": "h2"
+                }]
+            }]
+        }
+        p = ArvPathMapper(arvrunner, [], "", "%s", "%s/%s")
+        p._pathmap["keep:99999999999999999999999999999991+99/hw.py"] = True
+        p._pathmap["keep:99999999999999999999999999999991+99/hw"] = True
+        p._pathmap["keep:99999999999999999999999999999991+99/hw/h2"] = True
+        self.assertTrue(p.needs_new_collection(a))
+
+        # Secondary file is a file literal.  Need a new collection.
+        a = {
+            "class": "File",
+            "location": "keep:99999999999999999999999999999991+99/hw.py",
+            "basename": "hw.py",
+            "secondaryFiles": [{
+                "class": "File",
+                "location": "_:123",
+                "basename": "hw.pyc",
+                "contents": "123"
+            }]
+        }
+        p = ArvPathMapper(arvrunner, [], "", "%s", "%s/%s")
+        p._pathmap["keep:99999999999999999999999999999991+99/hw.py"] = True
+        p._pathmap["_:123"] = True
+        self.assertTrue(p.needs_new_collection(a))