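"""Download HTTP URLs to Keep collections, with caching.

Each download is stored as an Arvados collection whose properties record
the response's HTTP caching headers (Cache-Control, ETag, Expires, Date,
Content-Length) keyed by URL. Later requests for the same URL reuse the
cached collection while it is still fresh, or revalidate it with a HEAD
request before downloading again.
"""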
import requests
import email.utils
import time
import datetime
import re
import arvados
import arvados.collection
import urlparse
import logging

logger = logging.getLogger('arvados.cwl-runner')

def my_formatdate(dt):
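    """Format a datetime as an RFC 2822 date string in GMT."""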
    return email.utils.formatdate(timeval=time.mktime(dt.timetuple()),
                                  localtime=False, usegmt=True)

def my_parsedate(text):
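    """Parse an RFC 2822 date string, or return the epoch if it cannot be parsed."""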
    parsed = email.utils.parsedate(text)
    if parsed:
        return datetime.datetime(*parsed[:6])
    else:
        return datetime.datetime(1970, 1, 1)

def fresh_cache(url, properties, now):
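    """Return True if the headers cached for url indicate it is still fresh.

    Freshness comes from Cache-Control (immutable, max-age, s-maxage) or
    Expires, falling back to 24 hours after the recorded Date header.
    """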
    pr = properties[url]
    expires = None

    logger.debug("Checking cache freshness for %s using %s", url, pr)

    if "Cache-Control" in pr:
        # Use re.search rather than re.match: these directives can appear
        # anywhere in the header, e.g. "public, max-age=3600, immutable".
        if re.search(r"immutable", pr["Cache-Control"]):
            return True

        g = re.search(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
        if g:
            expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))

    if expires is None and "Expires" in pr:
        expires = my_parsedate(pr["Expires"])

    if expires is None:
        # Use a default cache time of 24 hours if upstream didn't set
        # any cache headers, to reduce redundant downloads.
        expires = my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)

    return (now < expires)

def remember_headers(url, properties, headers, now):
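    """Record the caching-relevant response headers in properties[url]."""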
    properties.setdefault(url, {})
    for h in ("Cache-Control", "ETag", "Expires", "Date", "Content-Length"):
        if h in headers:
            properties[url][h] = headers[h]
    if "Date" not in headers:
        properties[url]["Date"] = my_formatdate(now)


def changed(url, properties, now):
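    """Do a HEAD request and return False if the content is unchanged.

    Unchanged means the server reported the same ETag that was recorded
    when the content was last downloaded.
    """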
    req = requests.head(url, allow_redirects=True)

    # Save the previously recorded ETag before remember_headers()
    # overwrites it with the one from this response; comparing the two
    # afterwards would always see identical values.
    old_etag = properties.get(url, {}).get("ETag")

    remember_headers(url, properties, req.headers, now)

    if req.status_code != 200:
        raise Exception("Got status %s" % req.status_code)

    if old_etag is not None and old_etag == req.headers.get("ETag"):
        return False

    return True

def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
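    """Download a file over HTTP and store it in Keep, returning a keep: URI.

    Reuses an existing collection when one was previously created for
    this url and is still valid according to its cached headers;
    otherwise streams the content into a new collection owned by
    project_uuid.
    """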
    r = api.collections().list(filters=[["properties", "exists", url]]).execute()

    now = utcnow()
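
    # Check for collections previously downloaded from this URL.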
    for item in r["items"]:
        properties = item["properties"]
        if fresh_cache(url, properties, now):
            # Cached copy is still fresh, return it as-is.
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return "keep:%s/%s" % (item["portable_data_hash"], cr.keys()[0])

        if not changed(url, properties, now):
            # ETag didn't change, same content, just update headers
            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return "keep:%s/%s" % (item["portable_data_hash"], cr.keys()[0])

    properties = {}
    req = requests.get(url, stream=True, allow_redirects=True)

    if req.status_code != 200:
        raise Exception("Failed to download '%s': got status %s" % (url, req.status_code))

    remember_headers(url, properties, req.headers, now)

    if "Content-Length" in properties[url]:
        cl = int(properties[url]["Content-Length"])
        logger.info("Downloading %s (%s bytes)", url, cl)
    else:
        cl = None
        logger.info("Downloading %s (unknown size)", url)

    c = arvados.collection.Collection()

    if req.headers.get("Content-Disposition"):
        grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))', req.headers["Content-Disposition"])
        if grp and grp.group(2):
            name = grp.group(2)
        elif grp:
            name = grp.group(4)
        else:
            # Header present but no parseable filename, use the URL path.
            name = urlparse.urlparse(url).path.split("/")[-1]
    else:
        name = urlparse.urlparse(url).path.split("/")[-1]
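
    # Stream the response body into the collection in 1 KiB chunks,
    # logging progress every 20 seconds.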
    count = 0
    start = time.time()
    checkpoint = start
    with c.open(name, "w") as f:
        for chunk in req.iter_content(chunk_size=1024):
            count += len(chunk)
            f.write(chunk)
            loopnow = time.time()
            if (loopnow - checkpoint) > 20:
                bps = (float(count)/float(loopnow - start))
                if cl is not None:
                    logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left",
                                float(count * 100) / float(cl),
                                bps/(1024*1024),
                                (cl-count)/bps)
                else:
                    logger.info("%d downloaded, %3.2f MiB/s", count, bps/(1024*1024))
                checkpoint = loopnow

    c.save_new(name="Downloaded from %s" % url, owner_uuid=project_uuid, ensure_unique_name=True)

    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()

    return "keep:%s/%s" % (c.portable_data_hash(), name)
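

# A minimal usage sketch, not part of the original module: it assumes
# ARVADOS_API_HOST and ARVADOS_API_TOKEN are configured in the
# environment, and that a destination project UUID and a URL are passed
# on the command line, e.g.
#
#   python http.py zzzzz-j7d0g-xxxxxxxxxxxxxxx https://example.com/input.dat
#
# (both arguments above are placeholders, not real values).
if __name__ == "__main__":
    import sys
    api_client = arvados.api("v1")
    # Prints a "keep:<portable_data_hash>/<filename>" reference.
    print(http_to_keep(api_client, sys.argv[1], sys.argv[2]))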