# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import calendar
import dataclasses
import datetime
import email.utils
import logging
import re
import time
import typing
import urllib.parse

import pycurl

import arvados
import arvados.collection
import arvados.errors
import arvados.util
from arvados._pycurlhelper import PyCurlHelper

logger = logging.getLogger('arvados.http_import')

def _my_formatdate(dt):
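    """Format a datetime as an RFC 2822 date string in GMT, as used in HTTP headers."""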
    return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
                                  localtime=False, usegmt=True)

def _my_parsedate(text):
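    """Parse an RFC 2822 date header into a naive UTC datetime.

    Returns the epoch (1970-01-01) if the text cannot be parsed.
    """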
    parsed = email.utils.parsedate_tz(text)
    if parsed:
        if parsed[9]:
            # Subtract the timezone offset to normalize to UTC.
            return datetime.datetime(*parsed[:6]) - datetime.timedelta(seconds=parsed[9])
        else:
            # TZ is zero or missing, assume UTC.
            return datetime.datetime(*parsed[:6])
    else:
        return datetime.datetime(1970, 1, 1)

def _fresh_cache(url, properties, now):
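    """Decide whether the cached copy of url is still fresh under HTTP caching rules.

    Checks the Cache-Control (immutable, max-age/s-maxage) and Expires
    headers recorded in properties, falling back to a 24 hour lifetime
    when the upstream server sent no cache headers.
    """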
    pr = properties[url]
    expires = None

    logger.debug("Checking cache freshness for %s using %s", url, pr)

    if "Cache-Control" in pr:
        # Use re.search: these directives may appear anywhere in the
        # header value, e.g. "public, max-age=3600, immutable".
        if re.search(r"immutable", pr["Cache-Control"]):
            return True

        g = re.search(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
        if g:
            expires = _my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))

    if expires is None and "Expires" in pr:
        expires = _my_parsedate(pr["Expires"])

    if expires is None:
        # Use a default cache time of 24 hours if upstream didn't set
        # any cache headers, to reduce redundant downloads.
        expires = _my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)

    return (now < expires)

def _remember_headers(url, properties, headers, now):
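    """Record the cache-relevant response headers for url in properties.

    Synthesizes a Date header from now if the server didn't send one, so
    that _fresh_cache always has a reference time to work with.
    """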
    properties.setdefault(url, {})
    for h in ("Cache-Control", "Etag", "Expires", "Date", "Content-Length"):
        if h in headers:
            properties[url][h] = headers[h]
    if "Date" not in headers:
        properties[url]["Date"] = _my_formatdate(now)

@dataclasses.dataclass
class _Response:
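    """Minimal HTTP response: just the status code and response headers."""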
    status_code: int
    headers: typing.Mapping[str, str]


class _Downloader(PyCurlHelper):
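    """Fetch a URL with pycurl, streaming the response body into a new Keep collection."""
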
    # (connection timeout, low-bandwidth timeout, low-bandwidth threshold):
    # wait up to 60 seconds for a connection; give up after 300 seconds
    # in a "low bandwidth" state, where low bandwidth means below 32 KiB/s.
    DOWNLOADER_TIMEOUT = (60, 300, 32768)

    def __init__(self, apiclient):
        super(_Downloader, self).__init__(title_case_headers=True)
        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.NOSIGNAL, 1)
        self.curl.setopt(pycurl.OPENSOCKETFUNCTION,
                         lambda *args, **kwargs: self._socket_open(*args, **kwargs))
        self.target = None
        self.apiclient = apiclient

    def head(self, url):
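        """Issue a HEAD request for url (following redirects) and return a _Response.

        Used to check cache validity without transferring the body.
        """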
        get_headers = {'Accept': 'application/octet-stream'}
        self._headers = {}

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k, v) for k, v in get_headers.items()])

        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.NOBODY, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, True)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def download(self, url, headers):
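        """GET url, streaming the response body into a new collection.

        headers is a dict of extra request headers (e.g. If-None-Match).
        Returns a _Response; the collection and output file name are left
        on self.collection and self.name by headers_received().
        """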
        self.count = 0
        self.start = time.time()
        self.checkpoint = self.start
        self._headers = {}
        self._first_chunk = True
        self.collection = None
        self.parsedurl = urllib.parse.urlparse(url)

        get_headers = {'Accept': 'application/octet-stream'}
        get_headers.update(headers)

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k, v) for k, v in get_headers.items()])

        self.curl.setopt(pycurl.WRITEFUNCTION, self.body_write)
        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)

        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.HTTPGET, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, False)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def headers_received(self):
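        """Process response headers once the first body chunk arrives.

        Creates the target collection, picks an output file name from the
        Content-Disposition header (falling back to the last component of
        the URL path), and opens the target file when the status is 200.
        """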
        self.collection = arvados.collection.Collection(api_client=self.apiclient)

        if "Content-Length" in self._headers:
            self.contentlength = int(self._headers["Content-Length"])
            logger.info("File size is %s bytes", self.contentlength)
        else:
            self.contentlength = None

        grp = None
        if self._headers.get("Content-Disposition"):
            grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))',
                            self._headers["Content-Disposition"])
        if grp and grp.group(2):
            self.name = grp.group(2)
        elif grp:
            self.name = grp.group(4)
        else:
            # No usable Content-Disposition filename, fall back to the URL path.
            self.name = self.parsedurl.path.split("/")[-1]

        # Can't call curl.getinfo(pycurl.RESPONSE_CODE) until perform()
        # is done, but we need to know the status before that, so we
        # have to parse the status line ourselves.
        mt = re.match(r'^HTTP\/(\d(\.\d)?) ([1-5]\d\d) ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*)\r\n$', self._headers["x-status-line"])
        code = int(mt.group(3))

        if code == 200:
            self.target = self.collection.open(self.name, "wb")

    def body_write(self, chunk):
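        """pycurl WRITEFUNCTION callback: write a chunk of the response body.

        Also logs download progress, at most once every 20 seconds.
        """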
        if self._first_chunk:
            self.headers_received()
            self._first_chunk = False

        self.count += len(chunk)
        self.target.write(chunk)
        loopnow = time.time()
        if (loopnow - self.checkpoint) < 20:
            return

        bps = self.count / (loopnow - self.start)
        if self.contentlength is not None:
            logger.info("%2.1f%% complete, %6.2f MiB/s, %1.0f seconds left",
                        ((self.count * 100) / self.contentlength),
                        (bps / (1024.0*1024.0)),
                        ((self.contentlength-self.count) // bps))
        else:
            logger.info("%d downloaded, %6.2f MiB/s", self.count, (bps / (1024.0*1024.0)))
        self.checkpoint = loopnow


def _changed(url, clean_url, properties, now, curldownloader):
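    """Return True if the content at url appears to have changed since it was cached.

    Issues a HEAD request and compares the Etag against the one recorded
    in properties; refreshes the stored headers under clean_url either way.
    """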
    req = curldownloader.head(url)

    if req.status_code != 200:
        # Sometimes endpoints are misconfigured and will deny HEAD but
        # allow GET, so instead of failing here we'll try GET with
        # If-None-Match.
        return True

    # A previous version of this code used "ETag"; we now normalize to
    # "Etag", so check for both.
    etag = properties.get(url, {}).get("Etag") or properties.get(url, {}).get("ETag")

    if url in properties:
        del properties[url]
    _remember_headers(clean_url, properties, req.headers, now)

    if "Etag" in req.headers and etag == req.headers["Etag"]:
        # Didn't change
        return False

    return True

def _etag_quote(etag):
    # If it already has leading and trailing quotes, leave it alone.
    if etag[0] == '"' and etag[-1] == '"':
        return etag
    else:
        # Add quotes.
        return '"' + etag + '"'


def check_cached_url(api, project_uuid, url, etags,
                     utcnow=datetime.datetime.utcnow,
                     varying_url_params="",
                     prefer_cached_downloads=False):
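    """Look for an existing collection in Keep that caches url.

    Populates etags with the Etag values found on stale cache entries so
    the caller can send a conditional GET.

    Returns a 5-tuple (portable_data_hash, file_name, collection_uuid,
    clean_url, now); the first three are None on a cache miss.
    """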

    logger.info("Checking Keep for %s", url)

    varying_params = [s.strip() for s in varying_url_params.split(",")]

    parsed = urllib.parse.urlparse(url)
    query = [q for q in urllib.parse.parse_qsl(parsed.query)
             if q[0] not in varying_params]

    clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
                                         urllib.parse.urlencode(query, safe="/"), parsed.fragment))

    r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()

    if clean_url == url:
        items = r1["items"]
    else:
        r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
        items = r1["items"] + r2["items"]

    now = utcnow()

    curldownloader = _Downloader(api)

    for item in items:
        properties = item["properties"]

        if clean_url in properties:
            cache_url = clean_url
        elif url in properties:
            cache_url = url
        else:
            raise Exception("Shouldn't happen: got an API result for %s that doesn't have the URL in properties" % item["uuid"])

        if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
            # HTTP caching rules say we should use the cache
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

        if not _changed(cache_url, clean_url, properties, now, curldownloader):
            # Etag didn't change, same content; just update the headers.
            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

        # _changed() may have re-keyed the properties under clean_url.
        cache_props = properties.get(cache_url) or properties.get(clean_url) or {}
        for etagstr in ("Etag", "ETag"):
            if etagstr in cache_props and len(cache_props[etagstr]) > 2:
                etags[cache_props[etagstr]] = item

    logger.debug("Found ETag values %s", etags)

    return (None, None, None, clean_url, now)


def http_to_keep(api, project_uuid, url,
                 utcnow=datetime.datetime.utcnow, varying_url_params="",
                 prefer_cached_downloads=False):
    """Download a file over HTTP and upload it to Keep, with HTTP headers as metadata.

    Before downloading the URL, checks whether it is already cached in
    Keep, and applies HTTP caching policy (together with the
    varying_url_params and prefer_cached_downloads options) to decide
    whether to use the cached copy or re-download it.

    Returns a 5-tuple (portable_data_hash, file_name, collection_uuid,
    clean_url, now).
    """

    etags = {}
    cache_result = check_cached_url(api, project_uuid, url, etags,
                                    utcnow, varying_url_params,
                                    prefer_cached_downloads)

    if cache_result[0] is not None:
        return cache_result

    clean_url = cache_result[3]
    now = cache_result[4]

    properties = {}
    headers = {}
    if etags:
        headers['If-None-Match'] = ', '.join([_etag_quote(k) for k, v in etags.items()])
    logger.debug("Sending GET request with headers %s", headers)

    logger.info("Beginning download of %s", url)

    curldownloader = _Downloader(api)

    req = curldownloader.download(url, headers)

    c = curldownloader.collection

    if req.status_code not in (200, 304):
        raise Exception("Failed to download '%s': got status %s" % (url, req.status_code))

    if curldownloader.target is not None:
        curldownloader.target.close()

    _remember_headers(clean_url, properties, req.headers, now)

    if req.status_code == 304 and "Etag" in req.headers and req.headers["Etag"] in etags:
        # Not modified: reuse the cached collection, refreshing its headers.
        item = etags[req.headers["Etag"]]
        item["properties"].update(properties)
        api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
        cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
        return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

    logger.info("Download complete")

    collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')

    # max length - space to add a timestamp used by ensure_unique_name
    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        # Truncate the middle, leaving room for the one-character ellipsis.
        over = len(collectionname) - max_name_len + 1
        split = int(max_name_len/2)
        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]

    c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)

    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()

    return (c.portable_data_hash(), curldownloader.name, c.manifest_locator(), clean_url, now)