from cwltool.stdfsaccess import abspath
from cwltool.workflow import WorkflowException
-from .http import http_to_keep
+from arvados.http_to_keep import http_to_keep
logger = logging.getLogger('arvados.cwl-runner')
"""Convert container-local paths to and from Keep collection ids."""
def __init__(self, arvrunner, referenced_files, input_basedir,
- collection_pattern, file_pattern, name=None, single_collection=False):
+ collection_pattern, file_pattern, name=None, single_collection=False,
+ optional_deps=None):
self.arvrunner = arvrunner
self.input_basedir = input_basedir
self.collection_pattern = collection_pattern
self.referenced_files = [r["location"] for r in referenced_files]
self.single_collection = single_collection
self.pdh_to_uuid = {}
+ self.optional_deps = optional_deps or []
super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
def visit(self, srcobj, uploadfiles):
if isinstance(src, basestring) and src.startswith("keep:"):
if collection_pdh_pattern.match(src):
self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
+
if arvados_cwl.util.collectionUUID in srcobj:
self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
elif not collection_uuid_pattern.match(src):
raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
elif src.startswith("http:") or src.startswith("https:"):
try:
- keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
- logger.info("%s is %s", src, keepref)
- self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
+ if self.arvrunner.defer_downloads:
+ # Downloads are deferred: map the URL to itself for now; it is fetched later.
+ self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
+ else:
+ keepref = "keep:%s/%s" % http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
+ varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
+ prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
+ logger.info("%s is %s", src, keepref)
+ self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
except Exception as e:
- logger.warning(str(e))
+ logger.warning("Download error: %s", e)
else:
self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
f.write(obj["contents"])
remap.append((obj["location"], path + "/" + obj["basename"]))
else:
- raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
+ for opt in self.optional_deps:
+ if obj["location"] == opt["location"]:
+ return
+ raise SourceLine(obj, "location", WorkflowException).makeError("Can't handle '%s'" % obj["location"])
def needs_new_collection(self, srcobj, prefix=""):
"""Check if files need to be staged into a new collection.
loc = srcobj["location"]
if loc.startswith("_:"):
return True
- if not prefix:
- i = loc.rfind("/")
- if i > -1:
- prefix = loc[:i+1]
- else:
- prefix = loc+"/"
- if loc != prefix+srcobj["basename"]:
+ if self.arvrunner.defer_downloads and (loc.startswith("http:") or loc.startswith("https:")):
+ return False
+
+ i = loc.rfind("/")
+ if i > -1:
+ loc_prefix = loc[:i+1]
+ if not prefix:
+ prefix = loc_prefix
+ # Unquote, then re-quote, so the suffix is in a canonical quoted form before it is compared.
+ suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
+ else:
+ # no '/' found
+ loc_prefix = loc+"/"
+ prefix = loc+"/"
+ suffix = ""
+
+ if prefix != loc_prefix:
+ return True
+
+ if "basename" in srcobj and suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
return True
if srcobj["class"] == "File" and loc not in self._pathmap:
if self.needs_new_collection(s, prefix):
return True
if srcobj.get("listing"):
- prefix = "%s%s/" % (prefix, srcobj["basename"])
+ prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj.get("basename", suffix), "/+@"))
for l in srcobj["listing"]:
if self.needs_new_collection(l, prefix):
return True
packed=False)
for src, ab, st in uploadfiles:
- self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
+ self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
"Directory" if os.path.isdir(ab) else "File", True)
for srcobj in referenced_files:
elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
c = arvados.collection.Collection(api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
- num_retries=self.arvrunner.num_retries )
+ num_retries=self.arvrunner.num_retries)
self.addentry(srcobj, c, ".", remap)
container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)