+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import division
+from future import standard_library
+standard_library.install_aliases()
+
import calendar
import datetime
import email.utils
import logging
import re
import time
import urllib.parse

import requests

import arvados
import arvados.collection
logger = logging.getLogger('arvados.cwl-runner')
def my_formatdate(dt):
    """Render a naive UTC datetime *dt* as an RFC 2822 date string (GMT)."""
    # timegm() treats the struct_time as UTC, matching the usegmt output.
    epoch_seconds = calendar.timegm(dt.timetuple())
    return email.utils.formatdate(timeval=epoch_seconds,
                                  localtime=False,
                                  usegmt=True)
def my_parsedate(text):
    """Parse an RFC 2822 date string into a naive UTC datetime.

    Unparseable (or empty) input falls back to the Unix epoch so callers
    always get a comparable datetime.
    """
    parsed = email.utils.parsedate_tz(text)
    if not parsed:
        # Could not parse; return the epoch as an "infinitely stale" date.
        return datetime.datetime(1970, 1, 1)
    if parsed[9]:
        # parsed[9] is the timezone offset in seconds *east* of UTC
        # (same convention as email.utils.mktime_tz), so converting the
        # wall-clock fields to UTC means subtracting the offset.
        return datetime.datetime(*parsed[:6]) - datetime.timedelta(seconds=parsed[9])
    # TZ is zero or missing, assume the fields are already UTC.
    return datetime.datetime(*parsed[:6])
+
def fresh_cache(url, properties, now):
    """Return True if the cached copy of *url* is still fresh at *now*.

    Freshness is derived from the response headers previously stored by
    remember_headers() in properties[url]: a Cache-Control "immutable"
    directive means never stale; "max-age"/"s-maxage" is counted from the
    response Date; otherwise an Expires header is honored.  With no usable
    header the entry is treated as stale so the caller revalidates.
    """
    pr = properties[url]
    expires = None

    cache_control = pr.get("Cache-Control", "")
    if re.match(r"immutable", cache_control):
        # Content is declared immutable; it can never go stale.
        return True
    g = re.match(r"(s-maxage|max-age)=(\d+)", cache_control)
    if g:
        expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))

    if expires is None and "Expires" in pr:
        expires = my_parsedate(pr["Expires"])

    if not expires:
        # No cache headers we understand; force revalidation.
        return False

    return (now < expires)
def remember_headers(url, properties, headers, now):
    """Record the cache-relevant response *headers* for *url* in *properties*.

    Only the headers needed for later freshness checks are kept.  If the
    server omitted a Date header, one is synthesized from *now*.
    """
    entry = properties.setdefault(url, {})
    for field in ("Cache-Control", "ETag", "Expires", "Date", "Content-Length"):
        if field in headers:
            entry[field] = headers[field]
    if "Date" not in headers:
        # No server-supplied Date; stamp the entry with our own clock so
        # relative freshness math still works.
        entry["Date"] = my_formatdate(now)
def changed(url, properties, now):
    """HEAD *url* and report whether its content differs from our cache.

    Refreshes the stored headers for *url* as a side effect.  Returns
    False only when both the cached entry and the live response carry an
    ETag and they match; otherwise returns True.  Raises Exception on a
    non-200 response.
    """
    req = requests.head(url, allow_redirects=True)

    # Capture the previously cached ETag *before* remember_headers
    # overwrites it with the fresh response's ETag — otherwise the
    # comparison below would always succeed.
    old_etag = properties.get(url, {}).get("ETag")

    remember_headers(url, properties, req.headers, now)

    if req.status_code != 200:
        raise Exception("Got status %s" % req.status_code)

    if old_etag and old_etag == req.headers.get("ETag"):
        # Same validator, same content.
        return False

    return True
def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
    """Ensure the content of *url* is stored in Keep; return a "keep:" reference.

    First looks for an existing collection whose properties cache *url*:
    a fresh cache entry is reused directly, and a stale one is revalidated
    by ETag before reuse.  Otherwise the content is downloaded into a new
    collection owned by *project_uuid*.  *utcnow* is injectable for tests.
    """
    r = api.collections().list(filters=[["properties", "exists", url]]).execute()

    now = utcnow()

    for item in r["items"]:
        properties = item["properties"]
        if fresh_cache(url, properties, now):
            # Cache is fresh; serve the existing collection as-is.
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
        if not changed(url, properties, now):
            # ETag didn't change, same content, just update stored headers.
            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])

    properties = {}
    req = requests.get(url, stream=True, allow_redirects=True)

    if req.status_code != 200:
        raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))

    remember_headers(url, properties, req.headers, now)

    if "Content-Length" in properties[url]:
        cl = int(properties[url]["Content-Length"])
        logger.info("Downloading %s (%s bytes)", url, cl)
    else:
        cl = None
        logger.info("Downloading %s (unknown size)", url)

    c = arvados.collection.Collection()

    # Prefer the server-suggested filename; fall back to the URL path.
    name = None
    if req.headers.get("Content-Disposition"):
        grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))', req.headers["Content-Disposition"])
        if grp:
            # Quoted filename if present, otherwise the bare token.
            name = grp.group(2) or grp.group(4)
    if not name:
        name = urllib.parse.urlparse(url).path.split("/")[-1]

    count = 0
    start = time.time()
    checkpoint = start
    with c.open(name, "wb") as f:
        for chunk in req.iter_content(chunk_size=1024):
            count += len(chunk)
            f.write(chunk)
            loopnow = time.time()
            # Emit a progress line at most every 20 seconds.
            if (loopnow - checkpoint) > 20:
                bps = count / (loopnow - start)
                if cl is not None:
                    logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left",
                                ((count * 100) / cl),
                                (bps / (1024*1024)),
                                ((cl-count) / bps))
                else:
                    logger.info("%d downloaded, %3.2f MiB/s", count, (bps / (1024*1024)))
                checkpoint = loopnow

    # Quote the URL so it is safe to embed in a collection name.
    collectionname = "Downloaded from %s" % urllib.parse.quote(url, safe='')
    c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)
    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()