18994: Fixing quoting WIP
author     Peter Amstutz <peter.amstutz@curii.com>
           Fri, 15 Apr 2022 15:48:59 +0000 (11:48 -0400)
committer  Peter Amstutz <peter.amstutz@curii.com>
           Fri, 15 Apr 2022 15:48:59 +0000 (11:48 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py

diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index dbfa9902c3fcf7110a4a7f8688aa5c92f81bce6d..0cbf9eb56bfa3836e3d024b79ecbee50e4bf56e8 100644
@@ -52,7 +52,8 @@ class ArvPathMapper(PathMapper):
     """Convert container-local paths to and from Keep collection ids."""
 
     def __init__(self, arvrunner, referenced_files, input_basedir,
     """Convert container-local paths to and from Keep collection ids."""
 
     def __init__(self, arvrunner, referenced_files, input_basedir,
-                 collection_pattern, file_pattern, name=None, single_collection=False):
+                 collection_pattern, file_pattern, name=None, single_collection=False,
+                 optional_deps=None):
         self.arvrunner = arvrunner
         self.input_basedir = input_basedir
         self.collection_pattern = collection_pattern
@@ -61,6 +62,7 @@ class ArvPathMapper(PathMapper):
         self.referenced_files = [r["location"] for r in referenced_files]
         self.single_collection = single_collection
         self.pdh_to_uuid = {}
+        self.optional_deps = optional_deps or []
         super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
 
     def visit(self, srcobj, uploadfiles):
@@ -72,7 +74,9 @@ class ArvPathMapper(PathMapper):
 
         if isinstance(src, basestring) and src.startswith("keep:"):
             if collection_pdh_pattern.match(src):
+                #self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], srcobj["class"], True)
                 self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
+
                 if arvados_cwl.util.collectionUUID in srcobj:
                     self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
             elif not collection_uuid_pattern.match(src):
@@ -90,9 +94,12 @@ class ArvPathMapper(PathMapper):
                                                    raiseOSError=True)
                 with SourceLine(srcobj, "location", WorkflowException, debug):
                     if isinstance(st, arvados.commands.run.UploadFile):
+                        print("VV", (src, ab, st))
                         uploadfiles.add((src, ab, st))
                     elif isinstance(st, arvados.commands.run.ArvFile):
                         self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
+
+                        #self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % st.fn[5:], "File", True)
                     else:
                         raise WorkflowException("Input file path '%s' is invalid" % st)
             elif src.startswith("_:"):
@@ -118,6 +125,7 @@ class ArvPathMapper(PathMapper):
                 self.visit(l, uploadfiles)
 
     def addentry(self, obj, c, path, remap):
+        print(obj["location"], self._pathmap)
         if obj["location"] in self._pathmap:
             src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
             if srcpath == "":
         if obj["location"] in self._pathmap:
             src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
             if srcpath == "":
@@ -135,6 +143,9 @@ class ArvPathMapper(PathMapper):
                 f.write(obj["contents"])
             remap.append((obj["location"], path + "/" + obj["basename"]))
         else:
                 f.write(obj["contents"])
             remap.append((obj["location"], path + "/" + obj["basename"]))
         else:
+            for opt in self.optional_deps:
+                if obj["location"] == opt["location"]:
+                    return
             raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
 
     def needs_new_collection(self, srcobj, prefix=""):
@@ -149,14 +160,19 @@ class ArvPathMapper(PathMapper):
         loc = srcobj["location"]
         if loc.startswith("_:"):
             return True
         loc = srcobj["location"]
         if loc.startswith("_:"):
             return True
+
         if not prefix:
             i = loc.rfind("/")
             if i > -1:
                 prefix = loc[:i+1]
+                suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
             else:
                 prefix = loc+"/"
+                suffix = ""
 
