19699: Add --varying-url-params
author Peter Amstutz <peter.amstutz@curii.com>
Tue, 8 Nov 2022 15:08:27 +0000 (10:08 -0500)
committer Peter Amstutz <peter.amstutz@curii.com>
Mon, 14 Nov 2022 16:27:45 +0000 (11:27 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/context.py
sdk/cwl/arvados_cwl/executor.py
sdk/cwl/arvados_cwl/http.py
sdk/cwl/arvados_cwl/pathmapper.py

index e3b8e500a1700db18d01ae8cd0f57a05e31890f8..8de1b60bc4bbc4c8a7f5688d845a15fcad9864f9 100644 (file)
@@ -221,6 +221,9 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     parser.add_argument("--defer-downloads", action="store_true", default=False,
                         help="When submitting a workflow, defer downloading HTTP URLs to workflow launch instead of downloading to Keep before submit.")
 
+    parser.add_argument("--varying-url-params", type=str, default="",
+                        help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
+
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--enable-preemptible", dest="enable_preemptible", default=None, action="store_true", help="Use preemptible instances. Control individual steps with arv:UsePreemptible hint.")
     exgroup.add_argument("--disable-preemptible", dest="enable_preemptible", default=None, action="store_false", help="Don't use preemptible instances.")
index ec473a04feea1071c27cd43eeae35c112edf1733..66ccb26fae1e6884955faabde9af10b4224a9d68 100644 (file)
@@ -40,6 +40,7 @@ class ArvRuntimeContext(RuntimeContext):
         self.enable_preemptible = None
         self.copy_deps = None
         self.defer_downloads = False
+        self.varying_url_params = ""
 
         super(ArvRuntimeContext, self).__init__(kwargs)
 
index 84a9799f6101cdb4b4566a759a9e0ccebe2442b8..cbeee924278d12fba688ce5389fb14e42e47775e 100644 (file)
@@ -207,6 +207,7 @@ The 'jobs' API is no longer supported.
                                                      collection_cache=self.collection_cache)
 
         self.defer_downloads = arvargs.submit and arvargs.defer_downloads
+        self.varying_url_params = arvargs.varying_url_params
 
         validate_cluster_target(self, self.toplevel_runtimeContext)
 
index 3acc06d48e425f4511edf0c5a1bbfea1f5fecde2..34f921133344705ac114bf6e2e8cfff501deb551 100644 (file)
@@ -72,7 +72,7 @@ def remember_headers(url, properties, headers, now):
         properties[url]["Date"] = my_formatdate(now)
 
 
-def changed(url, properties, now):
+def changed(url, clean_url, properties, now):
     req = requests.head(url, allow_redirects=True)
     remember_headers(url, properties, req.headers, now)
 
@@ -81,7 +81,7 @@ def changed(url, properties, now):
         # allow GET so instead of failing here, we'll try GET If-None-Match
         return True
 
-    pr = properties[url]
+    pr = properties[clean_url]
     if "ETag" in pr and "ETag" in req.headers:
         if pr["ETag"] == req.headers["ETag"]:
             return False
@@ -96,8 +96,17 @@ def etag_quote(etag):
         # Add quotes.
         return '"' + etag + '"'
 
-def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
-    r = api.collections().list(filters=[["properties", "exists", url]]).execute()
+
+def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow, varying_url_params=""):
+    varying_params = [s.strip() for s in varying_url_params.split(",")]
+
+    parsed = urllib.parse.urlparse(url)
+    query = [q for q in urllib.parse.parse_qsl(parsed.query)
+             if q[0] not in varying_params]
+    clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
+                                         urllib.parse.urlencode(query),  parsed.fragment))
+
+    r = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
 
     now = utcnow()
 
@@ -105,12 +114,12 @@ def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
 
     for item in r["items"]:
         properties = item["properties"]
-        if fresh_cache(url, properties, now):
+        if fresh_cache(clean_url, properties, now):
             # Do nothing
             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
             return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
 
-        if not changed(url, properties, now):
+        if not changed(url, clean_url, properties, now):
             # ETag didn't change, same content, just update headers
             api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
@@ -131,7 +140,7 @@ def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
     if req.status_code not in (200, 304):
         raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))
 
-    remember_headers(url, properties, req.headers, now)
+    remember_headers(clean_url, properties, req.headers, now)
 
     if req.status_code == 304 and "ETag" in req.headers and req.headers["ETag"] in etags:
         item = etags[req.headers["ETag"]]
@@ -140,8 +149,8 @@ def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
         cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
         return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
 
-    if "Content-Length" in properties[url]:
-        cl = int(properties[url]["Content-Length"])
+    if "Content-Length" in properties[clean_url]:
+        cl = int(properties[clean_url]["Content-Length"])
         logger.info("Downloading %s (%s bytes)", url, cl)
     else:
         cl = None
@@ -156,7 +165,7 @@ def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
         else:
             name = grp.group(4)
     else:
-        name = urllib.parse.urlparse(url).path.split("/")[-1]
+        name = parsed.path.split("/")[-1]
 
     count = 0
     start = time.time()
@@ -179,7 +188,7 @@ def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
 
     logger.info("Download complete")
 
-    collectionname = "Downloaded from %s" % urllib.parse.quote(url, safe='')
+    collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')
 
     # max length - space to add a timestamp used by ensure_unique_name
     max_name_len = 254 - 28
index a7f2103476718ec1dd3f07a36e370e46521783e2..700b5b5f94aab099a85ab1a0e239337400517752 100644 (file)
@@ -109,7 +109,7 @@ class ArvPathMapper(PathMapper):
                         # passthrough, we'll download it later.
                         self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
                     else:
-                        keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
+                        keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src, varying_url_params=self.arvrunner.varying_url_params)
                         logger.info("%s is %s", src, keepref)
                         self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                 except Exception as e: