"""Convert container-local paths to and from Keep collection ids."""
def __init__(self, arvrunner, referenced_files, input_basedir,
             collection_pattern, file_pattern, name=None, single_collection=False,
             optional_deps=None):
    """Set up a mapper from CWL file references to Keep collection paths.

    Resolved from the unmerged diff markers that were left in this block
    (the '+' side is the intended text).

    :param arvrunner: runner object supplying API/Keep clients; stored for later use.
    :param referenced_files: CWL File/Directory objects; only their "location"
        values are recorded in ``self.referenced_files``.
    :param input_basedir: base directory for resolving container-local paths.
    :param collection_pattern: printf-style pattern applied to a Keep locator
        to build the mapped path.
    :param file_pattern: accepted for interface compatibility; not stored in the
        visible code — NOTE(review): confirm no assignment was elided upstream.
    :param name: optional display name; likewise not stored here — confirm.
    :param single_collection: flag recorded for callers that expect all uploads
        to land in a single collection.
    :param optional_deps: dependencies whose absence is tolerated (e.g. files
        discovered via "$schemas"); defaults to an empty list.
    """
    self.arvrunner = arvrunner
    self.input_basedir = input_basedir
    self.collection_pattern = collection_pattern
    self.referenced_files = [r["location"] for r in referenced_files]
    self.single_collection = single_collection
    # Filled in by visit(): maps a collection portable data hash to the
    # collection UUID when a collectionUUID hint is present on the source object.
    self.pdh_to_uuid = {}
    # "or []" guards the None default so we never share a mutable default list.
    self.optional_deps = optional_deps or []
    super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
