11162: Add progress for http data download.
[arvados.git] / sdk / cwl / arvados_cwl / http.py
1 import requests
2 import email.utils
3 import time
4 import datetime
5 import re
6 import arvados
7 import arvados.collection
8 import urlparse
9 import logging
10
# Module-level logger, registered under the fixed name 'arvados.cwl-runner'
# (not this module's own __name__).
logger = logging.getLogger("arvados.cwl-runner")
12
def my_formatdate(dt):
    """Format a naive UTC datetime as an HTTP-style date string.

    `dt` is interpreted as UTC (the caller in this module passes
    ``datetime.datetime.utcnow()``).  Returns a string such as
    ``"Tue, 15 May 2018 12:00:00 GMT"``.
    """
    # Imported locally so this function does not depend on the module's
    # import block being changed.
    import calendar

    # calendar.timegm() treats the time tuple as UTC.  time.mktime() would
    # treat it as local time and shift the result by the local UTC offset.
    return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
                                  localtime=False, usegmt=True)
15
def my_parsedate(text):
    """Parse an email/HTTP style date string into a naive datetime.

    Only the first six fields reported by email.utils.parsedate() (year
    through second) are used to build the result.
    """
    fields = email.utils.parsedate(text)
    year, month, day, hour, minute, second = fields[:6]
    return datetime.datetime(year, month, day, hour, minute, second)
18
def fresh_cache(url, properties):
    """Report whether the cached copy of `url` may be reused without revalidation.

    `properties` maps a URL to a dict of saved response headers.  Freshness
    is decided, in order, from:

    * a Cache-Control ``immutable`` directive (always fresh);
    * a Cache-Control ``s-maxage`` / ``max-age`` lifetime (first one listed),
      counted from the saved ``Date`` header;
    * the ``Expires`` header.

    Returns True if the copy is still fresh, False if it has expired or no
    freshness information is available.  Raises KeyError if `url` has no
    entry in `properties`.
    """
    pr = properties[url]
    expires = None

    if "Cache-Control" in pr:
        # Cache-Control is a comma-separated list of case-insensitive
        # directives in no particular order (e.g. "public, max-age=3600"),
        # so inspect every directive instead of only the start of the header.
        directives = [d.strip().lower() for d in pr["Cache-Control"].split(",")]

        if "immutable" in directives:
            return True

        for directive in directives:
            g = re.match(r"(s-maxage|max-age)=(\d+)", directive)
            if g:
                expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))
                break

    if expires is None and "Expires" in pr:
        try:
            expires = my_parsedate(pr["Expires"])
        except (TypeError, ValueError):
            # An unparseable Expires value (commonly "0" or "-1") means
            # "already expired", so the cached copy is not fresh.
            return False

    if not expires:
        return False

    return (datetime.datetime.utcnow() < expires)
38
def remember_headers(url, properties, headers):
    """Copy the cache-related response headers for `url` into `properties`.

    ``properties[url]`` is created if missing.  Headers absent from
    `headers` leave any previously saved value untouched, except "Date",
    which is filled in with the current UTC time when the response has none.
    """
    saved = properties.setdefault(url, {})
    tracked = ("Cache-Control", "ETag", "Expires", "Date", "Content-Length")
    for field in tracked:
        if field in headers:
            saved[field] = headers[field]
    if "Date" not in headers:
        # No Date in the response: record the current time instead.
        saved["Date"] = my_formatdate(datetime.datetime.utcnow())
46
47
def changed(url, properties):
    """Revalidate the cached copy of `url` with an HTTP HEAD request.

    Compares the ETag previously saved in ``properties[url]`` with the ETag
    of the new response, then refreshes the saved headers from that response.

    Returns False only when both ETags are present and equal (content is
    unchanged); returns True otherwise.  Raises Exception if the HEAD
    request does not return status 200.
    """
    # requests does not follow redirects for HEAD unless asked to, while the
    # GET used for the download does; follow them here so both see the same
    # final resource.
    req = requests.head(url, allow_redirects=True)

    if req.status_code != 200:
        raise Exception("Got status %s" % req.status_code)

    # Capture the saved ETag *before* remember_headers() overwrites it with
    # the one from this response; otherwise the comparison below would be
    # comparing the new ETag with itself.
    previous_etag = properties.get(url, {}).get("ETag")

    remember_headers(url, properties, req.headers)

    if previous_etag is not None and "ETag" in req.headers:
        if previous_etag == req.headers["ETag"]:
            return False
    return True
60
def http_to_keep(api, project_uuid, url):
    """Return a ``keep:<portable data hash>/<name>`` reference for `url`.

    Collections are listed with the filter ``["properties", "exists", url]``.
    For each one, the saved headers are checked with fresh_cache() and then
    changed(); the first collection found to be fresh or unchanged is reused
    (in the latter case its saved headers are updated first).

    Otherwise the URL is downloaded into a new collection owned by
    `project_uuid`, the response headers are saved in the collection's
    properties, and a reference to the new collection is returned.  Progress
    is logged at most once every 20 seconds while downloading.

    The file name is the last path component of `url`.

    Raises Exception if the download request does not return status 200.
    """
    r = api.collections().list(filters=[["properties", "exists", url]]).execute()
    name = urlparse.urlparse(url).path.split("/")[-1]

    for item in r["items"]:
        properties = item["properties"]
        if fresh_cache(url, properties):
            # Saved headers say the copy is still fresh; reuse it as-is.
            return "keep:%s/%s" % (item["portable_data_hash"], name)

        if not changed(url, properties):
            # ETag didn't change, same content, just update headers
            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
            return "keep:%s/%s" % (item["portable_data_hash"], name)

    properties = {}
    req = requests.get(url, stream=True)

    if req.status_code != 200:
        raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))

    remember_headers(url, properties, req.headers)

    # Content-Length is optional (e.g. chunked responses), so the total size
    # may be unknown; progress reporting must cope with that.
    try:
        total = int(properties[url]["Content-Length"])
    except (KeyError, ValueError):
        total = None

    if total is not None:
        logger.info("Downloading %s (%s bytes)", url, total)
    else:
        logger.info("Downloading %s (unknown size)", url)

    c = arvados.collection.Collection()

    count = 0
    start = time.time()
    checkpoint = start
    with c.open(name, "w") as f:
        for chunk in req.iter_content(chunk_size=1024):
            count += len(chunk)
            f.write(chunk)
            now = time.time()
            if (now - checkpoint) > 20:
                bps = (float(count)/float(now - start))
                if total:
                    logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left",
                                float(count * 100) / float(total),
                                bps/(1024*1024),
                                (total-count)/bps)
                else:
                    # Without a known total only the amount and rate can be reported.
                    logger.info("%d bytes downloaded, %3.2f MiB/s",
                                count,
                                bps/(1024*1024))
                checkpoint = now

    c.save_new(name="Downloaded from %s" % url, owner_uuid=project_uuid, ensure_unique_name=True)

    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()

    return "keep:%s/%s" % (c.portable_data_hash(), name)
107
108     return "keep:%s/%s" % (c.portable_data_hash(), name)