Merge branch '19364-diag-docs'
[arvados.git] / sdk / cwl / arvados_cwl / http.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from future import standard_library
7 standard_library.install_aliases()
8
9 import requests
10 import email.utils
11 import time
12 import datetime
13 import re
14 import arvados
15 import arvados.collection
16 import urllib.parse
17 import logging
18 import calendar
19 import urllib.parse
20
21 logger = logging.getLogger('arvados.cwl-runner')
22
23 def my_formatdate(dt):
24     return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
25                                   localtime=False, usegmt=True)
26
27 def my_parsedate(text):
28     parsed = email.utils.parsedate_tz(text)
29     if parsed:
30         if parsed[9]:
31             # Adjust to UTC
32             return datetime.datetime(*parsed[:6]) + datetime.timedelta(seconds=parsed[9])
33         else:
34             # TZ is zero or missing, assume UTC.
35             return datetime.datetime(*parsed[:6])
36     else:
37         return datetime.datetime(1970, 1, 1)
38
39 def fresh_cache(url, properties, now):
40     pr = properties[url]
41     expires = None
42
43     logger.debug("Checking cache freshness for %s using %s", url, pr)
44
45     if "Cache-Control" in pr:
46         if re.match(r"immutable", pr["Cache-Control"]):
47             return True
48
49         g = re.match(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
50         if g:
51             expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))
52
53     if expires is None and "Expires" in pr:
54         expires = my_parsedate(pr["Expires"])
55
56     if expires is None:
57         # Use a default cache time of 24 hours if upstream didn't set
58         # any cache headers, to reduce redundant downloads.
59         expires = my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)
60
61     if not expires:
62         return False
63
64     return (now < expires)
65
66 def remember_headers(url, properties, headers, now):
67     properties.setdefault(url, {})
68     for h in ("Cache-Control", "ETag", "Expires", "Date", "Content-Length"):
69         if h in headers:
70             properties[url][h] = headers[h]
71     if "Date" not in headers:
72         properties[url]["Date"] = my_formatdate(now)
73
74
75 def changed(url, clean_url, properties, now):
76     req = requests.head(url, allow_redirects=True)
77
78     if req.status_code != 200:
79         # Sometimes endpoints are misconfigured and will deny HEAD but
80         # allow GET so instead of failing here, we'll try GET If-None-Match
81         return True
82
83     etag = properties[url].get("ETag")
84
85     if url in properties:
86         del properties[url]
87     remember_headers(clean_url, properties, req.headers, now)
88
89     if "ETag" in req.headers and etag == req.headers["ETag"]:
90         # Didn't change
91         return False
92
93     return True
94
95 def etag_quote(etag):
96     # if it already has leading and trailing quotes, do nothing
97     if etag[0] == '"' and etag[-1] == '"':
98         return etag
99     else:
100         # Add quotes.
101         return '"' + etag + '"'
102
103
104 def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow, varying_url_params="", prefer_cached_downloads=False):
105     varying_params = [s.strip() for s in varying_url_params.split(",")]
106
107     parsed = urllib.parse.urlparse(url)
108     query = [q for q in urllib.parse.parse_qsl(parsed.query)
109              if q[0] not in varying_params]
110
111     clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
112                                          urllib.parse.urlencode(query, safe="/"),  parsed.fragment))
113
114     r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()
115
116     if clean_url == url:
117         items = r1["items"]
118     else:
119         r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
120         items = r1["items"] + r2["items"]
121
122     now = utcnow()
123
124     etags = {}
125
126     for item in items:
127         properties = item["properties"]
128
129         if clean_url in properties:
130             cache_url = clean_url
131         elif url in properties:
132             cache_url = url
133         else:
134             return False
135
136         if prefer_cached_downloads or fresh_cache(cache_url, properties, now):
137             # HTTP caching rules say we should use the cache
138             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
139             return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
140
141         if not changed(cache_url, clean_url, properties, now):
142             # ETag didn't change, same content, just update headers
143             api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
144             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
145             return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
146
147         if "ETag" in properties[cache_url] and len(properties[cache_url]["ETag"]) > 2:
148             etags[properties[cache_url]["ETag"]] = item
149
150     logger.debug("Found ETags %s", etags)
151
152     properties = {}
153     headers = {}
154     if etags:
155         headers['If-None-Match'] = ', '.join([etag_quote(k) for k,v in etags.items()])
156     logger.debug("Sending GET request with headers %s", headers)
157     req = requests.get(url, stream=True, allow_redirects=True, headers=headers)
158
159     if req.status_code not in (200, 304):
160         raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))
161
162     remember_headers(clean_url, properties, req.headers, now)
163
164     if req.status_code == 304 and "ETag" in req.headers and req.headers["ETag"] in etags:
165         item = etags[req.headers["ETag"]]
166         item["properties"].update(properties)
167         api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
168         cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
169         return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
170
171     if "Content-Length" in properties[clean_url]:
172         cl = int(properties[clean_url]["Content-Length"])
173         logger.info("Downloading %s (%s bytes)", url, cl)
174     else:
175         cl = None
176         logger.info("Downloading %s (unknown size)", url)
177
178     c = arvados.collection.Collection()
179
180     if req.headers.get("Content-Disposition"):
181         grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))', req.headers["Content-Disposition"])
182         if grp.group(2):
183             name = grp.group(2)
184         else:
185             name = grp.group(4)
186     else:
187         name = parsed.path.split("/")[-1]
188
189     count = 0
190     start = time.time()
191     checkpoint = start
192     with c.open(name, "wb") as f:
193         for chunk in req.iter_content(chunk_size=1024):
194             count += len(chunk)
195             f.write(chunk)
196             loopnow = time.time()
197             if (loopnow - checkpoint) > 20:
198                 bps = count / (loopnow - start)
199                 if cl is not None:
200                     logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left",
201                                 ((count * 100) / cl),
202                                 (bps // (1024*1024)),
203                                 ((cl-count) // bps))
204                 else:
205                     logger.info("%d downloaded, %3.2f MiB/s", count, (bps / (1024*1024)))
206                 checkpoint = loopnow
207
208     logger.info("Download complete")
209
210     collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')
211
212     # max length - space to add a timestamp used by ensure_unique_name
213     max_name_len = 254 - 28
214
215     if len(collectionname) > max_name_len:
216         over = len(collectionname) - max_name_len
217         split = int(max_name_len/2)
218         collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
219
220     c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)
221
222     api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
223
224     return "keep:%s/%s" % (c.portable_data_hash(), name)