Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / sdk / cwl / arvados_cwl / http.py
index dcc2a51192dfc4d4b573da302b3373fd08d67fff..f2415bcffef40ef805b4e3a0213778caac16f63e 100644
@@ -72,48 +72,104 @@ def remember_headers(url, properties, headers, now):
         properties[url]["Date"] = my_formatdate(now)
 
 
-def changed(url, properties, now):
+def changed(url, clean_url, properties, now):
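+    # Revalidate a cached download: issue a HEAD request and compare the
+    # ETag the server reports against the one recorded in properties.
+    # Returns True when the content needs to be fetched again.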
     req = requests.head(url, allow_redirects=True)
-    remember_headers(url, properties, req.headers, now)
 
     if req.status_code != 200:
-        raise Exception("Got status %s" % req.status_code)
+        # Some endpoints are misconfigured and deny HEAD but allow GET, so
+        # instead of failing here we fall back to a conditional GET with
+        # If-None-Match below.
+        return True
 
-    pr = properties[url]
-    if "ETag" in pr and "ETag" in req.headers:
-        if pr["ETag"] == req.headers["ETag"]:
-            return False
+    # Read the recorded ETag before replacing the stale cache entry,
+    # guarding against the URL having no cache entry at all.
+    etag = None
+    if url in properties:
+        etag = properties[url].get("ETag")
+        del properties[url]
+    remember_headers(clean_url, properties, req.headers, now)
+
+    if "ETag" in req.headers and etag == req.headers["ETag"]:
+        # Didn't change
+        return False
 
     return True
 
-def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
-    r = api.collections().list(filters=[["properties", "exists", url]]).execute()
+def etag_quote(etag):
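+    # If-None-Match values are quoted strings (RFC 7232), but cached
+    # properties may hold an unquoted ETag, so restore missing quotes.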
+    # if it already has leading and trailing quotes, do nothing
+    if etag[0] == '"' and etag[-1] == '"':
+        return etag
+    else:
+        # Add quotes.
+        return '"' + etag + '"'
+
+
+def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow, varying_url_params="", prefer_cached_downloads=False):
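+    # varying_url_params: comma-separated names of query parameters (such
+    # as expiring signature tokens) to ignore when matching the URL against
+    # previously cached downloads.
+    # prefer_cached_downloads: when true, reuse a cached collection without
+    # revalidating it against the remote server.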
+    varying_params = [s.strip() for s in varying_url_params.split(",") if s.strip()]
+
+    parsed = urllib.parse.urlparse(url)
+    query = [q for q in urllib.parse.parse_qsl(parsed.query)
+             if q[0] not in varying_params]
+
+    clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
+                                         urllib.parse.urlencode(query, safe="/"),  parsed.fragment))
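+    # For example (hypothetical URL): with varying_url_params="KeyId,Signature",
+    # https://example.com/data.tar?KeyId=xx&Signature=yy is matched against
+    # the cache as https://example.com/data.tar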
+
+    r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()
+
+    if clean_url == url:
+        items = r1["items"]
+    else:
+        r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
+        items = r1["items"] + r2["items"]
 
     now = utcnow()
 
-    for item in r["items"]:
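+    # Map ETag value -> cached collection, so one conditional GET below can
+    # match any previously downloaded copy of the content.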
+    etags = {}
+
+    for item in items:
         properties = item["properties"]
-        if fresh_cache(url, properties, now):
-            # Do nothing
+
+        if clean_url in properties:
+            cache_url = clean_url
+        elif url in properties:
+            cache_url = url
+        else:
+            # Shouldn't happen (the API filter matched one of these URLs),
+            # but skip this item rather than aborting the whole lookup.
+            continue
+
+        if prefer_cached_downloads or fresh_cache(cache_url, properties, now):
+            # HTTP caching rules say we should use the cache
             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
             return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
 
-        if not changed(url, properties, now):
+        if not changed(cache_url, clean_url, properties, now):
             # ETag didn't change, same content, just update headers
             api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
             return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
 
+        if "ETag" in properties[cache_url] and len(properties[cache_url]["ETag"]) > 2:
+            etags[properties[cache_url]["ETag"]] = item
+
+    logger.debug("Found ETags %s", etags)
+
     properties = {}
-    req = requests.get(url, stream=True, allow_redirects=True)
+    headers = {}
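+    # Offer the cached ETags so the server can answer 304 Not Modified if
+    # any of them still matches the current content.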
+    if etags:
+        headers['If-None-Match'] = ', '.join(etag_quote(etag) for etag in etags)
+    logger.debug("Sending GET request with headers %s", headers)
+    req = requests.get(url, stream=True, allow_redirects=True, headers=headers)
 
-    if req.status_code != 200:
+    if req.status_code not in (200, 304):
         raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))
 
-    remember_headers(url, properties, req.headers, now)
+    remember_headers(clean_url, properties, req.headers, now)
+
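+    # A 304 response means one of our cached copies is still current;
+    # refresh its stored headers and reuse the existing collection.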
+    if req.status_code == 304 and "ETag" in req.headers and req.headers["ETag"] in etags:
+        item = etags[req.headers["ETag"]]
+        item["properties"].update(properties)
+        api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
+        cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
+        return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
 
-    if "Content-Length" in properties[url]:
-        cl = int(properties[url]["Content-Length"])
+    if "Content-Length" in properties[clean_url]:
+        cl = int(properties[clean_url]["Content-Length"])
         logger.info("Downloading %s (%s bytes)", url, cl)
     else:
         cl = None
@@ -128,7 +184,7 @@ def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
         else:
             name = grp.group(4)
     else:
-        name = urllib.parse.urlparse(url).path.split("/")[-1]
+        name = parsed.path.split("/")[-1]
 
     count = 0
     start = time.time()
@@ -149,8 +205,18 @@ def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
                     logger.info("%d downloaded, %3.2f MiB/s", count, (bps / (1024*1024)))
                 checkpoint = loopnow
 
+    logger.info("Download complete")
+
+    collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')
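+    # Percent-encode everything (including "/") so the URL appears as a
+    # single path-safe token in the collection name.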
+
+    # max length - space to add a timestamp used by ensure_unique_name
+    max_name_len = 254 - 28
+
+    if len(collectionname) > max_name_len:
+        over = len(collectionname) - max_name_len
+        split = int(max_name_len/2)
+        # Trim from the middle, keeping the head and tail; the "…" stands in
+        # for one removed character, so drop over+1 characters in total.
+        collectionname = collectionname[0:split] + "…" + collectionname[split+over+1:]
 
-    collectionname = "Downloaded from %s" % urllib.parse.quote(url, safe='')
     c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)
 
     api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()