20257: Need to parse status code out of the header, because pyCurl
[arvados.git] / sdk / python / arvados / http_import.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from future import standard_library
7 standard_library.install_aliases()
8
9 import email.utils
10 import time
11 import datetime
12 import re
13 import arvados
14 import arvados.collection
15 import urllib.parse
16 import logging
17 import calendar
18 import urllib.parse
19 import pycurl
20 from arvados.pycurl import PyCurlHelper
21
22 logger = logging.getLogger('arvados.http_import')
23
def my_formatdate(dt):
    """Format a datetime as an HTTP date string ending in "GMT".

    The fields of `dt` are interpreted as UTC (via `calendar.timegm`),
    so callers should pass a UTC datetime.
    """
    epoch_seconds = calendar.timegm(dt.timetuple())
    return email.utils.formatdate(epoch_seconds, localtime=False, usegmt=True)
27
def my_parsedate(text):
    """Parse an RFC 2822 / HTTP date string into a naive UTC datetime.

    Returns datetime.datetime(1970, 1, 1) when `text` cannot be parsed
    or describes an impossible date, so that invalid dates are treated
    as being far in the past.
    """
    epoch = datetime.datetime(1970, 1, 1)

    parsed = email.utils.parsedate_tz(text)
    if not parsed:
        return epoch

    try:
        result = datetime.datetime(*parsed[:6])
    except ValueError:
        # Fields were syntactically valid but out of range
        # (e.g. February 31st).
        return epoch

    offset = parsed[9]
    if offset:
        # parsedate_tz() reports the zone's offset east of UTC in
        # seconds, so UTC = local time - offset.
        result -= datetime.timedelta(seconds=offset)

    # When the offset is zero or missing, assume the time is already UTC.
    return result
39
def fresh_cache(url, properties, now):
    """Decide whether the cached headers for `url` say the copy is still fresh.

    `properties` maps URLs to dicts of remembered response headers (as
    stored by remember_headers); `now` is the current time as a naive
    UTC datetime.

    Freshness is determined, in order of preference, from:
      * Cache-Control "immutable" (always fresh),
      * Cache-Control "s-maxage"/"max-age" relative to the stored Date,
      * the Expires header,
      * otherwise a default lifetime of 24 hours from the stored Date.

    Returns True if the cached copy can be used without revalidation.
    """
    pr = properties[url]
    expires = None

    logger.debug("Checking cache freshness for %s using %s", url, pr)

    if "Cache-Control" in pr:
        # Cache-Control is a comma separated list of case-insensitive
        # directives; the one we want is not necessarily the first
        # (e.g. "public, max-age=3600").
        directives = [d.strip().lower() for d in pr["Cache-Control"].split(",")]

        if any(d.startswith("immutable") for d in directives):
            return True

        for directive in directives:
            g = re.match(r"(s-maxage|max-age)=(\d+)", directive)
            if g:
                expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))
                break

    if expires is None and "Expires" in pr:
        expires = my_parsedate(pr["Expires"])

    if expires is None:
        # Use a default cache time of 24 hours if upstream didn't set
        # any cache headers, to reduce redundant downloads.
        expires = my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)

    return (now < expires)
66
def remember_headers(url, properties, headers, now):
    """Copy the caching-related response headers into properties[url].

    Only Cache-Control, Etag, Expires, Date and Content-Length are
    kept.  If the response carried no Date header, `now` (formatted
    with my_formatdate) is recorded in its place.  `properties` is
    modified in place; an entry for `url` is created if necessary.
    """
    remembered = ("Cache-Control", "Etag", "Expires", "Date", "Content-Length")
    entry = properties.setdefault(url, {})
    entry.update({name: headers[name] for name in remembered if name in headers})
    if "Date" not in headers:
        entry["Date"] = my_formatdate(now)
74
class Response:
    """Minimal HTTP response record: a status code plus response headers."""

    def __init__(self, status_code, headers):
        # status_code: numeric HTTP status of the final response.
        # headers: mapping of header name to value.
        self.status_code, self.headers = status_code, headers
