1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import division
6 from future import standard_library
7 standard_library.install_aliases()
14 import arvados.collection
20 from arvados.pycurl import PyCurlHelper
22 logger = logging.getLogger('arvados.http_import')
def my_formatdate(dt):
    """Render a naive UTC datetime *dt* as an RFC 2822 date string (GMT)."""
    timestamp = calendar.timegm(dt.timetuple())
    return email.utils.formatdate(timeval=timestamp,
                                  localtime=False, usegmt=True)
def my_parsedate(text):
    """Parse an RFC 2822 date string into a naive UTC datetime.

    Unparseable input falls back to the Unix epoch so cache-freshness
    comparisons still work (the entry just looks very stale).
    """
    parsed = email.utils.parsedate_tz(text)
    if parsed:
        if parsed[9]:
            # Adjust to UTC.  parsed[9] is the offset of the local time
            # from UTC in seconds, so UTC = local - offset (this matches
            # email.utils.mktime_tz).  BUG FIX: previously the offset was
            # added, shifting non-GMT dates in the wrong direction.
            return datetime.datetime(*parsed[:6]) - datetime.timedelta(seconds=parsed[9])
        else:
            # TZ is zero or missing, assume UTC.
            return datetime.datetime(*parsed[:6])
    return datetime.datetime(1970, 1, 1)
def fresh_cache(url, properties, now):
    """Return True when the headers cached for *url* say the stored copy
    is still usable at time *now* without revalidating upstream."""
    cached = properties[url]
    expires = None

    logger.debug("Checking cache freshness for %s using %s", url, cached)

    if "Cache-Control" in cached:
        cc = cached["Cache-Control"]
        if re.match(r"immutable", cc):
            # Immutable responses never go stale.
            return True

        maxage = re.match(r"(s-maxage|max-age)=(\d+)", cc)
        if maxage:
            expires = my_parsedate(cached["Date"]) + datetime.timedelta(seconds=int(maxage.group(2)))

    if expires is None and "Expires" in cached:
        expires = my_parsedate(cached["Expires"])

    if expires is None:
        # Use a default cache time of 24 hours if upstream didn't set
        # any cache headers, to reduce redundant downloads.
        expires = my_parsedate(cached["Date"]) + datetime.timedelta(hours=24)

    if not expires:
        return False

    return now < expires
def remember_headers(url, properties, headers, now):
    """Record the cache-relevant response *headers* for *url* into
    *properties*, synthesizing a Date header (from *now*) if absent."""
    entry = properties.setdefault(url, {})
    for name in ("Cache-Control", "Etag", "Expires", "Date", "Content-Length"):
        if name in headers:
            entry[name] = headers[name]
    if "Date" not in headers:
        entry["Date"] = my_formatdate(now)
    def __init__(self, status_code, headers):
        # Simple value holder for an HTTP response: the numeric status
        # code and the dict of response headers collected by the
        # downloader's header callback.
        self.status_code = status_code
        self.headers = headers
