From: Peter Amstutz Date: Fri, 15 Apr 2022 15:48:59 +0000 (-0400) Subject: 18994: Fixing quoting WIP X-Git-Tag: 2.5.0~199^2~5 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/29e3fe60d22218d122a1a3448a144bfcfd64785a 18994: Fixing quoting WIP Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py index dbfa9902c3..0cbf9eb56b 100644 --- a/sdk/cwl/arvados_cwl/pathmapper.py +++ b/sdk/cwl/arvados_cwl/pathmapper.py @@ -52,7 +52,8 @@ class ArvPathMapper(PathMapper): """Convert container-local paths to and from Keep collection ids.""" def __init__(self, arvrunner, referenced_files, input_basedir, - collection_pattern, file_pattern, name=None, single_collection=False): + collection_pattern, file_pattern, name=None, single_collection=False, + optional_deps=None): self.arvrunner = arvrunner self.input_basedir = input_basedir self.collection_pattern = collection_pattern @@ -61,6 +62,7 @@ class ArvPathMapper(PathMapper): self.referenced_files = [r["location"] for r in referenced_files] self.single_collection = single_collection self.pdh_to_uuid = {} + self.optional_deps = optional_deps or [] super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None) def visit(self, srcobj, uploadfiles): @@ -72,7 +74,9 @@ class ArvPathMapper(PathMapper): if isinstance(src, basestring) and src.startswith("keep:"): if collection_pdh_pattern.match(src): + #self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], srcobj["class"], True) self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True) + if arvados_cwl.util.collectionUUID in srcobj: self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID] elif not collection_uuid_pattern.match(src): @@ -90,9 +94,12 @@ class ArvPathMapper(PathMapper): raiseOSError=True) with SourceLine(srcobj, "location", WorkflowException, debug): if isinstance(st, arvados.commands.run.UploadFile): + print("VV", (src, ab, st)) uploadfiles.add((src, ab, st)) elif isinstance(st, arvados.commands.run.ArvFile): self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True) + + #self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % st.fn[5:], "File", True) else: raise WorkflowException("Input file path '%s' is invalid" % st) elif src.startswith("_:"): @@ -118,6 +125,7 @@ class ArvPathMapper(PathMapper): self.visit(l, uploadfiles) def addentry(self, obj, c, path, remap): + print(obj["location"], self._pathmap) if obj["location"] in self._pathmap: src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved) if srcpath == "": @@ -135,6 +143,9 @@ class ArvPathMapper(PathMapper): f.write(obj["contents"]) remap.append((obj["location"], path + "/" + obj["basename"])) else: + for opt in self.optional_deps: + if obj["location"] == opt["location"]: + return raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"]) def needs_new_collection(self, srcobj, prefix=""): @@ -149,14 +160,19 @@ class ArvPathMapper(PathMapper): loc = srcobj["location"] if loc.startswith("_:"): return True + if not prefix: i = loc.rfind("/") if i > -1: prefix = loc[:i+1] + suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@") else: prefix = loc+"/" + suffix = "" - if loc != prefix+srcobj["basename"]: + print("LLL", prefix+suffix, prefix+urllib.parse.quote(srcobj["basename"], "/+@")) + if prefix+suffix != prefix+urllib.parse.quote(srcobj["basename"], "/+@"): + print("LLL -> needs new collection") return True if srcobj["class"] == "File" and loc not in self._pathmap: @@ -195,9 +211,12 @@ class ArvPathMapper(PathMapper): packed=False) for src, ab, st in uploadfiles: - self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:], + print("BBBBB", src, ab, st.fn, urllib.parse.quote(st.fn, "/:+@")) + self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"), "Directory" if os.path.isdir(ab) else "File", True) + print("CCCCC", self._pathmap) + for srcobj in referenced_files: remap = [] if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap: @@ -221,7 +240,7 @@ class ArvPathMapper(PathMapper): elif srcobj["class"] == "File" and self.needs_new_collection(srcobj): c = arvados.collection.Collection(api_client=self.arvrunner.api, keep_client=self.arvrunner.keep_client, - num_retries=self.arvrunner.num_retries ) + num_retries=self.arvrunner.num_retries) self.addentry(srcobj, c, ".", remap) container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger) diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index 38e2c4d806..eae7a77896 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -285,10 +285,18 @@ def upload_dependencies(arvrunner, name, document_loader, sc_result = scandeps(uri, scanobj, loadref_fields, - set(("$include", "$schemas", "location")), + set(("$include", "location")), loadref, urljoin=document_loader.fetcher.urljoin, nestdirs=False) + optional_deps = scandeps(uri, scanobj, + loadref_fields, + set(("$schemas",)), + loadref, urljoin=document_loader.fetcher.urljoin, + nestdirs=False) + + sc_result.extend(optional_deps) + sc = [] uuids = {} @@ -345,24 +353,25 @@ def upload_dependencies(arvrunner, name, document_loader, if include_primary and "id" in workflowobj: sc.append({"class": "File", "location": workflowobj["id"]}) - if "$schemas" in workflowobj: - for s in workflowobj["$schemas"]: - sc.append({"class": "File", "location": s}) + #if "$schemas" in workflowobj: + # for s in workflowobj["$schemas"]: + # sc.append({"class": "File", "location": s}) def visit_default(obj): - remove = [False] + #remove = [False] def ensure_default_location(f): if "location" not in f and "path" in f: f["location"] = f["path"] del f["path"] - if "location" in f and not arvrunner.fs_access.exists(f["location"]): - # Doesn't exist, remove from list of dependencies to upload - sc[:] = [x for x in sc if x["location"] != f["location"]] - # Delete "default" from workflowobj - remove[0] = True + optional_deps.append(f) + #if "location" in f and not arvrunner.fs_access.exists(f["location"]): + # # Doesn't exist, remove from list of dependencies to upload + # sc[:] = [x for x in sc if x["location"] != f["location"]] + # # Delete "default" from workflowobj + # remove[0] = True visit_class(obj["default"], ("File", "Directory"), ensure_default_location) - if remove[0]: - del obj["default"] + #if remove[0]: + # del obj["default"] find_defaults(workflowobj, visit_default) @@ -394,11 +403,16 @@ def upload_dependencies(arvrunner, name, document_loader, else: del discovered[d] + print("NN", sc) + mapper = ArvPathMapper(arvrunner, sc, "", "keep:%s", "keep:%s/%s", name=name, - single_collection=True) + single_collection=True, + optional_deps=optional_deps) + + print("whargh", mapper._pathmap) def setloc(p): loc = p.get("location")