X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e4ea2e1dd1cb597b02d15bd7b9323705d6342f99..c6428570be58b01ce80c257adce06114cea1d88f:/sdk/cwl/arvados_cwl/http.py

diff --git a/sdk/cwl/arvados_cwl/http.py b/sdk/cwl/arvados_cwl/http.py
index 1ee1607466..33aa098845 100644
--- a/sdk/cwl/arvados_cwl/http.py
+++ b/sdk/cwl/arvados_cwl/http.py
@@ -1,3 +1,11 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import division
+from future import standard_library
+standard_library.install_aliases()
+
 import requests
 import email.utils
 import time
@@ -5,22 +13,30 @@ import datetime
 import re
 import arvados
 import arvados.collection
-import urlparse
+import urllib.parse
 import logging
+import calendar
+import urllib.parse
 
 logger = logging.getLogger('arvados.cwl-runner')
 
 def my_formatdate(dt):
-    return email.utils.formatdate(timeval=time.mktime(now.timetuple()), localtime=False, usegmt=True)
+    return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
+                                  localtime=False, usegmt=True)
 
 def my_parsedate(text):
-    parsed = email.utils.parsedate(text)
+    parsed = email.utils.parsedate_tz(text)
     if parsed:
-        return datetime.datetime(*parsed[:6])
+        if parsed[9]:
+            # Adjust to UTC
+            return datetime.datetime(*parsed[:6]) + datetime.timedelta(seconds=parsed[9])
+        else:
+            # TZ is zero or missing, assume UTC.
+            return datetime.datetime(*parsed[:6])
     else:
         return datetime.datetime(1970, 1, 1)
 
-def fresh_cache(url, properties):
+def fresh_cache(url, properties, now):
     pr = properties[url]
     expires = None
 
@@ -45,84 +61,132 @@ def fresh_cache(url, properties):
     if not expires:
         return False
 
-    return (datetime.datetime.utcnow() < expires)
+    return (now < expires)
 
-def remember_headers(url, properties, headers):
+def remember_headers(url, properties, headers, now):
     properties.setdefault(url, {})
     for h in ("Cache-Control", "ETag", "Expires", "Date", "Content-Length"):
         if h in headers:
             properties[url][h] = headers[h]
     if "Date" not in headers:
-        properties[url]["Date"] = my_formatdate(datetime.datetime.utcnow())
+        properties[url]["Date"] = my_formatdate(now)
 
-def changed(url, properties):
+def changed(url, properties, now):
     req = requests.head(url, allow_redirects=True)
-    remember_headers(url, properties, req.headers)
+    remember_headers(url, properties, req.headers, now)
 
     if req.status_code != 200:
-        raise Exception("Got status %s" % req.status_code)
+        # Sometimes endpoints are misconfigured and will deny HEAD but
+        # allow GET so instead of failing here, we'll try GET If-None-Match
+        return True
 
     pr = properties[url]
     if "ETag" in pr and "ETag" in req.headers:
         if pr["ETag"] == req.headers["ETag"]:
             return False
+
     return True
 
-def http_to_keep(api, project_uuid, url):
+def etag_quote(etag):
+    # if it already has leading and trailing quotes, do nothing
+    if etag[0] == '"' and etag[-1] == '"':
+        return etag
+    else:
+        # Add quotes.
+        return '"' + etag + '"'
+
+
+def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
     r = api.collections().list(filters=[["properties", "exists", url]]).execute()
 
+    now = utcnow()
+
+    etags = {}
+
     for item in r["items"]:
         properties = item["properties"]
-        if fresh_cache(url, properties):
+        if fresh_cache(url, properties, now):
             # Do nothing
             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
-            return "keep:%s/%s" % (item["portable_data_hash"], cr.keys()[0])
+            return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
 
-        if not changed(url, properties):
+        if not changed(url, properties, now):
             # ETag didn't change, same content, just update headers
             api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
-            return "keep:%s/%s" % (item["portable_data_hash"], cr.keys()[0])
+            return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
+
+        if "ETag" in properties and len(properties["ETag"]) > 2:
+            etags[properties["ETag"]] = item
 
     properties = {}
-    req = requests.get(url, stream=True, allow_redirects=True)
+    headers = {}
+    if etags:
+        headers['If-None-Match'] = ', '.join([etag_quote(k) for k,v in etags.items()])
+    req = requests.get(url, stream=True, allow_redirects=True, headers=headers)
 
-    if req.status_code != 200:
+    if req.status_code not in (200, 304):
         raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))
 
-    remember_headers(url, properties, req.headers)
+    remember_headers(url, properties, req.headers, now)
+
+    if req.status_code == 304 and "ETag" in req.headers and req.headers["ETag"] in etags:
+        item = etags[req.headers["ETag"]]
+        item["properties"].update(properties)
+        api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
+        cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
+        return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
 
-    logger.info("Downloading %s (%s bytes)", url, properties[url]["Content-Length"])
+    if "Content-Length" in properties[url]:
+        cl = int(properties[url]["Content-Length"])
+        logger.info("Downloading %s (%s bytes)", url, cl)
+    else:
+        cl = None
+        logger.info("Downloading %s (unknown size)", url)
 
     c = arvados.collection.Collection()
 
     if req.headers.get("Content-Disposition"):
         grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))', req.headers["Content-Disposition"])
-        if grp.groups(2):
-            name = grp.groups(2)
+        if grp.group(2):
+            name = grp.group(2)
         else:
-            name = grp.groups(3)
+            name = grp.group(4)
     else:
-        name = urlparse.urlparse(url).path.split("/")[-1]
+        name = urllib.parse.urlparse(url).path.split("/")[-1]
 
     count = 0
     start = time.time()
     checkpoint = start
-    with c.open(name, "w") as f:
+    with c.open(name, "wb") as f:
         for chunk in req.iter_content(chunk_size=1024):
             count += len(chunk)
             f.write(chunk)
-            now = time.time()
-            if (now - checkpoint) > 20:
-                bps = (float(count)/float(now - start))
-                logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left",
-                            float(count * 100) / float(properties[url]["Content-Length"]),
-                            bps/(1024*1024),
-                            (int(properties[url]["Content-Length"])-count)/bps)
-                checkpoint = now
-
-    c.save_new(name="Downloaded from %s" % url, owner_uuid=project_uuid, ensure_unique_name=True)
+            loopnow = time.time()
+            if (loopnow - checkpoint) > 20:
+                bps = count / (loopnow - start)
+                if cl is not None:
+                    logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left",
+                                ((count * 100) / cl),
+                                (bps // (1024*1024)),
+                                ((cl-count) // bps))
+                else:
+                    logger.info("%d downloaded, %3.2f MiB/s", count, (bps / (1024*1024)))
+                checkpoint = loopnow
+
+    logger.info("Download complete")
+
+    collectionname = "Downloaded from %s" % urllib.parse.quote(url, safe='')
+
+    # max length - space to add a timestamp used by ensure_unique_name
+    max_name_len = 254 - 28
+
+    if len(collectionname) > max_name_len:
+        over = len(collectionname) - max_name_len
+        split = int(max_name_len/2)
+        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
+
+    c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)
 
     api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()