Merge branch '12917-group-contents-include-container'
[arvados.git] / sdk / python / arvados / http_to_keep.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import calendar
6 import dataclasses
7 import datetime
8 import email.utils
9 import logging
10 import re
11 import time
12 import typing
13 import urllib.parse
14
15 import pycurl
16
17 import arvados
18 import arvados.collection
19 from arvados._pycurlhelper import PyCurlHelper
20
21 logger = logging.getLogger('arvados.http_import')
22
23 def _my_formatdate(dt):
24     return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
25                                   localtime=False, usegmt=True)
26
27 def _my_parsedate(text):
28     parsed = email.utils.parsedate_tz(text)
29     if parsed:
30         if parsed[9]:
31             # Adjust to UTC
32             return datetime.datetime(*parsed[:6]) + datetime.timedelta(seconds=parsed[9])
33         else:
34             # TZ is zero or missing, assume UTC.
35             return datetime.datetime(*parsed[:6])
36     else:
37         return datetime.datetime(1970, 1, 1)
38
def _fresh_cache(url, properties, now):
    """Decide whether the cached copy recorded under `url` is still fresh.

    `properties[url]` holds the HTTP headers remembered from an earlier
    response.  Freshness is determined, in order of precedence, from:

    1. a Cache-Control "immutable" directive (always fresh),
    2. a Cache-Control "s-maxage"/"max-age" directive, counted from the
       remembered "Date" header,
    3. the "Expires" header,
    4. a default lifetime of 24 hours from the remembered "Date".

    `now` is a naive datetime compared against the computed expiry.
    Returns True if `now` is before the expiry time.
    """
    pr = properties[url]
    expires = None

    logger.debug("Checking cache freshness for %s using %s", url, pr)

    cache_control = pr.get("Cache-Control")
    if cache_control is not None:
        # Directives may appear anywhere in the comma-separated list
        # (e.g. "public, max-age=3600, immutable"), so search the whole
        # value rather than anchoring at the first directive.
        if re.search(r"\bimmutable\b", cache_control, re.IGNORECASE):
            return True

        g = re.search(r"\b(s-maxage|max-age)=(\d+)", cache_control, re.IGNORECASE)
        if g:
            expires = _my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))

    if expires is None and "Expires" in pr:
        expires = _my_parsedate(pr["Expires"])

    if expires is None:
        # Use a default cache time of 24 hours if upstream didn't set
        # any cache headers, to reduce redundant downloads.
        expires = _my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)

    return now < expires
65
66 def _remember_headers(url, properties, headers, now):
67     properties.setdefault(url, {})
68     for h in ("Cache-Control", "Etag", "Expires", "Date", "Content-Length"):
69         if h in headers:
70             properties[url][h] = headers[h]
71     if "Date" not in headers:
72         properties[url]["Date"] = _my_formatdate(now)
73
@dataclasses.dataclass
class _Response:
    """Status code and headers of a completed HTTP request.

    Instances are returned by _Downloader.head() and
    _Downloader.download().
    """
    # Numeric HTTP status code of the response.
    status_code: int
    # Response headers, keyed by header name.
    headers: typing.Mapping[str, str]
