#
# SPDX-License-Identifier: Apache-2.0
-from future import standard_library
-standard_library.install_aliases()
-from builtins import str
-from past.builtins import basestring
-from future.utils import viewitems
-
import re
import logging
import uuid
from schema_salad.sourceline import SourceLine
from arvados.errors import ApiError
-from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
+from cwltool.pathmapper import PathMapper, MapperEnt
+from cwltool.utils import adjustFileObjs, adjustDirObjs
+from cwltool.stdfsaccess import abspath
from cwltool.workflow import WorkflowException
-from .http import http_to_keep
+from arvados.http_to_keep import http_to_keep
logger = logging.getLogger('arvados.cwl-runner')
"""Convert container-local paths to and from Keep collection ids."""
def __init__(self, arvrunner, referenced_files, input_basedir,
- collection_pattern, file_pattern, name=None, single_collection=False):
+ collection_pattern, file_pattern, name=None, single_collection=False,
+ optional_deps=None):
self.arvrunner = arvrunner
self.input_basedir = input_basedir
self.collection_pattern = collection_pattern
self.referenced_files = [r["location"] for r in referenced_files]
self.single_collection = single_collection
self.pdh_to_uuid = {}
+ self.optional_deps = optional_deps or []
super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
def visit(self, srcobj, uploadfiles):
debug = logger.isEnabledFor(logging.DEBUG)
- if isinstance(src, basestring) and src.startswith("keep:"):
+ if isinstance(src, str) and src.startswith("keep:"):
if collection_pdh_pattern.match(src):
self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
+
if arvados_cwl.util.collectionUUID in srcobj:
self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
elif not collection_uuid_pattern.match(src):
if srcobj["class"] == "Directory" and "listing" not in srcobj:
raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
elif src.startswith("http:") or src.startswith("https:"):
- keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
- logger.info("%s is %s", src, keepref)
- self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
+ try:
+ if self.arvrunner.defer_downloads:
+ # passthrough, we'll download it later.
+ self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
+ else:
+ results = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
+ varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
+ prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
+ keepref = "keep:%s/%s" % (results[0], results[1])
+ logger.info("%s is %s", src, keepref)
+ self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
+ except Exception as e:
+ logger.warning("Download error: %s", e)
else:
self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
f.write(obj["contents"])
remap.append((obj["location"], path + "/" + obj["basename"]))
else:
- raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
+ for opt in self.optional_deps:
+ if obj["location"] == opt["location"]:
+ return
+ raise SourceLine(obj, "location", WorkflowException).makeError("Can't handle '%s'" % obj["location"])
def needs_new_collection(self, srcobj, prefix=""):
"""Check if files need to be staged into a new collection.
loc = srcobj["location"]
if loc.startswith("_:"):
return True
- if prefix:
- if loc != prefix+srcobj["basename"]:
- return True
+
+ if self.arvrunner.defer_downloads and (loc.startswith("http:") or loc.startswith("https:")):
+ return False
+
+ i = loc.rfind("/")
+ if i > -1:
+ loc_prefix = loc[:i+1]
+ if not prefix:
+ prefix = loc_prefix
+ # quote/unquote to ensure consistent quoting
+ suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
else:
- i = loc.rfind("/")
- if i > -1:
- prefix = loc[:i+1]
- else:
- prefix = loc+"/"
+ # no '/' found
+ loc_prefix = loc+"/"
+ prefix = loc+"/"
+ suffix = ""
+
+ if prefix != loc_prefix:
+ return True
+
+ if "basename" in srcobj and suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
+ return True
+
if srcobj["class"] == "File" and loc not in self._pathmap:
return True
for s in srcobj.get("secondaryFiles", []):
if self.needs_new_collection(s, prefix):
return True
if srcobj.get("listing"):
- prefix = "%s%s/" % (prefix, srcobj["basename"])
+ prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj.get("basename", suffix), "/+@"))
for l in srcobj["listing"]:
if self.needs_new_collection(l, prefix):
return True
packed=False)
for src, ab, st in uploadfiles:
- self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
+ self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
"Directory" if os.path.isdir(ab) else "File", True)
for srcobj in referenced_files:
ab = self.collection_pattern % c.portable_data_hash()
self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
- elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
- (srcobj["location"].startswith("_:") and "contents" in srcobj)):
-
- # If all secondary files/directories are located in
- # the same collection as the primary file and the
- # paths and names that are consistent with staging,
- # don't create a new collection.
- if not self.needs_new_collection(srcobj):
- continue
-
+ elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
c = arvados.collection.Collection(api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
- num_retries=self.arvrunner.num_retries )
+ num_retries=self.arvrunner.num_retries)
self.addentry(srcobj, c, ".", remap)
container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
# Overridden to maintain the use case of mapping by source (identifier) to
# target regardless of how the map is structured internally.
def getMapperEnt(src):
- for k,v in viewitems(self._pathmap):
+ for k,v in self._pathmap.items():
if (v.type != "CreateFile" and v.resolved == src) or (v.type == "CreateFile" and k == src):
return v
# with any secondary files.
self.visitlisting(referenced_files, self.stagedir, basedir)
- for path, (ab, tgt, type, staged) in viewitems(self._pathmap):
+ for path, (ab, tgt, type, staged) in self._pathmap.items():
if type in ("File", "Directory") and ab.startswith("keep:"):
self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type, staged)