79
class CurlDownloader(PyCurlHelper):
    """Fetch a URL with pycurl, streaming a 200 response body into a new
    Arvados collection.

    After download() returns, `collection` holds the collection that was
    written to (or None if no body was received), `name` the file name
    chosen for the download, and `target` the open output file (or None
    if the response status was not 200).
    """

    # Timeout settings passed to _setcurltimeouts():
    #  - wait up to 60 seconds for connection
    #  - how long (seconds) it can be in "low bandwidth" state before
    #    it gives up
    #  - low bandwidth threshold is 32 KiB/s
    DOWNLOADER_TIMEOUT = (60, 300, 32768)

    def __init__(self):
        super(CurlDownloader, self).__init__(title_case_headers=True)
        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.NOSIGNAL, 1)
        self.curl.setopt(pycurl.OPENSOCKETFUNCTION,
                    lambda *args, **kwargs: self._socket_open(*args, **kwargs))
        self.target = None

    def head(self, url, headers=None):
        """Perform a HEAD request for `url`.

        `headers` is an optional dict of extra request headers.
        Returns a Response with the status code and response headers.
        Raises arvados.errors.HttpError if the transfer itself fails.
        """
        get_headers = {'Accept': 'application/octet-stream'}
        if headers:
            get_headers.update(headers)
        self._headers = {}

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k,v) for k,v in get_headers.items()])

        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.NOBODY, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, True)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def download(self, url, headers):
        """Perform a GET request for `url`, writing the body via body_write().

        `headers` is a dict of extra request headers.  Returns a
        Response with the status code and response headers.  Raises
        arvados.errors.HttpError if the transfer itself fails.
        """
        self.count = 0
        self.start = time.time()
        self.checkpoint = self.start
        self._headers = {}
        self._first_chunk = True
        self.collection = None
        # Don't let an output file from a previous download on this
        # object receive data from this one.
        self.target = None
        self.parsedurl = urllib.parse.urlparse(url)

        get_headers = {'Accept': 'application/octet-stream'}
        get_headers.update(headers)

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k,v) for k,v in get_headers.items()])

        self.curl.setopt(pycurl.WRITEFUNCTION, self.body_write)
        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)

        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.HTTPGET, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, False)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def headers_received(self):
        """Prepare to receive the body, once all headers have been seen.

        Creates the destination collection, records the expected
        content length (if given), picks the file name, and opens the
        output file if the response status is 200.
        """
        self.collection = arvados.collection.Collection()

        if "Content-Length" in self._headers:
            self.contentlength = int(self._headers["Content-Length"])
            logger.info("File size is %s bytes", self.contentlength)
        else:
            self.contentlength = None

        # Prefer the file name from Content-Disposition; if the header
        # is absent or has no usable filename parameter, fall back to
        # the last component of the URL path.
        self.name = None
        if self._headers.get("Content-Disposition"):
            grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))',
                            self._headers["Content-Disposition"])
            if grp:
                self.name = grp.group(2) or grp.group(4)
        if not self.name:
            self.name = self.parsedurl.path.split("/")[-1]

        # The status code has to be parsed out of the status line
        # recorded with the headers.
        mt = re.match(r'^HTTP\/(\d(\.\d)?) ([1-5]\d\d) ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*)\r\n$',
                      self._headers.get("x-status-line", ""))
        code = int(mt.group(3)) if mt else None

        if code == 200:
            self.target = self.collection.open(self.name, "wb")

    def body_write(self, chunk):
        """pycurl WRITEFUNCTION callback: append `chunk` to the output file.

        Logs progress at most once every 20 seconds.  Returns None when
        the chunk was consumed, or 0 to make libcurl abort the transfer
        when there is no output file to write to.
        """
        if self._first_chunk:
            self.headers_received()
            self._first_chunk = False

        self.count += len(chunk)

        if self.target is None:
            # The response was not a 200, so no output file was opened.
            # Returning a number different from len(chunk) signals a
            # write error and libcurl aborts the request.
            return 0

        self.target.write(chunk)
        loopnow = time.time()
        if (loopnow - self.checkpoint) < 20:
            return

        bps = self.count / (loopnow - self.start)
        if self.contentlength:
            logger.info("%2.1f%% complete, %6.2f MiB/s, %1.0f seconds left",
                        ((self.count * 100) / self.contentlength),
                        (bps / (1024.0*1024.0)),
                        ((self.contentlength-self.count) // bps))
        else:
            logger.info("%d downloaded, %6.2f MiB/s", self.count, (bps / (1024.0*1024.0)))
        self.checkpoint = loopnow
202
203
def changed(url, clean_url, properties, now, curldownloader):
    """Issue a HEAD request and report whether the cached copy may be stale.

    Returns False only when the HEAD request succeeds with status 200
    and its Etag equals the Etag remembered in properties[url].  In
    that case (and whenever the HEAD returns 200) the entry for `url`
    is removed from `properties` and the fresh headers are remembered
    under `clean_url`.  Returns True otherwise.
    """
    response = curldownloader.head(url)

    if response.status_code == 200:
        # previous version of this code used "ETag", now we are
        # normalizing to "Etag", check for both.
        cached = properties[url]
        known_etag = cached.get("Etag") or cached.get("ETag")

        properties.pop(url, None)
        remember_headers(clean_url, properties, response.headers, now)

        fresh_headers = response.headers
        if "Etag" in fresh_headers and known_etag == fresh_headers["Etag"]:
            # Didn't change
            return False

    # Either the content changed, or HEAD failed.  Sometimes endpoints
    # are misconfigured and will deny HEAD but allow GET so instead of
    # failing here, we'll try GET If-None-Match
    return True
225
def etag_quote(etag):
    """Return `etag` in the quoted form required in If-None-Match.

    Values that are already quoted, including weak validators of the
    form W/"...", are returned unchanged.  Anything else (including an
    empty value) is wrapped in double quotes.
    """
    if not etag:
        return '""'

    # A weak validator carries a W/ prefix in front of the quoted tag.
    opaque = etag[2:] if etag.startswith('W/') else etag

    # if it already has leading and trailing quotes, do nothing
    if len(opaque) >= 2 and opaque[0] == '"' and opaque[-1] == '"':
        return etag

    # Add quotes.
    return '"' + etag + '"'
233
234
def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow, varying_url_params="", prefer_cached_downloads=False):
    """Download `url` into a Keep collection, reusing a cached copy if possible.

    Arguments:
      api: Arvados API client, used to list, read and update collections.
      project_uuid: owner_uuid for a newly created collection.
      url: the URL to fetch.
      utcnow: callable returning the current time as a naive UTC datetime.
      varying_url_params: comma separated names of query parameters that
        are removed from `url` to form the key used for cache lookups.
      prefer_cached_downloads: if True, use any previously downloaded
        collection for this URL without checking freshness.

    Returns a "keep:<portable data hash>/<file name>" reference.

    Raises an Exception if the server responds with a status other than
    200 or 304, or if no content was received.
    """
    varying_params = [s.strip() for s in varying_url_params.split(",")]

    parsed = urllib.parse.urlparse(url)
    query = [q for q in urllib.parse.parse_qsl(parsed.query)
             if q[0] not in varying_params]

    clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
                                         urllib.parse.urlencode(query, safe="/"),  parsed.fragment))

    # Collections record the headers of earlier downloads in a property
    # named after the URL; look for both the original and cleaned URL.
    r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()

    if clean_url == url:
        items = r1["items"]
    else:
        r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
        items = r1["items"] + r2["items"]

    now = utcnow()

    etags = {}

    curldownloader = CurlDownloader()

    for item in items:
        properties = item["properties"]

        if clean_url in properties:
            cache_url = clean_url
        elif url in properties:
            cache_url = url
        else:
            # NOTE(review): the "exists" filters above should make this
            # unreachable; it returns False rather than a keep reference.
            return False

        if prefer_cached_downloads or fresh_cache(cache_url, properties, now):
            # HTTP caching rules say we should use the cache
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])

        if not changed(cache_url, clean_url, properties, now, curldownloader):
            # Etag didn't change, same content, just update headers
            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])

        if "Etag" in properties[cache_url] and len(properties[cache_url]["Etag"]) > 2:
            etags[properties[cache_url]["Etag"]] = item

    logger.debug("Found Etags %s", etags)

    properties = {}
    headers = {}
    if etags:
        # Let the server tell us (with a 304) if one of the copies we
        # already have is still current.
        headers['If-None-Match'] = ', '.join([etag_quote(k) for k in etags])
    logger.debug("Sending GET request with headers %s", headers)

    logger.info("Beginning download of %s", url)

    req = curldownloader.download(url, headers)

    c = curldownloader.collection

    if req.status_code not in (200, 304):
        raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))

    if curldownloader.target is not None:
        curldownloader.target.close()

    remember_headers(clean_url, properties, req.headers, now)

    if req.status_code == 304 and "Etag" in req.headers and req.headers["Etag"] in etags:
        item = etags[req.headers["Etag"]]
        item["properties"].update(properties)
        api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
        cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
        return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])

    if c is None:
        # No body was received (e.g. a 304 that doesn't match any
        # collection we know about), so there is nothing to save.
        raise Exception("Failed to download '%s' no content received, got status %s " % (url, req.status_code))

    logger.info("Download complete")

    collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')

    # max length - space to add a timestamp used by ensure_unique_name
    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        # Cut the excess out of the middle of the name.
        over = len(collectionname) - max_name_len
        split = int(max_name_len/2)
        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]

    c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)

    # Record the response headers on the new collection so that later
    # calls can find it and evaluate its freshness.  The request has to
    # be executed to take effect.
    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()

    return "keep:%s/%s" % (c.portable_data_hash(), curldownloader.name)