# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from future import standard_library
standard_library.install_aliases()

import calendar
import dataclasses
import datetime
import email.utils
import logging
import re
import time
import typing
import urllib.parse

import pycurl

import arvados
import arvados.collection
import arvados.errors
import arvados.util
from arvados._pycurlhelper import PyCurlHelper

logger = logging.getLogger('arvados.http_import')

def _my_formatdate(dt):
    return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
                                  localtime=False, usegmt=True)
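
# Example round trip for _my_formatdate/_my_parsedate:
#   _my_formatdate(datetime.datetime(2014, 5, 17, 14, 41, 49))
#     -> 'Sat, 17 May 2014 14:41:49 GMT'
#   _my_parsedate('Sat, 17 May 2014 14:41:49 GMT')
#     -> datetime.datetime(2014, 5, 17, 14, 41, 49)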

def _my_parsedate(text):
    parsed = email.utils.parsedate_tz(text)
    if parsed:
        if parsed[9]:
            # Adjust to UTC
            return datetime.datetime(*parsed[:6]) + datetime.timedelta(seconds=parsed[9])
        # TZ is zero or missing, assume UTC.
        return datetime.datetime(*parsed[:6])
    return datetime.datetime(1970, 1, 1)

def _fresh_cache(url, properties, now):
    pr = properties[url]
    expires = None

    logger.debug("Checking cache freshness for %s using %s", url, pr)

    if "Cache-Control" in pr:
        if re.match(r"immutable", pr["Cache-Control"]):
            return True

        g = re.match(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
        if g:
            expires = _my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))

    if expires is None and "Expires" in pr:
        expires = _my_parsedate(pr["Expires"])

    if expires is None:
        # Use a default cache time of 24 hours if upstream didn't set
        # any cache headers, to reduce redundant downloads.
        expires = _my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)

    if not expires:
        return False

    return (now < expires)
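
# Example: given properties[url] == {"Date": "Sat, 17 May 2014 14:41:49 GMT",
# "Cache-Control": "max-age=3600"}, the cached copy counts as fresh until
# 15:41:49 GMT that day; with no cache headers at all, the 24-hour default
# applies instead.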

def _remember_headers(url, properties, headers, now):
    properties.setdefault(url, {})
    for h in ("Cache-Control", "Etag", "Expires", "Date", "Content-Length"):
        if h in headers:
            properties[url][h] = headers[h]
    if "Date" not in headers:
        properties[url]["Date"] = _my_formatdate(now)

@dataclasses.dataclass
class _Response:
    status_code: int
    headers: typing.Mapping[str, str]

class _Downloader(PyCurlHelper):
    # Wait up to 60 seconds for connection
    # How long it can be in "low bandwidth" state before it gives up
    # Low bandwidth threshold is 32 KiB/s
    DOWNLOADER_TIMEOUT = (60, 300, 32768)

    def __init__(self, apiclient):
        super(_Downloader, self).__init__(title_case_headers=True)
        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.NOSIGNAL, 1)
        self.curl.setopt(pycurl.OPENSOCKETFUNCTION,
                         lambda *args, **kwargs: self._socket_open(*args, **kwargs))
        self.target = None
        self.apiclient = apiclient

    def head(self, url):
        get_headers = {'Accept': 'application/octet-stream'}
        self._headers = {}

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k,v) for k,v in get_headers.items()])

        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.NOBODY, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, True)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def download(self, url, headers):
        self.count = 0
        self.start = time.time()
        self.checkpoint = self.start
        self._headers = {}
        self._first_chunk = True
        self.collection = None
        self.parsedurl = urllib.parse.urlparse(url)

        get_headers = {'Accept': 'application/octet-stream'}
        get_headers.update(headers)

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k,v) for k,v in get_headers.items()])

        self.curl.setopt(pycurl.WRITEFUNCTION, self.body_write)
        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)

        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.HTTPGET, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, False)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def headers_received(self):
        self.collection = arvados.collection.Collection(api_client=self.apiclient)

        if "Content-Length" in self._headers:
            self.contentlength = int(self._headers["Content-Length"])
            logger.info("File size is %s bytes", self.contentlength)
        else:
            self.contentlength = None

        if self._headers.get("Content-Disposition"):
            grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))',
                            self._headers["Content-Disposition"])
            if grp.group(2):
                self.name = grp.group(2)
            else:
                self.name = grp.group(4)
        else:
            self.name = self.parsedurl.path.split("/")[-1]

        # Can't call curl.getinfo(pycurl.RESPONSE_CODE) until
        # perform() is done but we need to know the status before that
        # so we have to parse the status line ourselves.
        mt = re.match(r'^HTTP\/(\d(\.\d)?) ([1-5]\d\d) ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*)\r\n$', self._headers["x-status-line"])
        code = int(mt.group(3))
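
        # Example: a raw status line of "HTTP/1.1 200 OK\r\n" matches the
        # pattern above and yields code == 200.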
        if code == 200:
            self.target = self.collection.open(self.name, "wb")

    def body_write(self, chunk):
        if self._first_chunk:
            self.headers_received()
            self._first_chunk = False

        self.count += len(chunk)
        self.target.write(chunk)

        loopnow = time.time()
        if (loopnow - self.checkpoint) < 20:
            return

        bps = self.count / (loopnow - self.start)
        if self.contentlength is not None:
            logger.info("%2.1f%% complete, %6.2f MiB/s, %1.0f seconds left",
                        ((self.count * 100) / self.contentlength),
                        (bps / (1024.0*1024.0)),
                        ((self.contentlength-self.count) // bps))
        else:
            logger.info("%d downloaded, %6.2f MiB/s", self.count, (bps / (1024.0*1024.0)))
        self.checkpoint = loopnow


def _changed(url, clean_url, properties, now, curldownloader):
    req = curldownloader.head(url)

    if req.status_code != 200:
        # Sometimes endpoints are misconfigured and will deny HEAD but
        # allow GET so instead of failing here, we'll try GET If-None-Match
        return True

    # previous version of this code used "ETag", now we are
    # normalizing to "Etag", check for both.
    etag = properties[url].get("Etag") or properties[url].get("ETag")

    if url in properties:
        del properties[url]
    _remember_headers(clean_url, properties, req.headers, now)

    if "Etag" in req.headers and etag == req.headers["Etag"]:
        # Didn't change
        return False

    return True

def _etag_quote(etag):
    # if it already has leading and trailing quotes, do nothing
    if etag[0] == '"' and etag[-1] == '"':
        return etag
    else:
        # Add quotes.
        return '"' + etag + '"'


def check_cached_url(api, project_uuid, url, etags,
                     utcnow=datetime.datetime.utcnow,
                     varying_url_params="",
                     prefer_cached_downloads=False):

    logger.info("Checking Keep for %s", url)

    varying_params = [s.strip() for s in varying_url_params.split(",")]

    parsed = urllib.parse.urlparse(url)
    query = [q for q in urllib.parse.parse_qsl(parsed.query)
             if q[0] not in varying_params]

    clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
                                         urllib.parse.urlencode(query, safe="/"), parsed.fragment))
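
    # For example (parameter names are illustrative): with
    # varying_url_params="KeyId,Signature", a signed URL like
    # "https://example.com/f.txt?KeyId=abc&Signature=xyz&v=1" is cached
    # under the stable key "https://example.com/f.txt?v=1".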

    r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()

    if clean_url == url:
        items = r1["items"]
    else:
        r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
        items = r1["items"] + r2["items"]

    now = utcnow()

    curldownloader = _Downloader(api)

    for item in items:
        properties = item["properties"]

        if clean_url in properties:
            cache_url = clean_url
        elif url in properties:
            cache_url = url
        else:
            raise Exception("Shouldn't happen, got an API result for %s that doesn't have the URL in properties" % item["uuid"])

        if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
            # HTTP caching rules say we should use the cache
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

        if not _changed(cache_url, clean_url, properties, now, curldownloader):
            # Etag didn't change, same content, just update headers
            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

        for etagstr in ("Etag", "ETag"):
            if etagstr in properties[cache_url] and len(properties[cache_url][etagstr]) > 2:
                etags[properties[cache_url][etagstr]] = item

    logger.debug("Found ETag values %s", etags)

    return (None, None, None, clean_url, now)


def http_to_keep(api, project_uuid, url,
                 utcnow=datetime.datetime.utcnow, varying_url_params="",
                 prefer_cached_downloads=False):
    """Download a file over HTTP and upload it to keep, with HTTP headers as metadata.

    Before downloading the URL, checks to see if the URL already
    exists in Keep and applies HTTP caching policy, the
    varying_url_params and prefer_cached_downloads flags in order to
    decide whether to use the version in Keep or re-download it.
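
    A minimal usage sketch (the project UUID here is illustrative):

        import arvados
        api = arvados.api()
        pdh, file_name, coll_uuid, clean_url, now = http_to_keep(
            api, "zzzzz-j7d0g-zzzzzzzzzzzzzzz",
            "https://example.com/genome.fa")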
    """

    etags = {}
    cache_result = check_cached_url(api, project_uuid, url, etags,
                                    utcnow, varying_url_params,
                                    prefer_cached_downloads)

    if cache_result[0] is not None:
        return cache_result

    clean_url = cache_result[3]
    now = cache_result[4]

    properties = {}
    headers = {}
    if etags:
        headers['If-None-Match'] = ', '.join([_etag_quote(k) for k,v in etags.items()])
    logger.debug("Sending GET request with headers %s", headers)

    logger.info("Beginning download of %s", url)

    curldownloader = _Downloader(api)

    req = curldownloader.download(url, headers)

    c = curldownloader.collection

    if req.status_code not in (200, 304):
        raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))

    if curldownloader.target is not None:
        curldownloader.target.close()

    _remember_headers(clean_url, properties, req.headers, now)

    if req.status_code == 304 and "Etag" in req.headers and req.headers["Etag"] in etags:
        item = etags[req.headers["Etag"]]
        item["properties"].update(properties)
        api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
        cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
        return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)
350 logger.info("Download complete")
352 collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')
354 # max length - space to add a timestamp used by ensure_unique_name
355 max_name_len = 254 - 28
357 if len(collectionname) > max_name_len:
358 over = len(collectionname) - max_name_len
359 split = int(max_name_len/2)
360 collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
362 c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)
364 api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()

    # Return the same five-element shape as the cached-result paths above.
    return (c.portable_data_hash(), curldownloader.name, c.manifest_locator(), clean_url, now)