# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import calendar
import dataclasses
import datetime
import email.utils
import logging
import re
import time
import typing
import urllib.parse

import pycurl

import arvados
import arvados.collection
import arvados.errors
import arvados.util
from arvados._pycurlhelper import PyCurlHelper

logger = logging.getLogger('arvados.http_import')

def _my_formatdate(dt):
    return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
                                  localtime=False, usegmt=True)
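
# For example, _my_formatdate(datetime.datetime(2024, 1, 2, 3, 4, 5))
# returns "Tue, 02 Jan 2024 03:04:05 GMT", the RFC 2822 form used in
# HTTP Date and Expires headers.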

def _my_parsedate(text):
    parsed = email.utils.parsedate_tz(text)
    if parsed:
        if parsed[9]:
            # Subtract the timezone offset to adjust to UTC.
            return datetime.datetime(*parsed[:6]) - datetime.timedelta(seconds=parsed[9])
        # TZ is zero or missing, assume UTC.
        return datetime.datetime(*parsed[:6])
    # Unparseable date: fall back to the epoch.
    return datetime.datetime(1970, 1, 1)
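
# For example, _my_parsedate("Tue, 02 Jan 2024 03:04:05 -0500") yields
# datetime.datetime(2024, 1, 2, 8, 4, 5) (shifted to UTC), while an
# unparseable string falls back to the epoch, 1970-01-01.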

def _fresh_cache(url, properties, now):
    pr = properties[url]
    expires = None
    logger.debug("Checking cache freshness for %s using %s", url, pr)

    if "Cache-Control" in pr:
        if re.match(r"immutable", pr["Cache-Control"]):
            return True
        g = re.match(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
        if g:
            expires = _my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))

    if expires is None and "Expires" in pr:
        expires = _my_parsedate(pr["Expires"])

    if expires is None:
        # Use a default cache time of 24 hours if upstream didn't set
        # any cache headers, to reduce redundant downloads.
        expires = _my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)

    return (now < expires)
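
# For example, an entry recorded with {"Date": "Tue, 02 Jan 2024
# 00:00:00 GMT", "Cache-Control": "max-age=3600"} stays fresh until
# 01:00:00 GMT that day; an entry with no cache headers at all stays
# fresh for 24 hours after its recorded Date.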

def _remember_headers(url, properties, headers, now):
    properties.setdefault(url, {})
    for h in ("Cache-Control", "Etag", "Expires", "Date", "Content-Length"):
        if h in headers:
            properties[url][h] = headers[h]
    if "Date" not in headers:
        properties[url]["Date"] = _my_formatdate(now)
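
# The resulting properties entry maps the URL to its cache-relevant
# headers, e.g. (values are illustrative only):
#   {"https://example.com/data.tar": {"Etag": '"abc123"',
#    "Content-Length": "1024", "Date": "Tue, 02 Jan 2024 00:00:00 GMT"}}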

@dataclasses.dataclass
class _Response:
    status_code: int
    headers: typing.Mapping[str, str]

class _Downloader(PyCurlHelper):
    # (connection timeout, low-bandwidth timeout, low-bandwidth threshold):
    # wait up to 60 seconds for a connection, and give up after 300
    # seconds below the low-bandwidth threshold of 32 KiB/s.
    DOWNLOADER_TIMEOUT = (60, 300, 32768)

    def __init__(self, apiclient):
        super(_Downloader, self).__init__(title_case_headers=True)
        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.NOSIGNAL, 1)
        self.curl.setopt(pycurl.OPENSOCKETFUNCTION,
                         lambda *args, **kwargs: self._socket_open(*args, **kwargs))
        self.target = None
        self.apiclient = apiclient

    def head(self, url):
        get_headers = {'Accept': 'application/octet-stream'}

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k, v) for k, v in get_headers.items()])
        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.NOBODY, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, True)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def download(self, url, headers):
        self.count = 0
        self.start = time.time()
        self.checkpoint = self.start
        self._first_chunk = True
        self.collection = None
        self.parsedurl = urllib.parse.urlparse(url)

        get_headers = {'Accept': 'application/octet-stream'}
        get_headers.update(headers)

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k, v) for k, v in get_headers.items()])
        self.curl.setopt(pycurl.WRITEFUNCTION, self.body_write)
        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.HTTPGET, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, False)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def headers_received(self):
        self.collection = arvados.collection.Collection(api_client=self.apiclient)

        if "Content-Length" in self._headers:
            self.contentlength = int(self._headers["Content-Length"])
            logger.info("File size is %s bytes", self.contentlength)
        else:
            self.contentlength = None

        if self._headers.get("Content-Disposition"):
            grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))',
                            self._headers["Content-Disposition"])
            if grp.group(2):
                self.name = grp.group(2)
            else:
                self.name = grp.group(4)
        else:
            self.name = self.parsedurl.path.split("/")[-1]

        # Can't call curl.getinfo(pycurl.RESPONSE_CODE) until
        # perform() is done, but we need to know the status before
        # that, so we have to parse the status line ourselves.
        mt = re.match(r'^HTTP\/(\d(\.\d)?) ([1-5]\d\d) ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*)\r\n$', self._headers["x-status-line"])
        code = int(mt.group(3))

        if not self.name:
            logger.error("Cannot determine filename from URL or headers")
            return

        if code == 200:
            self.target = self.collection.open(self.name, "wb")
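
    # For example, a 'Content-Disposition: attachment; filename="dataset
    # v2.tar"' header sets self.name to 'dataset v2.tar' (quoted form,
    # group 2), while 'attachment; filename=dataset.tar' sets it to
    # 'dataset.tar' (unquoted token form, group 4).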

    def body_write(self, chunk):
        if self._first_chunk:
            self.headers_received()
            self._first_chunk = False

        self.count += len(chunk)

        if self.target is None:
            # "If this number is not equal to the size of the byte
            # string, this signifies an error and libcurl will abort
            # the request."
            return 0

        self.target.write(chunk)

        loopnow = time.time()
        if (loopnow - self.checkpoint) < 20:
            return

        bps = self.count / (loopnow - self.start)
        if self.contentlength is not None:
            logger.info("%2.1f%% complete, %6.2f MiB/s, %1.0f seconds left",
                        ((self.count * 100) / self.contentlength),
                        (bps / (1024.0*1024.0)),
                        ((self.contentlength-self.count) // bps))
        else:
            logger.info("%d downloaded, %6.2f MiB/s", self.count, (bps / (1024.0*1024.0)))
        self.checkpoint = loopnow

def _changed(url, clean_url, properties, now, curldownloader):
    req = curldownloader.head(url)

    if req.status_code != 200:
        # Sometimes endpoints are misconfigured and will deny HEAD but
        # allow GET, so instead of failing here, we'll try a GET with
        # If-None-Match and assume the content has changed.
        return True

    # A previous version of this code used "ETag"; now we normalize to
    # "Etag", so check for both.
    etag = properties[url].get("Etag") or properties[url].get("ETag")

    if url in properties:
        del properties[url]
    _remember_headers(clean_url, properties, req.headers, now)

    if "Etag" in req.headers and etag == req.headers["Etag"]:
        # Didn't change.
        return False

    return True

def _etag_quote(etag):
    # If it already has leading and trailing quotes, do nothing.
    if etag[0] == '"' and etag[-1] == '"':
        return etag
    # Otherwise, add quotes.
    return '"' + etag + '"'
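
# For example, _etag_quote('abc123') returns '"abc123"', while an
# already-quoted value like '"abc123"' is returned unchanged, matching
# the quoting required by the If-None-Match header.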

def check_cached_url(api, project_uuid, url, etags,
                     utcnow=datetime.datetime.utcnow,
                     varying_url_params="",
                     prefer_cached_downloads=False):

    logger.info("Checking Keep for %s", url)

    varying_params = [s.strip() for s in varying_url_params.split(",")]

    parsed = urllib.parse.urlparse(url)
    query = [q for q in urllib.parse.parse_qsl(parsed.query)
             if q[0] not in varying_params]

    clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
                                         urllib.parse.urlencode(query, safe="/"), parsed.fragment))

    r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()

    if clean_url == url:
        items = r1["items"]
    else:
        r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
        items = r1["items"] + r2["items"]

    now = utcnow()

    curldownloader = _Downloader(api)

    for item in items:
        properties = item["properties"]

        if clean_url in properties:
            cache_url = clean_url
        elif url in properties:
            cache_url = url
        else:
            raise Exception("Shouldn't happen, got an API result for %s that doesn't have the URL in properties" % item["uuid"])

        if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
            # HTTP caching rules say we should use the cache.
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

        if not _changed(cache_url, clean_url, properties, now, curldownloader):
            # Etag didn't change, same content; just update the headers.
            api.collections().update(uuid=item["uuid"], body={"collection": {"properties": properties}}).execute()
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

        for etagstr in ("Etag", "ETag"):
            if etagstr in properties[cache_url] and len(properties[cache_url][etagstr]) > 2:
                etags[properties[cache_url][etagstr]] = item

    logger.debug("Found ETag values %s", etags)

    return (None, None, None, clean_url, now)
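
# On a cache hit, check_cached_url returns (portable_data_hash,
# file_name, collection_uuid, clean_url, now); on a miss it returns
# (None, None, None, clean_url, now), after filling `etags` with
# Etag -> collection candidates for a subsequent conditional GET.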

def http_to_keep(api, project_uuid, url,
                 utcnow=datetime.datetime.utcnow, varying_url_params="",
                 prefer_cached_downloads=False):
    """Download a file over HTTP and upload it to Keep, with HTTP headers as metadata.

    Before downloading the URL, checks whether the URL already exists
    in Keep and applies HTTP caching policy together with the
    varying_url_params and prefer_cached_downloads flags to decide
    whether to use the version in Keep or re-download it.
    """

    etags = {}
    cache_result = check_cached_url(api, project_uuid, url, etags,
                                    utcnow, varying_url_params,
                                    prefer_cached_downloads)

    if cache_result[0] is not None:
        return cache_result

    clean_url = cache_result[3]
    now = cache_result[4]

    properties = {}
    headers = {}
    if etags:
        headers['If-None-Match'] = ', '.join([_etag_quote(k) for k, v in etags.items()])
    logger.debug("Sending GET request with headers %s", headers)

    logger.info("Beginning download of %s", url)

    curldownloader = _Downloader(api)

    req = curldownloader.download(url, headers)

    c = curldownloader.collection

    if req.status_code not in (200, 304):
        raise Exception("Failed to download '%s': got status %s" % (url, req.status_code))

    if curldownloader.target is not None:
        curldownloader.target.close()

    _remember_headers(clean_url, properties, req.headers, now)

    if req.status_code == 304 and "Etag" in req.headers and req.headers["Etag"] in etags:
        # Not modified; reuse the collection we already have, but
        # refresh its cache-related properties.
        item = etags[req.headers["Etag"]]
        item["properties"].update(properties)
        api.collections().update(uuid=item["uuid"], body={"collection": {"properties": item["properties"]}}).execute()
        cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
        return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

    logger.info("Download complete")

    collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')

    # Collection names are limited to 254 bytes; leave room for the
    # timestamp that ensure_unique_name may append.
    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        over = len(collectionname) - max_name_len
        split = int(max_name_len/2)
        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]

    c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)

    api.collections().update(uuid=c.manifest_locator(), body={"collection": {"properties": properties}}).execute()

    return (c.portable_data_hash(), curldownloader.name, c.manifest_locator(), clean_url, now)
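
# A minimal usage sketch (the project UUID and URL below are
# placeholders, and this assumes the module is importable as
# arvados.http_to_keep against a configured Arvados cluster):
#
#   import arvados
#   from arvados.http_to_keep import http_to_keep
#
#   api = arvados.api()
#   pdh, filename, coll_uuid, clean_url, fetched_at = http_to_keep(
#       api, "zzzzz-j7d0g-0123456789abcde",
#       "https://example.com/dataset.tar",
#       prefer_cached_downloads=True)
#   print("Stored %s as %s in collection %s" % (clean_url, filename, coll_uuid))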