Merge branch '20531-cwl-log-tail' refs #20531
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
index a5dd5ccf390c564c45643bc6e6550827fcaa1595..539188fddd995b9cda5c58c89f1f8ef1dd96293a 100644 (file)
@@ -26,7 +26,7 @@ from cwltool.utils import adjustFileObjs, adjustDirObjs
 from cwltool.stdfsaccess import abspath
 from cwltool.workflow import WorkflowException
 
-from .http import http_to_keep
+from arvados.http_to_keep import http_to_keep
 
 logger = logging.getLogger('arvados.cwl-runner')
 
@@ -105,11 +105,17 @@ class ArvPathMapper(PathMapper):
                     raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
             elif src.startswith("http:") or src.startswith("https:"):
                 try:
-                    keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
-                    logger.info("%s is %s", src, keepref)
-                    self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
+                    if self.arvrunner.defer_downloads:
+                        # passthrough, we'll download it later.
+                        self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
+                    else:
+                        keepref = "keep:%s/%s" % http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
+                                                              varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
+                                                              prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
+                        logger.info("%s is %s", src, keepref)
+                        self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                 except Exception as e:
-                    logger.warning(str(e))
+                    logger.warning("Download error: %s", e)
             else:
                 self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
 
@@ -141,7 +147,7 @@ class ArvPathMapper(PathMapper):
             for opt in self.optional_deps:
                 if obj["location"] == opt["location"]:
                     return
-            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
+            raise SourceLine(obj, "location", WorkflowException).makeError("Can't handle '%s'" % obj["location"])
 
     def needs_new_collection(self, srcobj, prefix=""):
         """Check if files need to be staged into a new collection.
@@ -156,16 +162,26 @@ class ArvPathMapper(PathMapper):
         if loc.startswith("_:"):
             return True
 
-        if not prefix:
-            i = loc.rfind("/")
-            if i > -1:
-                prefix = loc[:i+1]
-                suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
-            else:
-                prefix = loc+"/"
-                suffix = ""
+        if self.arvrunner.defer_downloads and (loc.startswith("http:") or loc.startswith("https:")):
+            return False
+
+        i = loc.rfind("/")
+        if i > -1:
+            loc_prefix = loc[:i+1]
+            if not prefix:
+                prefix = loc_prefix
+            # quote/unquote to ensure consistent quoting
+            suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
+        else:
+            # no '/' found
+            loc_prefix = loc+"/"
+            prefix = loc+"/"
+            suffix = ""
+
+        if prefix != loc_prefix:
+            return True
 
-        if prefix+suffix != prefix+urllib.parse.quote(srcobj["basename"], "/+@"):
+        if "basename" in srcobj and suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
             return True
 
         if srcobj["class"] == "File" and loc not in self._pathmap:
@@ -174,7 +190,7 @@ class ArvPathMapper(PathMapper):
             if self.needs_new_collection(s, prefix):
                 return True
         if srcobj.get("listing"):
-            prefix = "%s%s/" % (prefix, srcobj["basename"])
+            prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj.get("basename", suffix), "/+@"))
             for l in srcobj["listing"]:
                 if self.needs_new_collection(l, prefix):
                     return True