[arvados.git] / sdk / python / arvados / http_to_keep.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import email.utils
import time
import datetime
import re
import arvados
import arvados.collection
import arvados.errors
import arvados.util
import urllib.parse
import logging
import calendar
import pycurl
import dataclasses
import typing
from arvados._pycurlhelper import PyCurlHelper

logger = logging.getLogger('arvados.http_import')

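# Helpers for converting between the RFC 2822 date strings used in HTTP
# headers and naive UTC datetime objects.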
def _my_formatdate(dt):
    return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
                                  localtime=False, usegmt=True)

def _my_parsedate(text):
    parsed = email.utils.parsedate_tz(text)
    if parsed:
        if parsed[9]:
            # Adjust to UTC
            return datetime.datetime(*parsed[:6]) + datetime.timedelta(seconds=parsed[9])
        else:
            # TZ is zero or missing, assume UTC.
            return datetime.datetime(*parsed[:6])
    else:
        return datetime.datetime(1970, 1, 1)

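# Decide whether the cached copy of `url` is still fresh, based on the
# response headers recorded in the collection properties: honor a
# Cache-Control "immutable" or "max-age"/"s-maxage" directive, fall back
# to the Expires header, and otherwise assume a 24 hour lifetime.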
def _fresh_cache(url, properties, now):
    pr = properties[url]
    expires = None

    logger.debug("Checking cache freshness for %s using %s", url, pr)

    if "Cache-Control" in pr:
        if re.match(r"immutable", pr["Cache-Control"]):
            return True

        g = re.match(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
        if g:
            expires = _my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))

    if expires is None and "Expires" in pr:
        expires = _my_parsedate(pr["Expires"])

    if expires is None:
        # Use a default cache time of 24 hours if upstream didn't set
        # any cache headers, to reduce redundant downloads.
        expires = _my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)

    if not expires:
        return False

    return (now < expires)

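# Record the cache-relevant response headers for `url` in the collection
# properties dict, adding a Date header if the server did not send one.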
def _remember_headers(url, properties, headers, now):
    properties.setdefault(url, {})
    for h in ("Cache-Control", "Etag", "Expires", "Date", "Content-Length"):
        if h in headers:
            properties[url][h] = headers[h]
    if "Date" not in headers:
        properties[url]["Date"] = _my_formatdate(now)

@dataclasses.dataclass
class _Response:
    status_code: int
    headers: typing.Mapping[str, str]


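# Thin wrapper around pycurl that can HEAD a URL to inspect its response
# headers, or GET it and stream the body directly into a new Arvados
# collection.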
class _Downloader(PyCurlHelper):
    # DOWNLOADER_TIMEOUT is (connection timeout, low-bandwidth tolerance,
    # low-bandwidth threshold): wait up to 60 seconds for the connection,
    # and give up after 300 seconds spent below 32 KiB/s.
    DOWNLOADER_TIMEOUT = (60, 300, 32768)

    def __init__(self, apiclient):
        super(_Downloader, self).__init__(title_case_headers=True)
        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.NOSIGNAL, 1)
        self.curl.setopt(pycurl.OPENSOCKETFUNCTION,
                    lambda *args, **kwargs: self._socket_open(*args, **kwargs))
        self.target = None
        self.apiclient = apiclient

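    # Issue a HEAD request (NOBODY) for `url`, following redirects, and
    # return a _Response with the status code and response headers.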
    def head(self, url):
        get_headers = {'Accept': 'application/octet-stream'}
        self._headers = {}

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k,v) for k,v in get_headers.items()])

        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.NOBODY, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, True)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

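    # GET `url` with the given extra request headers, streaming the body
    # into a new collection via body_write(); return a _Response with the
    # final status code and response headers.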
    def download(self, url, headers):
        self.count = 0
        self.start = time.time()
        self.checkpoint = self.start
        self._headers = {}
        self._first_chunk = True
        self.collection = None
        self.parsedurl = urllib.parse.urlparse(url)

        get_headers = {'Accept': 'application/octet-stream'}
        get_headers.update(headers)

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k,v) for k,v in get_headers.items()])

        self.curl.setopt(pycurl.WRITEFUNCTION, self.body_write)
        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)

        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.HTTPGET, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, False)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

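    # Called once the response headers have arrived (from the first
    # body_write chunk): create the target collection, determine the output
    # filename from Content-Disposition or the URL path, and open a
    # collection file for writing when the status is 200.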
    def headers_received(self):
        self.collection = arvados.collection.Collection(api_client=self.apiclient)

        if "Content-Length" in self._headers:
            self.contentlength = int(self._headers["Content-Length"])
            logger.info("File size is %s bytes", self.contentlength)
        else:
            self.contentlength = None

        if self._headers.get("Content-Disposition"):
            grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))',
                            self._headers["Content-Disposition"])
            if grp.group(2):
                self.name = grp.group(2)
            else:
                self.name = grp.group(4)
        else:
            self.name = self.parsedurl.path.split("/")[-1]

        # Can't call curl.getinfo(pycurl.RESPONSE_CODE) until
        # perform() is done but we need to know the status before that
        # so we have to parse the status line ourselves.
        mt = re.match(r'^HTTP\/(\d(\.\d)?) ([1-5]\d\d) ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*)\r\n$', self._headers["x-status-line"])
        code = int(mt.group(3))

        if not self.name:
            logger.error("Cannot determine filename from URL or headers")
            return

        if code == 200:
            self.target = self.collection.open(self.name, "wb")

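    # pycurl WRITEFUNCTION callback: on the first chunk, finish header
    # processing, then append each chunk to the collection file and log
    # download progress at most every 20 seconds.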
    def body_write(self, chunk):
        if self._first_chunk:
            self.headers_received()
            self._first_chunk = False

        self.count += len(chunk)

        if self.target is None:
            # "If this number is not equal to the size of the byte
            # string, this signifies an error and libcurl will abort
            # the request."
            return 0

        self.target.write(chunk)
        loopnow = time.time()
        if (loopnow - self.checkpoint) < 20:
            return

        bps = self.count / (loopnow - self.start)
        if self.contentlength is not None:
            logger.info("%2.1f%% complete, %6.2f MiB/s, %1.0f seconds left",
                        ((self.count * 100) / self.contentlength),
                        (bps / (1024.0*1024.0)),
                        ((self.contentlength-self.count) // bps))
        else:
            logger.info("%d downloaded, %6.2f MiB/s", self.count, (bps / (1024.0*1024.0)))
        self.checkpoint = loopnow


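# HEAD the URL and compare its current Etag against the one recorded in the
# cached properties; return False only when the Etag clearly matches (the
# cached content is still current), True otherwise.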
def _changed(url, clean_url, properties, now, curldownloader):
    req = curldownloader.head(url)

    if req.status_code != 200:
        # Sometimes endpoints are misconfigured and will deny HEAD but
        # allow GET, so instead of failing here we'll fall back to a GET
        # with If-None-Match.
        return True

    # A previous version of this code used "ETag"; we now normalize to
    # "Etag", so check for both.
    etag = properties[url].get("Etag") or properties[url].get("ETag")

    if url in properties:
        del properties[url]
    _remember_headers(clean_url, properties, req.headers, now)

    if "Etag" in req.headers and etag == req.headers["Etag"]:
        # Didn't change
        return False

    return True

def _etag_quote(etag):
    # if it already has leading and trailing quotes, do nothing
    if etag[0] == '"' and etag[-1] == '"':
        return etag
    else:
        # Add quotes.
        return '"' + etag + '"'


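# Look in Keep for collections whose properties record a previous download
# of `url` (or of the URL with varying query parameters stripped).  Known
# Etag values are collected into the caller-supplied `etags` dict for a
# later conditional GET.  Returns a tuple (portable_data_hash, file_name,
# collection_uuid, clean_url, now); the first three are None when no usable
# cached copy was found.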
def check_cached_url(api, project_uuid, url, etags,
                     utcnow=datetime.datetime.utcnow,
                     varying_url_params="",
                     prefer_cached_downloads=False):

    logger.info("Checking Keep for %s", url)

    varying_params = [s.strip() for s in varying_url_params.split(",")]

    parsed = urllib.parse.urlparse(url)
    query = [q for q in urllib.parse.parse_qsl(parsed.query)
             if q[0] not in varying_params]

    clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
                                         urllib.parse.urlencode(query, safe="/"), parsed.fragment))

    r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()

    if clean_url == url:
        items = r1["items"]
    else:
        r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
        items = r1["items"] + r2["items"]

    now = utcnow()

    curldownloader = _Downloader(api)

    for item in items:
        properties = item["properties"]

        if clean_url in properties:
            cache_url = clean_url
        elif url in properties:
            cache_url = url
        else:
            raise Exception("Shouldn't happen, got an API result for %s that doesn't have the URL in properties" % item["uuid"])

        if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
            # HTTP caching rules say we should use the cache
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

        if not _changed(cache_url, clean_url, properties, now, curldownloader):
            # Etag didn't change, same content, just update headers
            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

        for etagstr in ("Etag", "ETag"):
            if etagstr in properties[cache_url] and len(properties[cache_url][etagstr]) > 2:
                etags[properties[cache_url][etagstr]] = item

    logger.debug("Found ETag values %s", etags)

    return (None, None, None, clean_url, now)


def http_to_keep(api, project_uuid, url,
                 utcnow=datetime.datetime.utcnow, varying_url_params="",
                 prefer_cached_downloads=False):
    """Download a file over HTTP and upload it to Keep, with HTTP headers as metadata.

    Before downloading the URL, checks whether it already exists in
    Keep and applies HTTP caching policy, along with the
    varying_url_params and prefer_cached_downloads flags, to decide
    whether to use the version in Keep or re-download it.
    """
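    # A minimal usage sketch (the API client and project UUID below are
    # illustrative placeholders, not values defined by this module):
    #
    #   api = arvados.api()
    #   pdh, filename, coll_uuid, clean_url, ts = http_to_keep(
    #       api, "zzzzz-j7d0g-0123456789abcde", "https://example.com/data.tar.gz")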

    etags = {}
    cache_result = check_cached_url(api, project_uuid, url, etags,
                                    utcnow, varying_url_params,
                                    prefer_cached_downloads)

    if cache_result[0] is not None:
        return cache_result

    clean_url = cache_result[3]
    now = cache_result[4]

    properties = {}
    headers = {}
    if etags:
        headers['If-None-Match'] = ', '.join([_etag_quote(k) for k,v in etags.items()])
    logger.debug("Sending GET request with headers %s", headers)

    logger.info("Beginning download of %s", url)

    curldownloader = _Downloader(api)

    req = curldownloader.download(url, headers)

    c = curldownloader.collection

    if req.status_code not in (200, 304):
        raise Exception("Failed to download '%s': got status %s" % (url, req.status_code))

    if curldownloader.target is not None:
        curldownloader.target.close()

    _remember_headers(clean_url, properties, req.headers, now)

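    # The server says the content matches one of the Etags we already have
    # cached; reuse that collection and just refresh its stored properties.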
    if req.status_code == 304 and "Etag" in req.headers and req.headers["Etag"] in etags:
        item = etags[req.headers["Etag"]]
        item["properties"].update(properties)
        api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
        cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
        return (item["portable_data_hash"], list(cr.keys())[0], item["uuid"], clean_url, now)

    logger.info("Download complete")

    collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')

    # max length - space to add a timestamp used by ensure_unique_name
    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        over = len(collectionname) - max_name_len
        split = int(max_name_len/2)
        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]

    c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)

    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()

    return (c.portable_data_hash(), curldownloader.name, c.manifest_locator(), clean_url, now)