Merge branch '20257-http-import' refs #20257
[arvados.git] / sdk / python / arvados / http_to_keep.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from future import standard_library
7 standard_library.install_aliases()
8
9 import email.utils
10 import time
11 import datetime
12 import re
13 import arvados
14 import arvados.collection
15 import urllib.parse
16 import logging
17 import calendar
18 import urllib.parse
19 import pycurl
20 import dataclasses
21 import typing
22 from arvados._pycurlhelper import PyCurlHelper
23
24 logger = logging.getLogger('arvados.http_import')
25
26 def _my_formatdate(dt):
27     return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
28                                   localtime=False, usegmt=True)
29
30 def _my_parsedate(text):
31     parsed = email.utils.parsedate_tz(text)
32     if parsed:
33         if parsed[9]:
34             # Adjust to UTC
35             return datetime.datetime(*parsed[:6]) + datetime.timedelta(seconds=parsed[9])
36         else:
37             # TZ is zero or missing, assume UTC.
38             return datetime.datetime(*parsed[:6])
39     else:
40         return datetime.datetime(1970, 1, 1)
41
42 def _fresh_cache(url, properties, now):
43     pr = properties[url]
44     expires = None
45
46     logger.debug("Checking cache freshness for %s using %s", url, pr)
47
48     if "Cache-Control" in pr:
49         if re.match(r"immutable", pr["Cache-Control"]):
50             return True
51
52         g = re.match(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
53         if g:
54             expires = _my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))
55
56     if expires is None and "Expires" in pr:
57         expires = _my_parsedate(pr["Expires"])
58
59     if expires is None:
60         # Use a default cache time of 24 hours if upstream didn't set
61         # any cache headers, to reduce redundant downloads.
62         expires = _my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)
63
64     if not expires:
65         return False
66
67     return (now < expires)
68
69 def _remember_headers(url, properties, headers, now):
70     properties.setdefault(url, {})
71     for h in ("Cache-Control", "Etag", "Expires", "Date", "Content-Length"):
72         if h in headers:
73             properties[url][h] = headers[h]
74     if "Date" not in headers:
75         properties[url]["Date"] = _my_formatdate(now)
76
77 @dataclasses.dataclass
78 class _Response:
79     status_code: int
80     headers: typing.Mapping[str, str]
81
82
83 class _Downloader(PyCurlHelper):
84     # Wait up to 60 seconds for connection
85     # How long it can be in "low bandwidth" state before it gives up
86     # Low bandwidth threshold is 32 KiB/s
87     DOWNLOADER_TIMEOUT = (60, 300, 32768)
88
89     def __init__(self, apiclient):
90         super(_Downloader, self).__init__(title_case_headers=True)
91         self.curl = pycurl.Curl()
92         self.curl.setopt(pycurl.NOSIGNAL, 1)
93         self.curl.setopt(pycurl.OPENSOCKETFUNCTION,
94                     lambda *args, **kwargs: self._socket_open(*args, **kwargs))
95         self.target = None
96         self.apiclient = apiclient
97
98     def head(self, url):
99         get_headers = {'Accept': 'application/octet-stream'}
100         self._headers = {}
101
102         self.curl.setopt(pycurl.URL, url.encode('utf-8'))
103         self.curl.setopt(pycurl.HTTPHEADER, [
104             '{}: {}'.format(k,v) for k,v in get_headers.items()])
105
106         self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
107         self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
108         self.curl.setopt(pycurl.NOBODY, True)
109         self.curl.setopt(pycurl.FOLLOWLOCATION, True)
110
111         self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, True)
112
113         try:
114             self.curl.perform()
115         except Exception as e:
116             raise arvados.errors.HttpError(0, str(e))
117         finally:
118             if self._socket:
119                 self._socket.close()
120                 self._socket = None
121
122         return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)
123
124     def download(self, url, headers):
125         self.count = 0
126         self.start = time.time()
127         self.checkpoint = self.start
128         self._headers = {}
129         self._first_chunk = True
130         self.collection = None
131         self.parsedurl = urllib.parse.urlparse(url)
132
133         get_headers = {'Accept': 'application/octet-stream'}
134         get_headers.update(headers)
135
136         self.curl.setopt(pycurl.URL, url.encode('utf-8'))
137         self.curl.setopt(pycurl.HTTPHEADER, [
138             '{}: {}'.format(k,v) for k,v in get_headers.items()])
139
140         self.curl.setopt(pycurl.WRITEFUNCTION, self.body_write)
141         self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
142
143         self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
144         self.curl.setopt(pycurl.HTTPGET, True)
145         self.curl.setopt(pycurl.FOLLOWLOCATION, True)
146
147         self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, False)
148
149         try:
150             self.curl.perform()
151         except Exception as e:
152             raise arvados.errors.HttpError(0, str(e))
153         finally:
154             if self._socket:
155                 self._socket.close()
156                 self._socket = None
157
158         return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)
159
160     def headers_received(self):
161         self.collection = arvados.collection.Collection(api_client=self.apiclient)
162
163         if "Content-Length" in self._headers:
164             self.contentlength = int(self._headers["Content-Length"])
165             logger.info("File size is %s bytes", self.contentlength)
166         else:
167             self.contentlength = None
168
169         if self._headers.get("Content-Disposition"):
170             grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))',
171                             self._headers["Content-Disposition"])
172             if grp.group(2):
173                 self.name = grp.group(2)
174             else:
175                 self.name = grp.group(4)
176         else:
177             self.name = self.parsedurl.path.split("/")[-1]
178
179         # Can't call curl.getinfo(pycurl.RESPONSE_CODE) until
180         # perform() is done but we need to know the status before that
181         # so we have to parse the status line ourselves.
182         mt = re.match(r'^HTTP\/(\d(\.\d)?) ([1-5]\d\d) ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*)\r\n$', self._headers["x-status-line"])
183         code = int(mt.group(3))
184
185         if code == 200:
186             self.target = self.collection.open(self.name, "wb")
187
188     def body_write(self, chunk):
189         if self._first_chunk:
190             self.headers_received()
191             self._first_chunk = False
192
193         self.count += len(chunk)
194         self.target.write(chunk)
195         loopnow = time.time()
196         if (loopnow - self.checkpoint) < 20:
197             return
198
199         bps = self.count / (loopnow - self.start)
200         if self.contentlength is not None:
201             logger.info("%2.1f%% complete, %6.2f MiB/s, %1.0f seconds left",
202                         ((self.count * 100) / self.contentlength),
203                         (bps / (1024.0*1024.0)),
204                         ((self.contentlength-self.count) // bps))
205         else:
206             logger.info("%d downloaded, %6.2f MiB/s", count, (bps / (1024.0*1024.0)))
207         self.checkpoint = loopnow
208
209
210 def _changed(url, clean_url, properties, now, curldownloader):
211     req = curldownloader.head(url)
212
213     if req.status_code != 200:
214         # Sometimes endpoints are misconfigured and will deny HEAD but
215         # allow GET so instead of failing here, we'll try GET If-None-Match
216         return True
217
218     # previous version of this code used "ETag", now we are
219     # normalizing to "Etag", check for both.
220     etag = properties[url].get("Etag") or properties[url].get("ETag")
221
222     if url in properties:
223         del properties[url]
224     _remember_headers(clean_url, properties, req.headers, now)
225
226     if "Etag" in req.headers and etag == req.headers["Etag"]:
227         # Didn't change
228         return False
229
230     return True
231
232 def _etag_quote(etag):
233     # if it already has leading and trailing quotes, do nothing
234     if etag[0] == '"' and etag[-1] == '"':
235         return etag
236     else:
237         # Add quotes.
238         return '"' + etag + '"'
239
240
241 def http_to_keep(api, project_uuid, url,
242                  utcnow=datetime.datetime.utcnow, varying_url_params="",
243                  prefer_cached_downloads=False):
244     """Download a file over HTTP and upload it to keep, with HTTP headers as metadata.
245
246     Before downloading the URL, checks to see if the URL already
247     exists in Keep and applies HTTP caching policy, the
248     varying_url_params and prefer_cached_downloads flags in order to
249     decide whether to use the version in Keep or re-download it.
250     """
251
252     logger.info("Checking Keep for %s", url)
253
254     varying_params = [s.strip() for s in varying_url_params.split(",")]
255
256     parsed = urllib.parse.urlparse(url)
257     query = [q for q in urllib.parse.parse_qsl(parsed.query)
258              if q[0] not in varying_params]
259
260     clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
261                                          urllib.parse.urlencode(query, safe="/"),  parsed.fragment))
262
263     r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()
264
265     if clean_url == url:
266         items = r1["items"]
267     else:
268         r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
269         items = r1["items"] + r2["items"]
270
271     now = utcnow()
272
273     etags = {}
274
275     curldownloader = _Downloader(api)
276
277     for item in items:
278         properties = item["properties"]
279
280         if clean_url in properties:
281             cache_url = clean_url
282         elif url in properties:
283             cache_url = url
284         else:
285             raise Exception("Shouldn't happen, got an API result for %s that doesn't have the URL in properties" % item["uuid"])
286
287         if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
288             # HTTP caching rules say we should use the cache
289             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
290             return (item["portable_data_hash"], next(iter(cr.keys())) )
291
292         if not _changed(cache_url, clean_url, properties, now, curldownloader):
293             # Etag didn't change, same content, just update headers
294             api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
295             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
296             return (item["portable_data_hash"], next(iter(cr.keys())))
297
298         for etagstr in ("Etag", "ETag"):
299             if etagstr in properties[cache_url] and len(properties[cache_url][etagstr]) > 2:
300                 etags[properties[cache_url][etagstr]] = item
301
302     logger.debug("Found ETag values %s", etags)
303
304     properties = {}
305     headers = {}
306     if etags:
307         headers['If-None-Match'] = ', '.join([_etag_quote(k) for k,v in etags.items()])
308     logger.debug("Sending GET request with headers %s", headers)
309
310     logger.info("Beginning download of %s", url)
311
312     req = curldownloader.download(url, headers)
313
314     c = curldownloader.collection
315
316     if req.status_code not in (200, 304):
317         raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))
318
319     if curldownloader.target is not None:
320         curldownloader.target.close()
321
322     _remember_headers(clean_url, properties, req.headers, now)
323
324     if req.status_code == 304 and "Etag" in req.headers and req.headers["Etag"] in etags:
325         item = etags[req.headers["Etag"]]
326         item["properties"].update(properties)
327         api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
328         cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
329         return (item["portable_data_hash"], list(cr.keys())[0])
330
331     logger.info("Download complete")
332
333     collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')
334
335     # max length - space to add a timestamp used by ensure_unique_name
336     max_name_len = 254 - 28
337
338     if len(collectionname) > max_name_len:
339         over = len(collectionname) - max_name_len
340         split = int(max_name_len/2)
341         collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
342
343     c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)
344
345     api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
346
347     return (c.portable_data_hash(), curldownloader.name)