20257: Set INFO log level explicitly
[arvados.git] / sdk / python / arvados / http_import.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from future import standard_library
7 standard_library.install_aliases()
8
9 import email.utils
10 import time
11 import datetime
12 import re
13 import arvados
14 import arvados.collection
15 import urllib.parse
16 import logging
17 import calendar
18 import urllib.parse
19 import pycurl
20 from arvados.pycurl import PyCurlHelper
21
22 logger = logging.getLogger('arvados.http_import')
23
24 def my_formatdate(dt):
25     return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
26                                   localtime=False, usegmt=True)
27
28 def my_parsedate(text):
29     parsed = email.utils.parsedate_tz(text)
30     if parsed:
31         if parsed[9]:
32             # Adjust to UTC
33             return datetime.datetime(*parsed[:6]) + datetime.timedelta(seconds=parsed[9])
34         else:
35             # TZ is zero or missing, assume UTC.
36             return datetime.datetime(*parsed[:6])
37     else:
38         return datetime.datetime(1970, 1, 1)
39
40 def fresh_cache(url, properties, now):
41     pr = properties[url]
42     expires = None
43
44     logger.debug("Checking cache freshness for %s using %s", url, pr)
45
46     if "Cache-Control" in pr:
47         if re.match(r"immutable", pr["Cache-Control"]):
48             return True
49
50         g = re.match(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
51         if g:
52             expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))
53
54     if expires is None and "Expires" in pr:
55         expires = my_parsedate(pr["Expires"])
56
57     if expires is None:
58         # Use a default cache time of 24 hours if upstream didn't set
59         # any cache headers, to reduce redundant downloads.
60         expires = my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)
61
62     if not expires:
63         return False
64
65     return (now < expires)
66
67 def remember_headers(url, properties, headers, now):
68     properties.setdefault(url, {})
69     for h in ("Cache-Control", "Etag", "Expires", "Date", "Content-Length"):
70         if h in headers:
71             properties[url][h] = headers[h]
72     if "Date" not in headers:
73         properties[url]["Date"] = my_formatdate(now)
74
75 class Response:
76     def __init__(self, status_code, headers):
77         self.status_code = status_code
78         self.headers = headers
79
80 class CurlDownloader(PyCurlHelper):
81     # Wait up to 60 seconds for connection
82     # How long it can be in "low bandwidth" state before it gives up
83     # Low bandwidth threshold is 32 KiB/s
84     DOWNLOADER_TIMEOUT = (60, 300, 32768)
85
86     def __init__(self):
87         super(CurlDownloader, self).__init__(title_case_headers=True)
88         self.curl = pycurl.Curl()
89         self.curl.setopt(pycurl.NOSIGNAL, 1)
90         self.curl.setopt(pycurl.OPENSOCKETFUNCTION,
91                     lambda *args, **kwargs: self._socket_open(*args, **kwargs))
92         self.target = None
93
94     def head(self, url, headers={}):
95         get_headers = {'Accept': 'application/octet-stream'}
96         get_headers.update(headers)
97         self._headers = {}
98
99         self.curl.setopt(pycurl.URL, url.encode('utf-8'))
100         self.curl.setopt(pycurl.HTTPHEADER, [
101             '{}: {}'.format(k,v) for k,v in get_headers.items()])
102
103         self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
104         self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
105         self.curl.setopt(pycurl.NOBODY, True)
106         self.curl.setopt(pycurl.FOLLOWLOCATION, True)
107
108         self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, True)
109
110         try:
111             self.curl.perform()
112         except Exception as e:
113             raise arvados.errors.HttpError(0, str(e))
114         finally:
115             if self._socket:
116                 self._socket.close()
117                 self._socket = None
118
119         return Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)
120
121     def download(self, url, headers):
122         self.count = 0
123         self.start = time.time()
124         self.checkpoint = self.start
125         self._headers = {}
126         self._first_chunk = True
127         self.collection = None
128         self.parsedurl = urllib.parse.urlparse(url)
129
130         get_headers = {'Accept': 'application/octet-stream'}
131         get_headers.update(headers)
132
133         self.curl.setopt(pycurl.URL, url.encode('utf-8'))
134         self.curl.setopt(pycurl.HTTPHEADER, [
135             '{}: {}'.format(k,v) for k,v in get_headers.items()])
136
137         self.curl.setopt(pycurl.WRITEFUNCTION, self.body_write)
138         self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
139
140         self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
141         self.curl.setopt(pycurl.HTTPGET, True)
142         self.curl.setopt(pycurl.FOLLOWLOCATION, True)
143
144         self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, False)
145
146         try:
147             self.curl.perform()
148         except Exception as e:
149             raise arvados.errors.HttpError(0, str(e))
150         finally:
151             if self._socket:
152                 self._socket.close()
153                 self._socket = None
154
155         return Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)
156
157     def headers_received(self):
158         self.collection = arvados.collection.Collection()
159
160         if "Content-Length" in self._headers:
161             self.contentlength = int(self._headers["Content-Length"])
162             logger.info("File size is %s bytes", self.contentlength)
163         else:
164             self.contentlength = None
165
166         if self._headers.get("Content-Disposition"):
167             grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))',
168                             self._headers["Content-Disposition"])
169             if grp.group(2):
170                 self.name = grp.group(2)
171             else:
172                 self.name = grp.group(4)
173         else:
174             self.name = self.parsedurl.path.split("/")[-1]
175
176         mt = re.match(r'^HTTP\/(\d(\.\d)?) ([1-5]\d\d) ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*)\r\n$', self._headers["x-status-line"])
177         code = int(mt.group(3))
178
179         if code == 200:
180             self.target = self.collection.open(self.name, "wb")
181
182     def body_write(self, chunk):
183         if self._first_chunk:
184             self.headers_received()
185             self._first_chunk = False
186
187         self.count += len(chunk)
188         self.target.write(chunk)
189         loopnow = time.time()
190         if (loopnow - self.checkpoint) < 20:
191             return
192
193         bps = self.count / (loopnow - self.start)
194         if self.contentlength is not None:
195             logger.info("%2.1f%% complete, %6.2f MiB/s, %1.0f seconds left",
196                         ((self.count * 100) / self.contentlength),
197                         (bps / (1024.0*1024.0)),
198                         ((self.contentlength-self.count) // bps))
199         else:
200             logger.info("%d downloaded, %6.2f MiB/s", count, (bps / (1024.0*1024.0)))
201         self.checkpoint = loopnow
202
203
204 def changed(url, clean_url, properties, now, curldownloader):
205     req = curldownloader.head(url)
206
207     if req.status_code != 200:
208         # Sometimes endpoints are misconfigured and will deny HEAD but
209         # allow GET so instead of failing here, we'll try GET If-None-Match
210         return True
211
212     # previous version of this code used "ETag", now we are
213     # normalizing to "Etag", check for both.
214     etag = properties[url].get("Etag") or properties[url].get("ETag")
215
216     if url in properties:
217         del properties[url]
218     remember_headers(clean_url, properties, req.headers, now)
219
220     if "Etag" in req.headers and etag == req.headers["Etag"]:
221         # Didn't change
222         return False
223
224     return True
225
226 def etag_quote(etag):
227     # if it already has leading and trailing quotes, do nothing
228     if etag[0] == '"' and etag[-1] == '"':
229         return etag
230     else:
231         # Add quotes.
232         return '"' + etag + '"'
233
234
235 def http_to_keep(api, project_uuid, url,
236                  utcnow=datetime.datetime.utcnow, varying_url_params="",
237                  prefer_cached_downloads=False):
238
239     logger.info("Checking Keep for %s", url)
240
241     varying_params = [s.strip() for s in varying_url_params.split(",")]
242
243     parsed = urllib.parse.urlparse(url)
244     query = [q for q in urllib.parse.parse_qsl(parsed.query)
245              if q[0] not in varying_params]
246
247     clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
248                                          urllib.parse.urlencode(query, safe="/"),  parsed.fragment))
249
250     r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()
251
252     if clean_url == url:
253         items = r1["items"]
254     else:
255         r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
256         items = r1["items"] + r2["items"]
257
258     now = utcnow()
259
260     etags = {}
261
262     curldownloader = CurlDownloader()
263
264     for item in items:
265         properties = item["properties"]
266
267         if clean_url in properties:
268             cache_url = clean_url
269         elif url in properties:
270             cache_url = url
271         else:
272             return False
273
274         if prefer_cached_downloads or fresh_cache(cache_url, properties, now):
275             # HTTP caching rules say we should use the cache
276             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
277             return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
278
279         if not changed(cache_url, clean_url, properties, now, curldownloader):
280             # Etag didn't change, same content, just update headers
281             api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
282             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
283             return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
284
285         if "Etag" in properties[cache_url] and len(properties[cache_url]["Etag"]) > 2:
286             etags[properties[cache_url]["Etag"]] = item
287
288     logger.debug("Found Etags %s", etags)
289
290     properties = {}
291     headers = {}
292     if etags:
293         headers['If-None-Match'] = ', '.join([etag_quote(k) for k,v in etags.items()])
294     logger.debug("Sending GET request with headers %s", headers)
295
296     logger.info("Beginning download of %s", url)
297
298     req = curldownloader.download(url, headers)
299
300     c = curldownloader.collection
301
302     if req.status_code not in (200, 304):
303         raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))
304
305     if curldownloader.target is not None:
306         curldownloader.target.close()
307
308     remember_headers(clean_url, properties, req.headers, now)
309
310     if req.status_code == 304 and "Etag" in req.headers and req.headers["Etag"] in etags:
311         item = etags[req.headers["Etag"]]
312         item["properties"].update(properties)
313         api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
314         cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
315         return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
316
317     logger.info("Download complete")
318
319     collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')
320
321     # max length - space to add a timestamp used by ensure_unique_name
322     max_name_len = 254 - 28
323
324     if len(collectionname) > max_name_len:
325         over = len(collectionname) - max_name_len
326         split = int(max_name_len/2)
327         collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
328
329     c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)
330
331     api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}) #.execute()
332
333     return "keep:%s/%s" % (c.portable_data_hash(), curldownloader.name)