# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from future import standard_library
standard_library.install_aliases()

import calendar
import dataclasses
import datetime
import email.utils
import logging
import re
import time
import typing
import urllib.parse

import pycurl

import arvados
import arvados.collection
import arvados.errors
import arvados.util
from arvados._pycurlhelper import PyCurlHelper

logger = logging.getLogger('arvados.http_import')

def _my_formatdate(dt):
    return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
                                  localtime=False, usegmt=True)


def _my_parsedate(text):
    parsed = email.utils.parsedate_tz(text)
    if parsed:
        if parsed[9]:
            # Adjust for the timezone offset reported by parsedate_tz.
            return datetime.datetime(*parsed[:6]) + datetime.timedelta(seconds=parsed[9])
        else:
            # TZ is zero or missing, assume UTC.
            return datetime.datetime(*parsed[:6])
    else:
        return datetime.datetime(1970, 1, 1)

def _fresh_cache(url, properties, now):
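    # Freshness is decided from the headers remembered at download time:
    # Cache-Control "immutable" is always fresh, "max-age"/"s-maxage" and
    # "Expires" set an expiry, otherwise assume fresh for 24 hours after "Date".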
    pr = properties[url]
    expires = None

    logger.debug("Checking cache freshness for %s using %s", url, pr)

    if "Cache-Control" in pr:
        if re.match(r"immutable", pr["Cache-Control"]):
            return True

        g = re.match(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
        if g:
            expires = _my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))

    if expires is None and "Expires" in pr:
        expires = _my_parsedate(pr["Expires"])

    if expires is None:
        # Use a default cache time of 24 hours if upstream didn't set
        # any cache headers, to reduce redundant downloads.
        expires = _my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)

    return (now < expires)

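# Cached HTTP response headers are stored in the collection's "properties"
# field, keyed by the URL they were fetched from, so _fresh_cache() and
# _changed() can consult them on later runs.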
def _remember_headers(url, properties, headers, now):
    properties.setdefault(url, {})
    for h in ("Cache-Control", "Etag", "Expires", "Date", "Content-Length"):
        if h in headers:
            properties[url][h] = headers[h]
    if "Date" not in headers:
        properties[url]["Date"] = _my_formatdate(now)

@dataclasses.dataclass
class _Response:
    status_code: int
    headers: typing.Mapping[str, str]


class _Downloader(PyCurlHelper):
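    """pycurl wrapper that HEADs or GETs a URL; GET response bodies are
    streamed into a file in a new Arvados collection."""
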
    # DOWNLOADER_TIMEOUT = (connection timeout, low-bandwidth grace period,
    # low-bandwidth threshold): wait up to 60 seconds for a connection, and
    # give up after 300 seconds below 32 KiB/s.
    DOWNLOADER_TIMEOUT = (60, 300, 32768)

    def __init__(self, apiclient):
        super(_Downloader, self).__init__(title_case_headers=True)
        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.NOSIGNAL, 1)
        self.curl.setopt(pycurl.OPENSOCKETFUNCTION,
                         lambda *args, **kwargs: self._socket_open(*args, **kwargs))
        self.target = None
        self.apiclient = apiclient

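    # Issue a HEAD request (no body) and return the status code and response
    # headers, so callers can check Etag/Cache-Control without downloading.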
    def head(self, url):
        get_headers = {'Accept': 'application/octet-stream'}
        self._headers = {}

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k,v) for k,v in get_headers.items()])

        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.NOBODY, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, True)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

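    # GET the URL; the response body is streamed through body_write() into
    # self.collection, which is created once the response headers arrive.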
    def download(self, url, headers):
        self.count = 0
        self.start = time.time()
        self.checkpoint = self.start
        self._headers = {}
        self._first_chunk = True
        self.collection = None
        self.parsedurl = urllib.parse.urlparse(url)

        get_headers = {'Accept': 'application/octet-stream'}
        get_headers.update(headers)

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k,v) for k,v in get_headers.items()])

        self.curl.setopt(pycurl.WRITEFUNCTION, self.body_write)
        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)

        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.HTTPGET, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, False)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def headers_received(self):
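        # Called by body_write() on the first chunk, once response headers are
        # available: create the output collection, pick a file name from
        # Content-Disposition or the URL path, and open the target file if the
        # status is 200.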
        self.collection = arvados.collection.Collection(api_client=self.apiclient)

        if "Content-Length" in self._headers:
            self.contentlength = int(self._headers["Content-Length"])
            logger.info("File size is %s bytes", self.contentlength)
        else:
            self.contentlength = None

        if self._headers.get("Content-Disposition"):
            grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))',
                            self._headers["Content-Disposition"])
            if grp.group(2):
                self.name = grp.group(2)
            else:
                self.name = grp.group(4)
        else:
            self.name = self.parsedurl.path.split("/")[-1]

        # Can't call curl.getinfo(pycurl.RESPONSE_CODE) until perform() is
        # done, but we need to know the status before that, so we have to
        # parse the status line ourselves.
        mt = re.match(r'^HTTP\/(\d(\.\d)?) ([1-5]\d\d) ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*)\r\n$', self._headers["x-status-line"])
        code = int(mt.group(3))

        if not self.name:
            logger.error("Cannot determine filename from URL or headers")
            return

        if code == 200:
            self.target = self.collection.open(self.name, "wb")

    def body_write(self, chunk):
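        # pycurl WRITEFUNCTION callback: returning a number different from
        # len(chunk), such as the 0 below, tells libcurl to abort the transfer.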
        if self._first_chunk:
            self.headers_received()
            self._first_chunk = False

        self.count += len(chunk)

        if self.target is None:
            # "If this number is not equal to the size of the byte
            # string, this signifies an error and libcurl will abort
            # the request."
            return 0

        self.target.write(chunk)

        loopnow = time.time()
        if (loopnow - self.checkpoint) < 20:
            return

        bps = self.count / (loopnow - self.start)
        if self.contentlength is not None:
            logger.info("%2.1f%% complete, %6.2f MiB/s, %1.0f seconds left",
                        ((self.count * 100) / self.contentlength),
                        (bps / (1024.0*1024.0)),
                        ((self.contentlength-self.count) // bps))
        else:
            logger.info("%d downloaded, %6.2f MiB/s", self.count, (bps / (1024.0*1024.0)))
        self.checkpoint = loopnow


def _changed(url, clean_url, properties, now, curldownloader):
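    # Re-validate a cached copy: issue a HEAD request and compare the server's
    # Etag with the one remembered in properties.  Returns True if the content
    # appears to have changed and should be re-downloaded.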
    req = curldownloader.head(url)

    if req.status_code != 200:
        # Sometimes endpoints are misconfigured and will deny HEAD but
        # allow GET so instead of failing here, we'll try GET If-None-Match
        return True

    # previous version of this code used "ETag", now we are
    # normalizing to "Etag", check for both.
    etag = properties[url].get("Etag") or properties[url].get("ETag")

    if url in properties:
        del properties[url]
    _remember_headers(clean_url, properties, req.headers, now)

    if "Etag" in req.headers and etag == req.headers["Etag"]:
        # Didn't change
        return False

    return True

def _etag_quote(etag):
    # If it already has leading and trailing quotes, leave it alone.
    if etag[0] == '"' and etag[-1] == '"':
        return etag
    else:
        # Add quotes.
        return '"' + etag + '"'


def check_cached_url(api, project_uuid, url, etags,
                     utcnow=datetime.datetime.utcnow,
                     varying_url_params="",
                     prefer_cached_downloads=False):
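    # Returns (portable_data_hash, file_name, collection_uuid, clean_url, now);
    # the first three are None when there is no usable cached copy in Keep.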
    logger.info("Checking Keep for %s", url)

    varying_params = [s.strip() for s in varying_url_params.split(",")]

    parsed = urllib.parse.urlparse(url)
    query = [q for q in urllib.parse.parse_qsl(parsed.query)
             if q[0] not in varying_params]

    clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
                                         urllib.parse.urlencode(query, safe="/"), parsed.fragment))

    r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()

    if clean_url == url:
        items = r1["items"]
    else:
        r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
        items = r1["items"] + r2["items"]

    now = utcnow()

    curldownloader = _Downloader(api)

    for item in items:
        properties = item["properties"]

        if clean_url in properties:
            cache_url = clean_url
        elif url in properties:
            cache_url = url
        else:
            raise Exception("Shouldn't happen, got an API result for %s that doesn't have the URL in properties" % item["uuid"])

        if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
            # HTTP caching rules say we should use the cache
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

        if not _changed(cache_url, clean_url, properties, now, curldownloader):
            # Etag didn't change, same content, just update headers
            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

        for etagstr in ("Etag", "ETag"):
            if etagstr in properties[cache_url] and len(properties[cache_url][etagstr]) > 2:
                etags[properties[cache_url][etagstr]] = item

    logger.debug("Found ETag values %s", etags)

    return (None, None, None, clean_url, now)


def http_to_keep(api, project_uuid, url,
                 utcnow=datetime.datetime.utcnow, varying_url_params="",
                 prefer_cached_downloads=False):
    """Download a file over HTTP and upload it to Keep, with HTTP headers as metadata.

    Before downloading, checks whether the URL is already cached in Keep
    and applies the HTTP caching policy together with the
    varying_url_params and prefer_cached_downloads flags to decide whether
    to use the cached version or re-download it.
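
    Returns a 5-tuple of (portable_data_hash, file_name, collection_uuid,
    clean_url, now).

    A minimal usage sketch; the project UUID and URL below are
    placeholders, and an Arvados API token must already be configured for
    arvados.api()::

        import arvados

        api = arvados.api()
        pdh, name, coll_uuid, clean_url, when = http_to_keep(
            api, "zzzzz-j7d0g-0123456789abcde",
            "https://example.com/dataset.tar.gz")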
    """

    etags = {}
    cache_result = check_cached_url(api, project_uuid, url, etags,
                                    utcnow, varying_url_params,
                                    prefer_cached_downloads)

    if cache_result[0] is not None:
        return cache_result

    clean_url = cache_result[3]
    now = cache_result[4]

    properties = {}
    headers = {}
    if etags:
        headers['If-None-Match'] = ', '.join([_etag_quote(k) for k,v in etags.items()])
    logger.debug("Sending GET request with headers %s", headers)

    logger.info("Beginning download of %s", url)

    curldownloader = _Downloader(api)

    req = curldownloader.download(url, headers)

    c = curldownloader.collection

    if req.status_code not in (200, 304):
        raise Exception("Failed to download '%s', got status %s" % (url, req.status_code))

    if curldownloader.target is not None:
        curldownloader.target.close()

    _remember_headers(clean_url, properties, req.headers, now)

    if req.status_code == 304 and "Etag" in req.headers and req.headers["Etag"] in etags:
        item = etags[req.headers["Etag"]]
        item["properties"].update(properties)
        api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
        cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
        return (item["portable_data_hash"], list(cr.keys())[0], item["uuid"], clean_url, now)

    logger.info("Download complete")

    collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')

    # Maximum collection name length, minus room for the timestamp that
    # ensure_unique_name may append.
    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        over = len(collectionname) - max_name_len
        split = int(max_name_len/2)
        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]

    c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)

    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()

    return (c.portable_data_hash(), curldownloader.name, c.manifest_locator(), clean_url, now)