Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / sdk / cwl / arvados_cwl / http.py
index 33aa098845f4f45561f2768b14e3cd0c17ae9131..f2415bcffef40ef805b4e3a0213778caac16f63e 100644 (file)
@@ -72,19 +72,23 @@ def remember_headers(url, properties, headers, now):
         properties[url]["Date"] = my_formatdate(now)
 
 
-def changed(url, properties, now):
+def changed(url, clean_url, properties, now):
     req = requests.head(url, allow_redirects=True)
-    remember_headers(url, properties, req.headers, now)
 
     if req.status_code != 200:
         # Sometimes endpoints are misconfigured and will deny HEAD but
         # allow GET so instead of failing here, we'll try GET If-None-Match
         return True
 
-    pr = properties[url]
-    if "ETag" in pr and "ETag" in req.headers:
-        if pr["ETag"] == req.headers["ETag"]:
-            return False
+    etag = properties[url].get("ETag")
+
+    if url in properties:
+        del properties[url]
+    remember_headers(clean_url, properties, req.headers, now)
+
+    if "ETag" in req.headers and etag == req.headers["ETag"]:
+        # Didn't change
+        return False
 
     return True
 
@@ -96,39 +100,66 @@ def etag_quote(etag):
         # Add quotes.
         return '"' + etag + '"'
 
-def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
-    r = api.collections().list(filters=[["properties", "exists", url]]).execute()
+
+def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow, varying_url_params="", prefer_cached_downloads=False):
+    varying_params = [s.strip() for s in varying_url_params.split(",")]
+
+    parsed = urllib.parse.urlparse(url)
+    query = [q for q in urllib.parse.parse_qsl(parsed.query)
+             if q[0] not in varying_params]
+
+    clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
+                                         urllib.parse.urlencode(query, safe="/"),  parsed.fragment))
+
+    r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()
+
+    if clean_url == url:
+        items = r1["items"]
+    else:
+        r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
+        items = r1["items"] + r2["items"]
 
     now = utcnow()
 
     etags = {}
 
-    for item in r["items"]:
+    for item in items:
         properties = item["properties"]
-        if fresh_cache(url, properties, now):
-            # Do nothing
+
+        if clean_url in properties:
+            cache_url = clean_url
+        elif url in properties:
+            cache_url = url
+        else:
+            return False
+
+        if prefer_cached_downloads or fresh_cache(cache_url, properties, now):
+            # HTTP caching rules say we should use the cache
             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
             return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
 
-        if not changed(url, properties, now):
+        if not changed(cache_url, clean_url, properties, now):
             # ETag didn't change, same content, just update headers
             api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
             return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
 
-        if "ETag" in properties and len(properties["ETag"]) > 2:
-            etags[properties["ETag"]] = item
+        if "ETag" in properties[cache_url] and len(properties[cache_url]["ETag"]) > 2:
+            etags[properties[cache_url]["ETag"]] = item
+
+    logger.debug("Found ETags %s", etags)
 
     properties = {}
     headers = {}
     if etags:
         headers['If-None-Match'] = ', '.join([etag_quote(k) for k,v in etags.items()])
+    logger.debug("Sending GET request with headers %s", headers)
     req = requests.get(url, stream=True, allow_redirects=True, headers=headers)
 
     if req.status_code not in (200, 304):
         raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))
 
-    remember_headers(url, properties, req.headers, now)
+    remember_headers(clean_url, properties, req.headers, now)
 
     if req.status_code == 304 and "ETag" in req.headers and req.headers["ETag"] in etags:
         item = etags[req.headers["ETag"]]
@@ -137,8 +168,8 @@ def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
         cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
         return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
 
-    if "Content-Length" in properties[url]:
-        cl = int(properties[url]["Content-Length"])
+    if "Content-Length" in properties[clean_url]:
+        cl = int(properties[clean_url]["Content-Length"])
         logger.info("Downloading %s (%s bytes)", url, cl)
     else:
         cl = None
@@ -153,7 +184,7 @@ def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
         else:
             name = grp.group(4)
     else:
-        name = urllib.parse.urlparse(url).path.split("/")[-1]
+        name = parsed.path.split("/")[-1]
 
     count = 0
     start = time.time()
@@ -176,7 +207,7 @@ def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
 
     logger.info("Download complete")
 
-    collectionname = "Downloaded from %s" % urllib.parse.quote(url, safe='')
+    collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')
 
     # max length - space to add a timestamp used by ensure_unique_name
     max_name_len = 254 - 28