class CurlDownloader(PyCurlHelper):
    """HTTP client built on pycurl that can HEAD a URL or stream a GET
    body directly into a new Arvados collection.

    A single pycurl.Curl handle is created once and reused for every
    request made through this instance.
    """
    # Wait up to 60 seconds for connection
    # How long it can be in "low bandwidth" state before it gives up
    # Low bandwidth threshold is 32 KiB/s
    DOWNLOADER_TIMEOUT = (60, 300, 32768)

    def __init__(self):
        super(CurlDownloader, self).__init__(title_case_headers=True)
        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.NOSIGNAL, 1)
        self.curl.setopt(pycurl.OPENSOCKETFUNCTION,
                         lambda *args, **kwargs: self._socket_open(*args, **kwargs))
        # Output file handle; set by headers_received() on a 200 response.
        self.target = None

    def head(self, url, headers=None):
        """Issue a HEAD request for *url*, following redirects.

        Returns a Response(status_code, headers).  Raises
        arvados.errors.HttpError if the transfer itself fails.

        NOTE(review): the default was a mutable dict (headers={});
        None with a fallback is the safe equivalent.
        """
        get_headers = {'Accept': 'application/octet-stream'}
        get_headers.update(headers or {})

        self._headers = {}

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k, v) for k, v in get_headers.items()])

        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.NOBODY, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, True)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def download(self, url, headers):
        """GET *url*, streaming the response body into a fresh collection
        through body_write().

        Returns a Response; on a 200 the file contents end up in
        self.collection under self.name.
        """
        self.count = 0
        self.start = time.time()
        self.checkpoint = self.start
        self.target = None
        self._first_chunk = True
        self.collection = None
        self.parsedurl = urllib.parse.urlparse(url)

        get_headers = {'Accept': 'application/octet-stream'}
        get_headers.update(headers)

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k, v) for k, v in get_headers.items()])

        self.curl.setopt(pycurl.WRITEFUNCTION, self.body_write)
        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)

        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.HTTPGET, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, False)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def headers_received(self):
        """Called once, on the first body chunk: create the collection,
        choose the output filename, and open it for writing on a 200."""
        self.collection = arvados.collection.Collection()

        if "Content-Length" in self._headers:
            self.contentlength = int(self._headers["Content-Length"])
            logger.info("File size is %s bytes", self.contentlength)
        else:
            self.contentlength = None

        if self._headers.get("Content-Disposition"):
            grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))',
                            self._headers["Content-Disposition"])
            if grp.group(2):
                # Quoted filename form.
                self.name = grp.group(2)
            else:
                # Bare-token filename form.
                self.name = grp.group(4)
        else:
            # No Content-Disposition: fall back to the URL's last path segment.
            self.name = self.parsedurl.path.split("/")[-1]

        # "x-status-line" is the raw status line recorded by the
        # PyCurlHelper header callback (assumed — defined in arvados.pycurl).
        mt = re.match(r'^HTTP\/(\d(\.\d)?) ([1-5]\d\d) ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*)\r\n$',
                      self._headers["x-status-line"])
        code = int(mt.group(3))

        if code == 200:
            self.target = self.collection.open(self.name, "wb")

    def body_write(self, chunk):
        """pycurl WRITEFUNCTION: write *chunk* to the output file and log
        progress at most once every 20 seconds."""
        if self._first_chunk:
            self.headers_received()
            self._first_chunk = False

        self.count += len(chunk)
        self.target.write(chunk)

        loopnow = time.time()
        if (loopnow - self.checkpoint) < 20:
            return

        bps = self.count / (loopnow - self.start)
        if self.contentlength is not None:
            logger.info("%2.1f%% complete, %6.2f MiB/s, %1.0f seconds left",
                        ((self.count * 100) / self.contentlength),
                        (bps / (1024.0*1024.0)),
                        ((self.contentlength-self.count) // bps))
        else:
            # BUG FIX: was the bare name `count`, which raised NameError
            # whenever this branch ran (no Content-Length header).
            logger.info("%d downloaded, %6.2f MiB/s", self.count, (bps / (1024.0*1024.0)))
        self.checkpoint = loopnow
def changed(url, clean_url, properties, now, curldownloader):
    """HEAD *url* and report whether its content differs from the cache.

    Returns False only when the server's Etag matches the one we stored;
    anything else (HEAD rejected, missing or different Etag) counts as
    changed.  Also migrates the stored headers from *url* to *clean_url*.
    """
    req = curldownloader.head(url)

    if req.status_code != 200:
        # Sometimes endpoints are misconfigured and will deny HEAD but
        # allow GET so instead of failing here, we'll try GET If-None-Match
        return True

    # previous version of this code used "ETag", now we are
    # normalizing to "Etag", check for both.
    etag = properties[url].get("Etag") or properties[url].get("ETag")

    if url in properties:
        del properties[url]

    remember_headers(clean_url, properties, req.headers, now)

    if "Etag" in req.headers and etag == req.headers["Etag"]:
        # Same Etag as before: content unchanged.
        return False

    return True
def etag_quote(etag):
    """Wrap *etag* in double quotes unless it already has them."""
    # if it already has leading and trailing quotes, do nothing
    already_quoted = etag[0] == '"' and etag[-1] == '"'
    return etag if already_quoted else '"%s"' % etag
def http_to_keep(api, project_uuid, url,
                 utcnow=datetime.datetime.utcnow, varying_url_params="",
                 prefer_cached_downloads=False):
    """Download a file over HTTP and store it in Keep, with the HTTP
    cache headers recorded as collection properties.

    Looks for an existing collection whose properties record this URL.
    A fresh cached copy (or any copy, when prefer_cached_downloads is
    set) is returned without contacting the server; a stale copy is
    revalidated with HEAD / a conditional GET (If-None-Match) before
    re-downloading.

    api: Arvados API client.
    project_uuid: owner project for a newly created collection.
    utcnow: clock hook, overridable for tests.
    varying_url_params: comma-separated query parameter names stripped
        from the URL to form the canonical cache key ("clean" URL).
    prefer_cached_downloads: if True, use any cached copy unconditionally.

    Returns a "keep:<portable_data_hash>/<filename>" reference.
    """
    logger.info("Checking Keep for %s", url)

    varying_params = [s.strip() for s in varying_url_params.split(",")]

    parsed = urllib.parse.urlparse(url)
    query = [q for q in urllib.parse.parse_qsl(parsed.query)
             if q[0] not in varying_params]

    clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
                                         urllib.parse.urlencode(query, safe="/"), parsed.fragment))

    r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()

    if clean_url == url:
        items = r1["items"]
    else:
        r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
        items = r1["items"] + r2["items"]

    now = utcnow()

    etags = {}

    curldownloader = CurlDownloader()

    for item in items:
        properties = item["properties"]

        if clean_url in properties:
            cache_url = clean_url
        elif url in properties:
            cache_url = url
        else:
            raise Exception("Shouldn't happen, got an item with neither clean_url nor url")

        if prefer_cached_downloads or fresh_cache(cache_url, properties, now):
            # HTTP caching rules say we should use the cache
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])

        if not changed(cache_url, clean_url, properties, now, curldownloader):
            # Etag didn't change, same content, just update headers
            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])

        # Remember this Etag so one conditional GET below can match any
        # of the stale cached copies.
        if "Etag" in properties[cache_url] and len(properties[cache_url]["Etag"]) > 2:
            etags[properties[cache_url]["Etag"]] = item

    logger.debug("Found Etags %s", etags)

    properties = {}
    headers = {}
    if etags:
        headers['If-None-Match'] = ', '.join([etag_quote(k) for k, v in etags.items()])
    logger.debug("Sending GET request with headers %s", headers)

    logger.info("Beginning download of %s", url)

    req = curldownloader.download(url, headers)

    c = curldownloader.collection

    if req.status_code not in (200, 304):
        raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))

    if curldownloader.target is not None:
        curldownloader.target.close()

    remember_headers(clean_url, properties, req.headers, now)

    if req.status_code == 304 and "Etag" in req.headers and req.headers["Etag"] in etags:
        # Not modified: refresh the matching cached collection's headers
        # instead of storing a new copy.
        item = etags[req.headers["Etag"]]
        item["properties"].update(properties)
        api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
        cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
        return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])

    logger.info("Download complete")

    collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')

    # max length - space to add a timestamp used by ensure_unique_name
    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        over = len(collectionname) - max_name_len
        split = int(max_name_len/2)
        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]

    c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)

    # BUG FIX: the trailing .execute() had been commented out, so the
    # request was built but never sent and the new collection's
    # URL/cache-header properties were silently dropped.
    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()

    return "keep:%s/%s" % (c.portable_data_hash(), curldownloader.name)