78
79
class _Downloader(PyCurlHelper):
    """pycurl-based HTTP client that streams a response body into a collection.

    head() performs a body-less request.  download() performs a GET and,
    when the response status is 200, writes the body into a file of a new
    arvados.collection.Collection.  After download() the collection is
    available as ``self.collection`` and the chosen file name as
    ``self.name`` (both are only set once the first body chunk arrives).
    """

    # Wait up to 60 seconds for connection
    # How long it can be in "low bandwidth" state before it gives up
    # Low bandwidth threshold is 32 KiB/s
    DOWNLOADER_TIMEOUT = (60, 300, 32768)

    def __init__(self, apiclient):
        """Create the curl handle; `apiclient` is used for new collections."""
        super().__init__(title_case_headers=True)
        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.NOSIGNAL, 1)
        self.curl.setopt(pycurl.OPENSOCKETFUNCTION,
                    lambda *args, **kwargs: self._socket_open(*args, **kwargs))
        # File object the body is written to; stays None unless the
        # response status is 200 and a file name could be determined.
        self.target = None
        self.apiclient = apiclient

    def head(self, url):
        """Request `url` without fetching the body (pycurl.NOBODY).

        Redirects are followed.  Returns a _Response with the status
        code and collected headers.  Any failure of the transfer is
        re-raised as arvados.errors.HttpError with status 0.
        """
        get_headers = {'Accept': 'application/octet-stream'}
        self._headers = {}

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k,v) for k,v in get_headers.items()])

        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.NOBODY, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, True)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def download(self, url, headers):
        """GET `url`, sending the extra request `headers` (a dict).

        Body chunks are handed to body_write().  Redirects are followed.
        Returns a _Response with the status code and collected headers.
        Any failure of the transfer is re-raised as
        arvados.errors.HttpError with status 0.
        """
        self.count = 0
        self.start = time.time()
        self.checkpoint = self.start
        self._headers = {}
        self._first_chunk = True
        self.collection = None
        self.parsedurl = urllib.parse.urlparse(url)

        get_headers = {'Accept': 'application/octet-stream'}
        get_headers.update(headers)

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k,v) for k,v in get_headers.items()])

        self.curl.setopt(pycurl.WRITEFUNCTION, self.body_write)
        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)

        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.HTTPGET, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, False)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def headers_received(self):
        """Prepare the output collection once the response headers are known.

        Called from body_write() before the first chunk is stored.
        Creates ``self.collection``, records ``self.contentlength`` and
        ``self.name``, and opens ``self.target`` for writing when the
        status is 200 and a file name is available.
        """
        self.collection = arvados.collection.Collection(api_client=self.apiclient)

        if "Content-Length" in self._headers:
            self.contentlength = int(self._headers["Content-Length"])
            logger.info("File size is %s bytes", self.contentlength)
        else:
            self.contentlength = None

        grp = None
        disposition = self._headers.get("Content-Disposition")
        if disposition:
            grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))',
                            disposition)
        if grp:
            # group(2) is the quoted form, group(4) the bare token form.
            self.name = grp.group(2) or grp.group(4)
        else:
            # No Content-Disposition, or one without a usable filename
            # parameter (e.g. plain "inline"): use the last URL path
            # segment instead.
            self.name = self.parsedurl.path.split("/")[-1]

        # Can't call curl.getinfo(pycurl.RESPONSE_CODE) until
        # perform() is done but we need to know the status before that
        # so we have to parse the status line ourselves.
        status_line = self._headers.get("x-status-line", "")
        mt = re.match(r'^HTTP\/(\d(\.\d)?) ([1-5]\d\d) ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*)\r\n$', status_line)
        if mt is None:
            # Without a parseable status we cannot tell whether the body
            # is the requested content, so leave self.target unset.
            logger.error("Cannot parse HTTP status line %r", status_line)
            return
        code = int(mt.group(3))

        if not self.name:
            logger.error("Cannot determine filename from URL or headers")
            return

        if code == 200:
            self.target = self.collection.open(self.name, "wb")

    def body_write(self, chunk):
        """pycurl WRITEFUNCTION callback: store `chunk` and log progress.

        Returns 0 when there is no open target, which makes libcurl
        abort the transfer; otherwise returns None.  Progress is logged
        at most once every 20 seconds.
        """
        if self._first_chunk:
            self.headers_received()
            self._first_chunk = False

        self.count += len(chunk)

        if self.target is None:
            # "If this number is not equal to the size of the byte
            # string, this signifies an error and libcurl will abort
            # the request."
            return 0

        self.target.write(chunk)
        loopnow = time.time()
        if (loopnow - self.checkpoint) < 20:
            return

        bps = self.count / (loopnow - self.start)
        if self.contentlength is not None:
            logger.info("%2.1f%% complete, %6.2f MiB/s, %1.0f seconds left",
                        ((self.count * 100) / self.contentlength),
                        (bps / (1024.0*1024.0)),
                        ((self.contentlength-self.count) // bps))
        else:
            logger.info("%d downloaded, %6.2f MiB/s", self.count, (bps / (1024.0*1024.0)))
        self.checkpoint = loopnow
216
217
218 def _changed(url, clean_url, properties, now, curldownloader):
219     req = curldownloader.head(url)
220
221     if req.status_code != 200:
222         # Sometimes endpoints are misconfigured and will deny HEAD but
223         # allow GET so instead of failing here, we'll try GET If-None-Match
224         return True
225
226     # previous version of this code used "ETag", now we are
227     # normalizing to "Etag", check for both.
228     etag = properties[url].get("Etag") or properties[url].get("ETag")
229
230     if url in properties:
231         del properties[url]
232     _remember_headers(clean_url, properties, req.headers, now)
233
234     if "Etag" in req.headers and etag == req.headers["Etag"]:
235         # Didn't change
236         return False
237
238     return True
239
240 def _etag_quote(etag):
241     # if it already has leading and trailing quotes, do nothing
242     if etag[0] == '"' and etag[-1] == '"':
243         return etag
244     else:
245         # Add quotes.
246         return '"' + etag + '"'
247
248
def check_cached_url(api, project_uuid, url, etags,
                     utcnow=datetime.datetime.utcnow,
                     varying_url_params="",
                     prefer_cached_downloads=False):
    """Look in Keep for a collection that already holds the content of `url`.

    Collections are found by a property whose key is the URL, either
    exactly as given or with the query parameters named in
    `varying_url_params` (comma-separated) removed ("clean URL").

    For each candidate, the cached copy is used when
    `prefer_cached_downloads` is true, when the remembered HTTP headers
    say it is still fresh, or when a HEAD request shows an unchanged
    ETag (in which case the collection's properties are updated with
    the new headers).

    Arguments:
      api: Arvados API client.
      project_uuid: not used by this function.
      url: the URL to look up.
      etags: dict filled in-place, mapping each usable cached ETag to
        its collection record, for a later conditional GET.
      utcnow: callable returning the current time.

    Returns a tuple (portable_data_hash, file name, collection uuid,
    clean_url, now).  The first three are None when no usable cached
    copy was found.
    """

    logger.info("Checking Keep for %s", url)

    varying_params = [s.strip() for s in varying_url_params.split(",")]

    parsed = urllib.parse.urlparse(url)
    query = [q for q in urllib.parse.parse_qsl(parsed.query)
             if q[0] not in varying_params]

    clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
                                         urllib.parse.urlencode(query, safe="/"),  parsed.fragment))

    r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()

    if clean_url == url:
        items = r1["items"]
    else:
        r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
        items = r1["items"] + r2["items"]

    now = utcnow()

    curldownloader = _Downloader(api)

    for item in items:
        properties = item["properties"]

        if clean_url in properties:
            cache_url = clean_url
        elif url in properties:
            cache_url = url
        else:
            raise Exception("Shouldn't happen, got an API result for %s that doesn't have the URL in properties" % item["uuid"])

        if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
            # HTTP caching rules say we should use the cache
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

        # Snapshot the ETag(s) describing the content stored in this
        # collection *before* calling _changed(): on a successful HEAD
        # it removes properties[cache_url] and stores the server's
        # current headers under clean_url, so reading them afterwards
        # would either fail or pick up the ETag of the new content.
        cached_headers = properties[cache_url]
        cached_etags = [cached_headers[etagstr] for etagstr in ("Etag", "ETag")
                        if etagstr in cached_headers and len(cached_headers[etagstr]) > 2]

        if not _changed(cache_url, clean_url, properties, now, curldownloader):
            # Etag didn't change, same content, just update headers
            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

        for cached_etag in cached_etags:
            etags[cached_etag] = item

    logger.debug("Found ETag values %s", etags)

    return (None, None, None, clean_url, now)
