# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

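"""Download a file over HTTP and store it in Keep as a new collection.

Previously imported URLs are looked up via collection properties, and
standard HTTP cache headers (Cache-Control, Expires, Etag) decide
whether a cached copy can be reused as-is, revalidated, or must be
downloaded again.
"""
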
from __future__ import division
from future import standard_library
standard_library.install_aliases()

import email.utils
import time
import datetime
import re
import arvados
import arvados.collection
import arvados.errors
import arvados.util
import urllib.parse
import logging
import calendar
import pycurl
from arvados.pycurl import PyCurlHelper

logger = logging.getLogger('arvados.http_import')

def my_formatdate(dt):
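    """Format a datetime as an RFC 2822 date string, always in GMT."""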
    return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
                                  localtime=False, usegmt=True)

def my_parsedate(text):
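    """Parse an RFC 2822 date string into a naive UTC datetime.

    Returns the epoch (1970-01-01) if the string cannot be parsed.
    """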
    parsed = email.utils.parsedate_tz(text)
    if parsed:
        if parsed[9]:
            # parsed[9] is the offset east of UTC in seconds, so
            # subtract it to normalize to UTC.
            return datetime.datetime(*parsed[:6]) - datetime.timedelta(seconds=parsed[9])
        else:
            # TZ is zero or missing, assume UTC.
            return datetime.datetime(*parsed[:6])
    else:
        return datetime.datetime(1970, 1, 1)

def fresh_cache(url, properties, now):
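    """Decide whether the cached copy of `url` is still fresh at time `now`.

    Honors Cache-Control (immutable, max-age/s-maxage) first, then the
    Expires header, and otherwise falls back to a 24-hour default.
    """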
    pr = properties[url]
    expires = None

    logger.debug("Checking cache freshness for %s using %s", url, pr)

    if "Cache-Control" in pr:
        # Directives can appear anywhere in the header value, so
        # search rather than match.
        if re.search(r"immutable", pr["Cache-Control"]):
            return True

        g = re.search(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
        if g:
            expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))

    if expires is None and "Expires" in pr:
        expires = my_parsedate(pr["Expires"])

    if expires is None:
        # Use a default cache time of 24 hours if upstream didn't set
        # any cache headers, to reduce redundant downloads.
        expires = my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)

    return (now < expires)

def remember_headers(url, properties, headers, now):
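    """Record the cache-relevant response headers under properties[url]."""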
    properties.setdefault(url, {})
    for h in ("Cache-Control", "Etag", "Expires", "Date", "Content-Length"):
        if h in headers:
            properties[url][h] = headers[h]
    if "Date" not in headers:
        properties[url]["Date"] = my_formatdate(now)

class Response:
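    """Minimal response object: just a status code and a headers dict."""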
    def __init__(self, status_code, headers):
        self.status_code = status_code
        self.headers = headers

class CurlDownloader(PyCurlHelper):
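    """Fetch a URL with pycurl, streaming the response body directly
    into a new Arvados collection while logging progress.
    """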
    # (connect timeout, low-bandwidth timeout, low-bandwidth threshold):
    # wait up to 60 seconds to connect, then give up after 300 seconds
    # spent below the low-bandwidth threshold of 32 KiB/s.
    DOWNLOADER_TIMEOUT = (60, 300, 32768)

    def __init__(self):
        super(CurlDownloader, self).__init__(title_case_headers=True)
        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.NOSIGNAL, 1)
        self.curl.setopt(pycurl.OPENSOCKETFUNCTION,
                         lambda *args, **kwargs: self._socket_open(*args, **kwargs))
        self.target = None

    def head(self, url, headers=None):
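        """Issue a HEAD request, following redirects, and return a Response."""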
        get_headers = {'Accept': 'application/octet-stream'}
        get_headers.update(headers or {})
        self._headers = {}

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k,v) for k,v in get_headers.items()])

        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.NOBODY, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, True)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def download(self, url, headers):
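        """Issue a GET request for `url`; body_write() streams the body
        into self.collection, naming the file from Content-Disposition
        or the URL path.
        """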
        self.count = 0
        self.start = time.time()
        self.checkpoint = self.start
        self._headers = {}
        self._first_chunk = True
        self.collection = None
        self.parsedurl = urllib.parse.urlparse(url)

        get_headers = {'Accept': 'application/octet-stream'}
        get_headers.update(headers)

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k,v) for k,v in get_headers.items()])

        self.curl.setopt(pycurl.WRITEFUNCTION, self.body_write)
        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)

        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.HTTPGET, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, False)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def headers_received(self):
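        """Handle the response headers once the first body chunk arrives:
        create the collection, choose a file name, and open the output
        file if the status is 200.
        """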
        self.collection = arvados.collection.Collection()

        if "Content-Length" in self._headers:
            self.contentlength = int(self._headers["Content-Length"])
            logger.info("File size is %s bytes", self.contentlength)
        else:
            self.contentlength = None

        grp = None
        if self._headers.get("Content-Disposition"):
            grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))',
                            self._headers["Content-Disposition"])
        if grp and grp.group(2):
            self.name = grp.group(2)
        elif grp:
            self.name = grp.group(4)
        else:
            # No usable filename in Content-Disposition, fall back to
            # the last component of the URL path.
            self.name = self.parsedurl.path.split("/")[-1]

        mt = re.match(r'^HTTP\/(\d(\.\d)?) ([1-5]\d\d) ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*)\r\n$', self._headers["x-status-line"])
        code = int(mt.group(3))

        if code == 200:
            self.target = self.collection.open(self.name, "wb")

    def body_write(self, chunk):
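        """pycurl WRITEFUNCTION callback: append a chunk to the collection
        file and log progress at most once every 20 seconds.
        """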
        if self._first_chunk:
            self.headers_received()
            self._first_chunk = False

        self.count += len(chunk)

        if self.target is None:
            # Not a 200 response, so there is nowhere to write the
            # body; returning a value different from the chunk length
            # makes libcurl abort the transfer.
            return 0

        self.target.write(chunk)
        loopnow = time.time()
        if (loopnow - self.checkpoint) < 20:
            return

        bps = self.count / (loopnow - self.start)
        if self.contentlength is not None:
            logger.info("%2.1f%% complete, %6.2f MiB/s, %1.0f seconds left",
                        ((self.count * 100) / self.contentlength),
                        (bps / (1024.0*1024.0)),
                        ((self.contentlength-self.count) // bps))
        else:
            logger.info("%d downloaded, %6.2f MiB/s", self.count, (bps / (1024.0*1024.0)))
        self.checkpoint = loopnow


def changed(url, clean_url, properties, now, curldownloader):
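    """Return False if a HEAD request shows the cached Etag still matches.

    Any other outcome (HEAD refused, Etag missing or different) is
    treated as changed.  As a side effect, the cached headers in
    `properties` are re-keyed from `url` to `clean_url`.
    """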
    req = curldownloader.head(url)

    if req.status_code != 200:
        # Sometimes endpoints are misconfigured and will deny HEAD but
        # allow GET so instead of failing here, we'll try GET If-None-Match
        return True

    # A previous version of this code stored "ETag"; it is now
    # normalized to "Etag", so check for both.
    etag = properties.get(url, {}).get("Etag") or properties.get(url, {}).get("ETag")

    if url in properties:
        del properties[url]
    remember_headers(clean_url, properties, req.headers, now)

    if "Etag" in req.headers and etag == req.headers["Etag"]:
        # Didn't change
        return False

    return True

def etag_quote(etag):
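    """Wrap an Etag value in double quotes unless it is already quoted."""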
    # if it already has leading and trailing quotes, do nothing
    if etag[0] == '"' and etag[-1] == '"':
        return etag
    else:
        # Add quotes.
        return '"' + etag + '"'


def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow, varying_url_params="", prefer_cached_downloads=False):
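    """Import the content at `url` into Keep, reusing a cached copy if possible.

    Returns a "keep:" URI pointing at the file inside the resulting
    collection.  Query parameters named in `varying_url_params`
    (comma-separated) are stripped when computing the cache key, so
    URLs differing only in, e.g., signature tokens share one cached
    copy.  With `prefer_cached_downloads`, any cached copy is used
    without checking freshness.

    A minimal usage sketch (the project UUID below is a made-up
    placeholder):

        import arvados
        api = arvados.api()
        keep_uri = http_to_keep(api, "zzzzz-j7d0g-0123456789abcde",
                                "https://example.com/data.tar.gz")
    """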

    logger.info("Checking Keep for %s", url)

    varying_params = [s.strip() for s in varying_url_params.split(",")]

    parsed = urllib.parse.urlparse(url)
    query = [q for q in urllib.parse.parse_qsl(parsed.query)
             if q[0] not in varying_params]

    clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
                                         urllib.parse.urlencode(query, safe="/"),  parsed.fragment))

    r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()

    if clean_url == url:
        items = r1["items"]
    else:
        r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
        items = r1["items"] + r2["items"]

    now = utcnow()

    etags = {}

    curldownloader = CurlDownloader()

    for item in items:
        properties = item["properties"]

        if clean_url in properties:
            cache_url = clean_url
        elif url in properties:
            cache_url = url
        else:
            # The collection matched the property search, so one of
            # the URLs should be present; skip the item if neither is.
            continue

        if prefer_cached_downloads or fresh_cache(cache_url, properties, now):
            # HTTP caching rules say we should use the cache
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])

        if not changed(cache_url, clean_url, properties, now, curldownloader):
            # Etag didn't change, same content, just update headers
            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])

        if "Etag" in properties[cache_url] and len(properties[cache_url]["Etag"]) > 2:
            etags[properties[cache_url]["Etag"]] = item

    logger.debug("Found Etags %s", etags)

    properties = {}
    headers = {}
    if etags:
        headers['If-None-Match'] = ', '.join([etag_quote(k) for k in etags])
    logger.debug("Sending GET request with headers %s", headers)

    logger.info("Beginning download of %s", url)

    req = curldownloader.download(url, headers)

    c = curldownloader.collection

    if req.status_code not in (200, 304):
        raise Exception("Failed to download '%s': got status %s" % (url, req.status_code))

    if curldownloader.target is not None:
        curldownloader.target.close()

    remember_headers(clean_url, properties, req.headers, now)

    if req.status_code == 304 and "Etag" in req.headers and req.headers["Etag"] in etags:
        # The server says the cached copy is still good; refresh the
        # stored headers and reuse the existing collection.
        item = etags[req.headers["Etag"]]
        item["properties"].update(properties)
        api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
        cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
        return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])

    logger.info("Download complete")

    collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')

    # max length - space to add a timestamp used by ensure_unique_name
    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        over = len(collectionname) - max_name_len
        split = int(max_name_len/2)
        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]

    c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)

    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()

    return "keep:%s/%s" % (c.portable_data_hash(), curldownloader.name)