# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from future import standard_library
standard_library.install_aliases()

import calendar
import dataclasses
import datetime
import email.utils
import logging
import re
import time
import typing
import urllib.parse

import pycurl

import arvados
import arvados.collection
import arvados.errors
import arvados.util

from arvados._pycurlhelper import PyCurlHelper

logger = logging.getLogger('arvados.http_import')

def _my_formatdate(dt):
    return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
                                  localtime=False, usegmt=True)
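
# For illustration: given a naive UTC datetime, _my_formatdate produces an
# RFC 2822 date string, e.g. _my_formatdate(datetime.datetime(2023, 1, 1))
# == "Sun, 01 Jan 2023 00:00:00 GMT".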

def _my_parsedate(text):
    parsed = email.utils.parsedate_tz(text)
    if parsed:
        if parsed[9]:
            # Convert to UTC: the wall-clock fields minus the UTC offset
            # (same convention as email.utils.mktime_tz).
            return datetime.datetime(*parsed[:6]) - datetime.timedelta(seconds=parsed[9])
        else:
            # TZ is zero or missing, assume UTC.
            return datetime.datetime(*parsed[:6])
    else:
        return datetime.datetime(1970, 1, 1)
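
# For illustration: _my_parsedate("Mon, 02 Jan 2023 05:00:00 -0500") returns
# datetime.datetime(2023, 1, 2, 10, 0), the same instant expressed in UTC.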

def _fresh_cache(url, properties, now):
    pr = properties[url]
    expires = None

    logger.debug("Checking cache freshness for %s using %s", url, pr)

    if "Cache-Control" in pr:
        if re.match(r"immutable", pr["Cache-Control"]):
            return True

        g = re.match(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
        if g:
            expires = _my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))

    if expires is None and "Expires" in pr:
        expires = _my_parsedate(pr["Expires"])

    if expires is None:
        # Use a default cache time of 24 hours if upstream didn't set
        # any cache headers, to reduce redundant downloads.
        expires = _my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)

    return (now < expires)
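
# For illustration: with properties[url] == {"Cache-Control": "max-age=3600",
# "Date": "Sun, 01 Jan 2023 00:00:00 GMT"}, the cached copy counts as fresh
# for any `now` before 2023-01-01 01:00:00 UTC.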

def _remember_headers(url, properties, headers, now):
    properties.setdefault(url, {})
    for h in ("Cache-Control", "Etag", "Expires", "Date", "Content-Length"):
        if h in headers:
            properties[url][h] = headers[h]
    if "Date" not in headers:
        properties[url]["Date"] = _my_formatdate(now)
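
# For illustration: after a response with headers {"Etag": '"abc"',
# "Content-Length": "42"}, properties[url] records both values plus a
# synthesized "Date" header set to the current time.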

@dataclasses.dataclass
class _Response:
    status_code: int
    headers: typing.Mapping[str, str]


class _Downloader(PyCurlHelper):
    # DOWNLOADER_TIMEOUT is (connection timeout, low-bandwidth timeout,
    # low-bandwidth threshold): wait up to 60 seconds to connect, then give
    # up if the transfer rate stays below 32 KiB/s for 300 seconds.
    DOWNLOADER_TIMEOUT = (60, 300, 32768)

    def __init__(self, apiclient):
        super(_Downloader, self).__init__(title_case_headers=True)
        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.NOSIGNAL, 1)
        self.curl.setopt(pycurl.OPENSOCKETFUNCTION,
                         lambda *args, **kwargs: self._socket_open(*args, **kwargs))
        # No output file is open until a successful download starts.
        self.target = None
        self.apiclient = apiclient

    def head(self, url):
        get_headers = {'Accept': 'application/octet-stream'}
        self._headers = {}

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k, v) for k, v in get_headers.items()])

        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.NOBODY, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, True)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))

        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def download(self, url, headers):
        self.count = 0
        self.start = time.time()
        self.checkpoint = self.start
        self._headers = {}
        self._first_chunk = True
        self.collection = None
        self.parsedurl = urllib.parse.urlparse(url)

        get_headers = {'Accept': 'application/octet-stream'}
        get_headers.update(headers)

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k, v) for k, v in get_headers.items()])

        self.curl.setopt(pycurl.WRITEFUNCTION, self.body_write)
        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)

        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.HTTPGET, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, False)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))

        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def headers_received(self):
        self.collection = arvados.collection.Collection(api_client=self.apiclient)

        if "Content-Length" in self._headers:
            self.contentlength = int(self._headers["Content-Length"])
            logger.info("File size is %s bytes", self.contentlength)
        else:
            self.contentlength = None

        if self._headers.get("Content-Disposition"):
            grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))',
                            self._headers["Content-Disposition"])
            if grp.group(2):
                self.name = grp.group(2)
            else:
                self.name = grp.group(4)
        else:
            self.name = self.parsedurl.path.split("/")[-1]

        # Can't call curl.getinfo(pycurl.RESPONSE_CODE) until
        # perform() is done but we need to know the status before that
        # so we have to parse the status line ourselves.
        mt = re.match(r'^HTTP\/(\d(\.\d)?) ([1-5]\d\d) ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*)\r\n$', self._headers["x-status-line"])
        code = int(mt.group(3))

        if code == 200:
            # Only open a file in the collection on success; redirects and
            # error responses are not saved.
            self.target = self.collection.open(self.name, "wb")
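
    # For illustration: a response carrying the header
    # 'Content-Disposition: attachment; filename="data.csv"' sets self.name
    # to "data.csv"; without that header, the name falls back to the last
    # path segment of the URL.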

    def body_write(self, chunk):
        if self._first_chunk:
            self.headers_received()
            self._first_chunk = False

        self.count += len(chunk)
        self.target.write(chunk)
        loopnow = time.time()
        if (loopnow - self.checkpoint) < 20:
            # Only log progress every 20 seconds.
            return

        bps = self.count / (loopnow - self.start)
        if self.contentlength is not None:
            logger.info("%2.1f%% complete, %6.2f MiB/s, %1.0f seconds left",
                        ((self.count * 100) / self.contentlength),
                        (bps / (1024.0 * 1024.0)),
                        ((self.contentlength - self.count) // bps))
        else:
            logger.info("%d downloaded, %6.2f MiB/s", self.count, (bps / (1024.0 * 1024.0)))
        self.checkpoint = loopnow


def _changed(url, clean_url, properties, now, curldownloader):
    req = curldownloader.head(url)

    if req.status_code != 200:
        # Sometimes endpoints are misconfigured and will deny HEAD but
        # allow GET, so instead of failing here we fall through and try a
        # conditional GET (If-None-Match) instead.
        return True

    # previous version of this code used "ETag", now we are
    # normalizing to "Etag", check for both.
    etag = properties[url].get("Etag") or properties[url].get("ETag")

    if url in properties:
        del properties[url]
    _remember_headers(clean_url, properties, req.headers, now)

    if "Etag" in req.headers and etag == req.headers["Etag"]:
        # Didn't change
        return False

    return True


def _etag_quote(etag):
    # if it already has leading and trailing quotes, do nothing
    if etag[0] == '"' and etag[-1] == '"':
        return etag
    else:
        # Add quotes.
        return '"' + etag + '"'
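
# For illustration: _etag_quote('abc') returns '"abc"', while an
# already-quoted value like '"abc"' is returned unchanged.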


def http_to_keep(api, project_uuid, url,
                 utcnow=datetime.datetime.utcnow, varying_url_params="",
                 prefer_cached_downloads=False):
    """Download a file over HTTP and upload it to keep, with HTTP headers as metadata.

    Before downloading the URL, checks to see if the URL already
    exists in Keep and applies HTTP caching policy, the
    varying_url_params and prefer_cached_downloads flags in order to
    decide whether to use the version in Keep or re-download it.

    Returns a tuple of (portable_data_hash, file_name).
    """

    logger.info("Checking Keep for %s", url)

    varying_params = [s.strip() for s in varying_url_params.split(",")]

    parsed = urllib.parse.urlparse(url)
    query = [q for q in urllib.parse.parse_qsl(parsed.query)
             if q[0] not in varying_params]

    clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
                                         urllib.parse.urlencode(query, safe="/"), parsed.fragment))
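
    # For illustration: with varying_url_params="token", a URL like
    # "https://example.com/file?token=xyz&id=1" is cleaned to
    # "https://example.com/file?id=1" before consulting Keep, so cached
    # copies are found even when the volatile query parameter changes.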

    r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()

    if clean_url == url:
        items = r1["items"]
    else:
        r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
        items = r1["items"] + r2["items"]

    now = utcnow()

    etags = {}

    curldownloader = _Downloader(api)

    for item in items:
        properties = item["properties"]

        if clean_url in properties:
            cache_url = clean_url
        elif url in properties:
            cache_url = url
        else:
            raise Exception("Shouldn't happen, got an API result for %s that doesn't have the URL in properties" % item["uuid"])

        if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
            # HTTP caching rules say we should use the cache
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())))

        if not _changed(cache_url, clean_url, properties, now, curldownloader):
            # Etag didn't change, same content, just update headers
            api.collections().update(uuid=item["uuid"], body={"collection": {"properties": properties}}).execute()
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())))

        for etagstr in ("Etag", "ETag"):
            if etagstr in properties[cache_url] and len(properties[cache_url][etagstr]) > 2:
                etags[properties[cache_url][etagstr]] = item

    logger.debug("Found ETag values %s", etags)

    properties = {}
    headers = {}
    if etags:
        headers['If-None-Match'] = ', '.join([_etag_quote(k) for k, v in etags.items()])
    logger.debug("Sending GET request with headers %s", headers)

    logger.info("Beginning download of %s", url)

    req = curldownloader.download(url, headers)

    c = curldownloader.collection

    if req.status_code not in (200, 304):
        raise Exception("Failed to download '%s', got status %s" % (url, req.status_code))

    if curldownloader.target is not None:
        curldownloader.target.close()

    _remember_headers(clean_url, properties, req.headers, now)

    if req.status_code == 304 and "Etag" in req.headers and req.headers["Etag"] in etags:
        # The server says the content hasn't changed; reuse the collection
        # that already has this Etag and just refresh its properties.
        item = etags[req.headers["Etag"]]
        item["properties"].update(properties)
        api.collections().update(uuid=item["uuid"], body={"collection": {"properties": item["properties"]}}).execute()
        cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
        return (item["portable_data_hash"], next(iter(cr.keys())))

    logger.info("Download complete")

    collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')

    # max length - space to add a timestamp used by ensure_unique_name
    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        # Truncate the middle of over-long names, keeping the beginning and
        # end; the "+1" accounts for the inserted ellipsis character.
        over = len(collectionname) - max_name_len + 1
        split = int(max_name_len / 2)
        collectionname = collectionname[0:split] + "…" + collectionname[split + over:]

    c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)

    api.collections().update(uuid=c.manifest_locator(), body={"collection": {"properties": properties}}).execute()

    return (c.portable_data_hash(), curldownloader.name)
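

# A minimal usage sketch (illustrative, not part of the original module).
# It assumes a configured Arvados client, that this module is importable as
# arvados.http_to_keep, and hypothetical project UUID and URL values:
#
#     import arvados
#     from arvados.http_to_keep import http_to_keep
#
#     api = arvados.api()
#     pdh, filename = http_to_keep(
#         api, "zzzzz-j7d0g-0123456789abcde",
#         "https://example.com/data/reference.fasta?sig=abc123",
#         varying_url_params="sig",
#         prefer_cached_downloads=True)
#     # pdh is the portable data hash of the saved collection; filename is
#     # the name of the downloaded file within it.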