305
306
def http_to_keep(api, project_uuid, url,
                 utcnow=datetime.datetime.utcnow, varying_url_params="",
                 prefer_cached_downloads=False):
    """Download a file over HTTP and upload it to keep, with HTTP headers as metadata.

    Before downloading the URL, checks to see if the URL already
    exists in Keep and applies HTTP caching policy, the
    varying_url_params and prefer_cached_downloads flags in order to
    decide whether to use the version in Keep or re-download it.

    Returns a tuple (portable_data_hash, file name, collection
    identifier, clean_url, now), where clean_url is `url` with the
    varying query parameters removed and now is the value obtained from
    `utcnow()`.

    Raises Exception when the server answers with a status other than
    200 or 304, or when no content was received to store.
    """

    etags = {}
    cache_result = check_cached_url(api, project_uuid, url, etags,
                                    utcnow, varying_url_params,
                                    prefer_cached_downloads)

    if cache_result[0] is not None:
        return cache_result

    clean_url = cache_result[3]
    now = cache_result[4]

    properties = {}
    headers = {}
    if etags:
        # Offer every known cached ETag so the server can answer 304.
        headers['If-None-Match'] = ', '.join([_etag_quote(k) for k in etags])
    logger.debug("Sending GET request with headers %s", headers)

    logger.info("Beginning download of %s", url)

    curldownloader = _Downloader(api)

    req = curldownloader.download(url, headers)

    c = curldownloader.collection

    if req.status_code not in (200, 304):
        raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))

    if curldownloader.target is not None:
        curldownloader.target.close()

    _remember_headers(clean_url, properties, req.headers, now)

    if req.status_code == 304 and "Etag" in req.headers and req.headers["Etag"] in etags:
        # Not modified: reuse the matching cached collection and just
        # refresh the headers stored in its properties.
        item = etags[req.headers["Etag"]]
        item["properties"].update(properties)
        api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
        cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
        return (item["portable_data_hash"], list(cr.keys())[0], item["uuid"], clean_url, now)

    if c is None:
        # The downloader only creates its collection when body data
        # arrives, so there is nothing to save (e.g. an empty body, or a
        # 304 that matched none of the cached ETags).
        raise Exception("Failed to download '%s' got status %s but no content was received" % (url, req.status_code))

    logger.info("Download complete")

    collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')

    # max length - space to add a timestamp used by ensure_unique_name
    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        over = len(collectionname) - max_name_len
        split = int(max_name_len/2)
        # Drop one extra character to make room for the ellipsis so the
        # result is exactly max_name_len characters long.
        collectionname = collectionname[0:split] + "…" + collectionname[split+over+1:]

    c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)

    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()

    return (c.portable_data_hash(), curldownloader.name, c.manifest_locator(), clean_url, now)