-        if loc != prefix+srcobj["basename"]:
+        print("LLL", prefix+suffix, prefix+urllib.parse.quote(srcobj["basename"], "/+@"))
+        if prefix+suffix != prefix+urllib.parse.quote(srcobj["basename"], "/+@"):
+            print("LLL -> needs new collection")
             return True
 
         if srcobj["class"] == "File" and loc not in self._pathmap:
@@ -195,9 +211,12 @@ class ArvPathMapper(PathMapper):
                                          packed=False)
 
         for src, ab, st in uploadfiles:
-            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
+            print("BBBBB", src, ab, st.fn, urllib.parse.quote(st.fn, "/:+@"))
+            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
                                            "Directory" if os.path.isdir(ab) else "File", True)
 
                                            "Directory" if os.path.isdir(ab) else "File", True)
 
+        print("CCCCC", self._pathmap)
+
         for srcobj in referenced_files:
             remap = []
             if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
@@ -221,7 +240,7 @@ class ArvPathMapper(PathMapper):
             elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
                 c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                   keep_client=self.arvrunner.keep_client,
             elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
                 c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                   keep_client=self.arvrunner.keep_client,
-                                                  num_retries=self.arvrunner.num_retries                                                  )
+                                                  num_retries=self.arvrunner.num_retries)
                 self.addentry(srcobj, c, ".", remap)
 
                 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
                 self.addentry(srcobj, c, ".", remap)
 
                 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 38e2c4d806f36ed80f9dac70da8cf37006f3de19..eae7a77896c702976eb457bbade436a76e4441f5 100644
@@ -285,10 +285,18 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     sc_result = scandeps(uri, scanobj,
                          loadref_fields,
-                         set(("$include", "$schemas", "location")),
+                         set(("$include", "location")),
                          loadref, urljoin=document_loader.fetcher.urljoin,
                          nestdirs=False)
 
+    optional_deps = scandeps(uri, scanobj,
+                                  loadref_fields,
+                                  set(("$schemas",)),
+                                  loadref, urljoin=document_loader.fetcher.urljoin,
+                                  nestdirs=False)
+
+    sc_result.extend(optional_deps)
+
     sc = []
     uuids = {}
 
@@ -345,24 +353,25 @@ def upload_dependencies(arvrunner, name, document_loader,
     if include_primary and "id" in workflowobj:
         sc.append({"class": "File", "location": workflowobj["id"]})
 
-    if "$schemas" in workflowobj:
-        for s in workflowobj["$schemas"]:
-            sc.append({"class": "File", "location": s})
+    #if "$schemas" in workflowobj:
+    #    for s in workflowobj["$schemas"]:
+    #        sc.append({"class": "File", "location": s})
 
     def visit_default(obj):
-        remove = [False]
+        #remove = [False]
         def ensure_default_location(f):
             if "location" not in f and "path" in f:
                 f["location"] = f["path"]
                 del f["path"]
-            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
-                # Doesn't exist, remove from list of dependencies to upload
-                sc[:] = [x for x in sc if x["location"] != f["location"]]
-                # Delete "default" from workflowobj
-                remove[0] = True
+            optional_deps.append(f)
+            #if "location" in f and not arvrunner.fs_access.exists(f["location"]):
+            #    # Doesn't exist, remove from list of dependencies to upload
+            #    sc[:] = [x for x in sc if x["location"] != f["location"]]
+            #    # Delete "default" from workflowobj
+            #    remove[0] = True
         visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
         visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
-        if remove[0]:
-            del obj["default"]
+        #if remove[0]:
+        #    del obj["default"]
 
     find_defaults(workflowobj, visit_default)
 
@@ -394,11 +403,16 @@ def upload_dependencies(arvrunner, name, document_loader,
         else:
             del discovered[d]
 
+    print("NN", sc)
+
     mapper = ArvPathMapper(arvrunner, sc, "",
                            "keep:%s",
                            "keep:%s/%s",
                            name=name,
-                           single_collection=True)
+                           single_collection=True,
+                           optional_deps=optional_deps)
+
+    print("whargh", mapper._pathmap)
 
     def setloc(p):
         loc = p.get("location")