# NOTE(review): this block still carries unresolved diff markers ('+' lines) and
# has interior statements elided by the excerpting: `src`, `st`, `ab` and `debug`
# are bound off-screen, and the line `raiseOSError=True)` below is a garbled
# fragment of a longer call — reconstruct from the original file before merging.
def visit(self, srcobj, uploadfiles):
# Keep references ("keep:<pdh>/...") are mapped directly without uploading.
if isinstance(src, basestring) and src.startswith("keep:"):
if collection_pdh_pattern.match(src):
# NOTE(review): dead commented-out code retained by the patch — delete before merge.
+            #self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], srcobj["class"], True)
# unquote: the locator may arrive percent-encoded; the mapped path uses the raw form.
self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
+
# Record pdh -> collection UUID when the source object carries a UUID hint.
if arvados_cwl.util.collectionUUID in srcobj:
self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
elif not collection_uuid_pattern.match(src):
# NOTE(review): garbled fragment — original statement lost in the excerpt.
raiseOSError=True)
with SourceLine(srcobj, "location", WorkflowException, debug):
if isinstance(st, arvados.commands.run.UploadFile):
# NOTE(review): leftover debug print — remove before merge.
+                    print("VV", (src, ab, st))
uploadfiles.add((src, ab, st))
elif isinstance(st, arvados.commands.run.ArvFile):
self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.parse.unquote(st.fn[5:]), "File", True)
+
# NOTE(review): dead commented-out code retained by the patch — delete before merge.
+                    #self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % st.fn[5:], "File", True)
else:
raise WorkflowException("Input file path '%s' is invalid" % st)
# Literal ("_:") locations are visited recursively; `l` is bound off-screen.
elif src.startswith("_:"):
self.visit(l, uploadfiles)
# Copy one File/Directory object into collection `c` at `path`, recording the
# relocation in `remap`. NOTE(review): interior lines are elided in this excerpt
# (the branch handling between `if srcpath == "":` and `f.write(...)` is missing);
# do not treat this span as complete.
def addentry(self, obj, c, path, remap):
# NOTE(review): leftover debug print — remove before merge.
+        print(obj["location"], self._pathmap)
if obj["location"] in self._pathmap:
src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
if srcpath == "":
f.write(obj["contents"])
remap.append((obj["location"], path + "/" + obj["basename"]))
else:
# New behavior from the patch: unmapped locations that appear in
# optional_deps are silently skipped instead of raising.
+            for opt in self.optional_deps:
+                if obj["location"] == opt["location"]:
+                    return
raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
# Decide whether `srcobj` must be copied into a new collection (True) or can be
# referenced in place. NOTE(review): truncated — the final `if` below has no
# visible body in this excerpt, and unresolved diff markers remain.
def needs_new_collection(self, srcobj, prefix=""):
loc = srcobj["location"]
# Literal inputs always need a new collection.
if loc.startswith("_:"):
return True
+
if not prefix:
i = loc.rfind("/")
if i > -1:
prefix = loc[:i+1]
# Normalize the tail: unquote then re-quote with the same safe set
# ("/+@") so it compares consistently against the quoted basename below.
+            suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
else:
prefix = loc+"/"
+            suffix = ""
# NOTE(review): superseded '-' line kept by the unmerged diff — drop it;
# the '+' comparison below is the intended replacement.
-        if loc != prefix+srcobj["basename"]:
# NOTE(review): leftover debug prints — remove before merge.
+        print("LLL", prefix+suffix, prefix+urllib.parse.quote(srcobj["basename"], "/+@"))
+        if prefix+suffix != prefix+urllib.parse.quote(srcobj["basename"], "/+@"):
+            print("LLL -> needs new collection")
return True
if srcobj["class"] == "File" and loc not in self._pathmap:
# NOTE(review): splice of a larger upload/setup routine — the call this closes
# and the loop bodies around it are elided; unresolved diff markers remain.
packed=False)
for src, ab, st in uploadfiles:
# NOTE(review): superseded '-' line kept by the unmerged diff — drop it.
-            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
# NOTE(review): leftover debug print — remove before merge.
+            print("BBBBB", src, ab, st.fn, urllib.parse.quote(st.fn, "/:+@"))
# Patch change: the resolved target is now also quoted (safe set "/:+@"),
# not just the source locator.
+            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
"Directory" if os.path.isdir(ab) else "File", True)
# NOTE(review): leftover debug print — remove before merge.
+            print("CCCCC", self._pathmap)
+
for srcobj in referenced_files:
remap = []
# NOTE(review): the Directory branch body is elided in this excerpt.
if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
c = arvados.collection.Collection(api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
# Whitespace-only fix in the patch (stray space before ')').
-                                                 num_retries=self.arvrunner.num_retries )
+                                                 num_retries=self.arvrunner.num_retries)
self.addentry(srcobj, c, ".", remap)
# NOTE(review): splice of a dependency-scanning/upload routine, truncated at
# both ends, with unresolved diff markers. The patch's apparent intent: split
# "$schemas" out of the mandatory scandeps pass into a separate optional_deps
# pass, and treat workflow defaults as optional instead of pruning them.
container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
sc_result = scandeps(uri, scanobj,
loadref_fields,
# Patch change: "$schemas" removed from the mandatory dependency scan...
-                         set(("$include", "$schemas", "location")),
+                         set(("$include", "location")),
loadref, urljoin=document_loader.fetcher.urljoin,
nestdirs=False)
# ...and scanned separately so schema files become optional dependencies.
+    optional_deps = scandeps(uri, scanobj,
+                         loadref_fields,
+                         set(("$schemas",)),
+                         loadref, urljoin=document_loader.fetcher.urljoin,
+                         nestdirs=False)
+
# Optional deps are still uploaded when present; they are merely allowed to be missing.
+    sc_result.extend(optional_deps)
+
sc = []
uuids = {}
if include_primary and "id" in workflowobj:
sc.append({"class": "File", "location": workflowobj["id"]})
# NOTE(review): dead commented-out code retained by the patch — delete before merge.
-    if "$schemas" in workflowobj:
-        for s in workflowobj["$schemas"]:
-            sc.append({"class": "File", "location": s})
+    #if "$schemas" in workflowobj:
+    #    for s in workflowobj["$schemas"]:
+    #        sc.append({"class": "File", "location": s})
def visit_default(obj):
# NOTE(review): commented-out `remove` machinery should be deleted, not kept.
-        remove = [False]
+        #remove = [False]
def ensure_default_location(f):
if "location" not in f and "path" in f:
f["location"] = f["path"]
del f["path"]
# Patch change: defaults are unconditionally registered as optional deps
# (even existing ones — presumably harmless since addentry only consults
# optional_deps for unmapped locations; TODO confirm).
-            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
-                # Doesn't exist, remove from list of dependencies to upload
-                sc[:] = [x for x in sc if x["location"] != f["location"]]
-                # Delete "default" from workflowobj
-                remove[0] = True
+            optional_deps.append(f)
+            #if "location" in f and not arvrunner.fs_access.exists(f["location"]):
+            #    # Doesn't exist, remove from list of dependencies to upload
+            #    sc[:] = [x for x in sc if x["location"] != f["location"]]
+            #    # Delete "default" from workflowobj
+            #    remove[0] = True
visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
-        if remove[0]:
-            del obj["default"]
+        #if remove[0]:
+        #    del obj["default"]
find_defaults(workflowobj, visit_default)
else:
del discovered[d]
# NOTE(review): leftover debug print — remove before merge.
+    print("NN", sc)
+
mapper = ArvPathMapper(arvrunner, sc, "",
"keep:%s",
"keep:%s/%s",
name=name,
-                           single_collection=True)
# Patch change: thread the optional deps through to the mapper so it can
# skip them instead of raising when they are absent.
+                           single_collection=True,
+                           optional_deps=optional_deps)
+
# NOTE(review): leftover debug print — remove before merge.
+    print("whargh", mapper._pathmap)
def setloc(p):
loc = p.get("location")