"""Convert container-local paths to and from Keep collection ids."""
# Constructor: stores the runner, the input base directory, the collection
# pattern and the list of referenced locations, then delegates to the parent
# class initializer.  The change shown here adds an `optional_deps` keyword
# argument (default None) to the signature.
def __init__(self, arvrunner, referenced_files, input_basedir,
- collection_pattern, file_pattern, name=None, single_collection=False):
+ collection_pattern, file_pattern, name=None, single_collection=False,
+ optional_deps=None):
self.arvrunner = arvrunner
self.input_basedir = input_basedir
self.collection_pattern = collection_pattern
# Keep only the "location" string of each referenced file object.
self.referenced_files = [r["location"] for r in referenced_files]
self.single_collection = single_collection
# Filled in by visit(): the key is the part of a "keep:" location before the
# first "/" with the 5-character "keep:" prefix removed; the value is the
# collection UUID found on the source object.
self.pdh_to_uuid = {}
# A missing (None) or otherwise falsy argument becomes a new empty list, so
# the attribute is always iterable.
+ self.optional_deps = optional_deps or []
super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
def visit(self, srcobj, uploadfiles):
if isinstance(src, basestring) and src.startswith("keep:"):
if collection_pdh_pattern.match(src):
self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
+
if arvados_cwl.util.collectionUUID in srcobj:
self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
elif not collection_uuid_pattern.match(src):
f.write(obj["contents"])
remap.append((obj["location"], path + "/" + obj["basename"]))
else:
+ for opt in self.optional_deps:
+ if obj["location"] == opt["location"]:
+ return
raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
# Decide whether `srcobj` -- and, recursively, its "secondaryFiles" and
# "listing" entries -- needs a new collection.  Returns True as soon as any
# of the checks below fails.  `prefix` is the location prefix the object is
# expected to sit under; it is empty on the outermost call and derived from
# the object's own location in that case.
# NOTE(review): the value returned when no check fails is not visible in this
# excerpt -- confirm against the full file.
def needs_new_collection(self, srcobj, prefix=""):
loc = srcobj["location"]
# Locations starting with "_:" always answer True.
if loc.startswith("_:"):
return True
- if prefix:
- if loc != prefix+srcobj["basename"]:
- return True
+
# New logic: split loc at its last "/".  loc_prefix is everything up to and
# including that slash; suffix is the remaining final component, unquoted and
# re-quoted (leaving "/", "+" and "@" unescaped) so that later comparisons do
# not depend on how loc happened to be percent-encoded.
+ i = loc.rfind("/")
+ if i > -1:
+ loc_prefix = loc[:i+1]
+ if not prefix:
+ prefix = loc_prefix
+ # quote/unquote to ensure consistent quoting
+ suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
else:
- i = loc.rfind("/")
- if i > -1:
- prefix = loc[:i+1]
- else:
- prefix = loc+"/"
+ # no '/' found
+ loc_prefix = loc+"/"
+ prefix = loc+"/"
+ suffix = ""
+
# The object is not located directly under the expected prefix.
+ if prefix != loc_prefix:
+ return True
+
# The final component of loc differs from the quoted basename; skipped when
# the object has no "basename" key.
+ if "basename" in srcobj and suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
+ return True
+
# A File whose location has no entry in the path map.
if srcobj["class"] == "File" and loc not in self._pathmap:
return True
# Secondary files are checked against the same prefix as their primary.
for s in srcobj.get("secondaryFiles", []):
if self.needs_new_collection(s, prefix):
return True
# Listing entries are checked one level down: prefix + this object's name +
# "/".  The new line quotes the name and falls back to `suffix` when the
# object has no "basename".
if srcobj.get("listing"):
- prefix = "%s%s/" % (prefix, srcobj["basename"])
+ prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj.get("basename", suffix), "/+@"))
for l in srcobj["listing"]:
if self.needs_new_collection(l, prefix):
return True
packed=False)
for src, ab, st in uploadfiles:
- self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
+ self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
"Directory" if os.path.isdir(ab) else "File", True)
for srcobj in referenced_files:
ab = self.collection_pattern % c.portable_data_hash()
self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
- elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
- (srcobj["location"].startswith("_:") and "contents" in srcobj)):
-
- # If all secondary files/directories are located in
- # the same collection as the primary file and the
- # paths and names that are consistent with staging,
- # don't create a new collection.
- if not self.needs_new_collection(srcobj):
- continue
-
+ elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
c = arvados.collection.Collection(api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
- num_retries=self.arvrunner.num_retries )
+ num_retries=self.arvrunner.num_retries)
self.addentry(srcobj, c, ".", remap)
container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
sc_result = scandeps(uri, scanobj,
loadref_fields,
- set(("$include", "$schemas", "location")),
+ set(("$include", "location")),
loadref, urljoin=document_loader.fetcher.urljoin,
nestdirs=False)
+ optional_deps = scandeps(uri, scanobj,
+ loadref_fields,
+ set(("$schemas",)),
+ loadref, urljoin=document_loader.fetcher.urljoin,
+ nestdirs=False)
+
+ sc_result.extend(optional_deps)
+
sc = []
uuids = {}
if include_primary and "id" in workflowobj:
sc.append({"class": "File", "location": workflowobj["id"]})
- if "$schemas" in workflowobj:
- for s in workflowobj["$schemas"]:
- sc.append({"class": "File", "location": s})
-
# Callback passed to find_defaults(): walks the File/Directory objects under
# obj["default"] via visit_class().
# The removed lines dropped a default from `sc` and deleted obj["default"]
# when its location did not exist; the added lines instead append the visited
# object to `optional_deps`.
def visit_default(obj):
- remove = [False]
- def ensure_default_location(f):
+ def defaults_are_optional(f):
# Rename a "path" key to "location" when no location is set.
if "location" not in f and "path" in f:
f["location"] = f["path"]
del f["path"]
- if "location" in f and not arvrunner.fs_access.exists(f["location"]):
- # Doesn't exist, remove from list of dependencies to upload
- sc[:] = [x for x in sc if x["location"] != f["location"]]
- # Delete "default" from workflowobj
- remove[0] = True
- visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
- if remove[0]:
- del obj["default"]
# NOTE(review): indentation is not visible in this excerpt; presumably the
# append runs for every visited object, not only when "path" was renamed --
# confirm against the full file.
+ optional_deps.append(f)
+ visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
find_defaults(workflowobj, visit_default)
"keep:%s",
"keep:%s/%s",
name=name,
- single_collection=True)
+ single_collection=True,
+ optional_deps=optional_deps)
def setloc(p):
loc = p.get("location")