1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import division
6 from future import standard_library
7 standard_library.install_aliases()
15 import arvados.collection
# Module-level logger; every message in this file is emitted through the
# 'arvados.cwl-runner' logger.
21 logger = logging.getLogger('arvados.cwl-runner')
def my_formatdate(dt):
    """Format a datetime as an HTTP date string in GMT.

    dt: a datetime.datetime.  A naive value is taken to already be in
    UTC; a timezone-aware value is converted to UTC before formatting.

    Returns a string such as "Tue, 15 May 2018 00:00:00 GMT".
    """
    # utctimetuple() equals timetuple() for naive datetimes, but also
    # normalizes aware datetimes to UTC, which calendar.timegm() expects.
    return email.utils.formatdate(timeval=calendar.timegm(dt.utctimetuple()),
                                  localtime=False, usegmt=True)
def my_parsedate(text):
    """Parse an RFC 2822 / HTTP date string into a naive UTC datetime.

    text: the header value to parse (e.g. a "Date" or "Expires" header).

    Returns a naive datetime.datetime expressed in UTC.  When the text
    cannot be parsed, or contains out-of-range fields, the Unix epoch
    (1970-01-01 00:00:00) is returned so callers treat it as long expired.
    """
    epoch = datetime.datetime(1970, 1, 1)

    parsed = email.utils.parsedate_tz(text)
    if not parsed:
        return epoch

    try:
        stamp = datetime.datetime(*parsed[:6])
    except ValueError:
        # parsedate_tz() does not range-check fields (e.g. day 32).
        return epoch

    offset = parsed[9]
    if offset:
        # offset is seconds *east* of UTC, so UTC = local time - offset.
        return stamp - datetime.timedelta(seconds=offset)

    # TZ is zero or missing, assume UTC.
    return stamp
def fresh_cache(url, properties, now):
    """Decide whether the cached copy recorded for `url` is still fresh.

    url: key into `properties` naming the cached resource.
    properties: dict mapping URL -> dict of remembered HTTP response
        headers ("Cache-Control", "Expires", "Date", ...).
    now: naive UTC datetime to compare the expiry time against.

    Returns True when the cache entry may be used without revalidation.

    Raises KeyError if `url` is not in `properties`, or if an expiry has
    to be derived from the "Date" header and none was remembered.
    """
    pr = properties[url]
    expires = None

    logger.debug("Checking cache freshness for %s using %s", url, pr)

    if "Cache-Control" in pr:
        cache_control = pr["Cache-Control"]

        # Directives may appear anywhere in the comma-separated header
        # value and are case-insensitive, so search rather than match
        # only at the start of the string.
        if re.search(r"\bimmutable\b", cache_control, re.IGNORECASE):
            return True

        g = re.search(r"(s-maxage|max-age)=(\d+)", cache_control, re.IGNORECASE)
        if g:
            expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))

    if expires is None and "Expires" in pr:
        expires = my_parsedate(pr["Expires"])

    if expires is None:
        # Use a default cache time of 24 hours if upstream didn't set
        # any cache headers, to reduce redundant downloads.
        expires = my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)

    return (now < expires)
def remember_headers(url, properties, headers, now):
    """Record the cache-relevant response headers for `url` in `properties`.

    url: key under which the headers are stored.
    properties: dict mapping URL -> dict of remembered headers; updated
        in place (an entry for `url` is created when absent).
    headers: mapping of HTTP response headers.
    now: datetime used to synthesize a "Date" value when the response
        did not carry one.

    Returns None.
    """
    remembered = properties.setdefault(url, {})
    for h in ("Cache-Control", "ETag", "Expires", "Date", "Content-Length"):
        # Only copy headers the response actually carried.
        if h in headers:
            remembered[h] = headers[h]
    if "Date" not in headers:
        # Freshness is computed relative to "Date", so always store one.
        remembered["Date"] = my_formatdate(now)
75 def changed(url, clean_url, properties, now):
# Issues a HEAD request for `url` and compares the response's ETag with the
# one remembered in properties[url]; the response headers are recorded under
# `clean_url` via remember_headers().
# NOTE(review): the embedded line numbers skip values inside this function,
# so some statements (including the return statements) are not shown here.
# NOTE(review): no timeout is passed to requests.head(), so presumably an
# unresponsive server can block this call indefinitely — confirm.
76 req = requests.head(url, allow_redirects=True)
78 if req.status_code != 200:
79 # Sometimes endpoints are misconfigured and will deny HEAD but
80 # allow GET so instead of failing here, we'll try GET If-None-Match
# Read the previously remembered ETag before the headers are refreshed below.
83 etag = properties[url].get("ETag")
# Store the headers from the HEAD response under the cleaned URL.
87 remember_headers(clean_url, properties, req.headers, now)
# The server sent an ETag and it equals the remembered one.
89 if "ETag" in req.headers and etag == req.headers["ETag"]:
# NOTE(review): the `def` line for this fragment is not among the lines
# shown; it looks like the body of the `etag_quote` helper that
# http_to_keep() calls when building the If-None-Match header — confirm.
96 # if it already has leading and trailing quotes, do nothing
# NOTE(review): etag[0] raises IndexError for an empty string, and a value
# consisting of a single '"' satisfies both comparisons.
97 if etag[0] == '"' and etag[-1] == '"':
# Wrap the value in double quotes.
101 return '"' + etag + '"'
104 def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow, varying_url_params="", prefer_cached_downloads=False):
# Downloads `url` into a new collection owned by `project_uuid`, or reuses an
# existing collection whose properties already record this URL, and returns a
# "keep:<portable data hash>/<file name>" reference.
# NOTE(review): the embedded line numbers skip values throughout this
# function, so a number of statements are not shown here (for example the
# definitions of `now`, `item`, `etags`, `headers`, `count`, `start` and
# `checkpoint`).
#
# varying_url_params is a comma-separated list of query parameter names that
# are dropped from the URL to form `clean_url`.
105 varying_params = [s.strip() for s in varying_url_params.split(",")]
107 parsed = urllib.parse.urlparse(url)
108 query = [q for q in urllib.parse.parse_qsl(parsed.query)
109 if q[0] not in varying_params]
# Rebuild the URL with only the remaining query parameters.
111 clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
112 urllib.parse.urlencode(query, safe="/"), parsed.fragment))
# List collections using a ["properties", "exists", <url>] filter, once for
# the original URL and once for the cleaned URL, and combine the results.
114 r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()
119 r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
120 items = r1["items"] + r2["items"]
127 properties = item["properties"]
# Prefer the entry stored under the cleaned URL over the original URL.
129 if clean_url in properties:
130 cache_url = clean_url
131 elif url in properties:
# prefer_cached_downloads short-circuits the freshness check.
136 if prefer_cached_downloads or fresh_cache(cache_url, properties, now):
137 # HTTP caching rules say we should use the cache
# The returned file name is the first key of the existing collection.
138 cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
139 return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
141 if not changed(cache_url, clean_url, properties, now):
142 # ETag didn't change, same content, just update headers
143 api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
144 cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
145 return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
# Remember which collection each stored ETag belongs to; ETags of two
# characters or fewer are skipped.
147 if "ETag" in properties[cache_url] and len(properties[cache_url]["ETag"]) > 2:
148 etags[properties[cache_url]["ETag"]] = item
150 logger.debug("Found ETags %s", etags)
# Send every known ETag (quoted by etag_quote) in a single If-None-Match
# header, joined with ", ".
155 headers['If-None-Match'] = ', '.join([etag_quote(k) for k,v in etags.items()])
156 logger.debug("Sending GET request with headers %s", headers)
# NOTE(review): no timeout is passed to requests.get(), so presumably a
# stalled server can block the download indefinitely — confirm.
157 req = requests.get(url, stream=True, allow_redirects=True, headers=headers)
# Any status other than 200 or 304 raises a plain Exception.
159 if req.status_code not in (200, 304):
160 raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))
162 remember_headers(clean_url, properties, req.headers, now)
# 304 with an ETag we sent: merge the refreshed headers into that
# collection's properties, save them, and return the existing collection.
164 if req.status_code == 304 and "ETag" in req.headers and req.headers["ETag"] in etags:
165 item = etags[req.headers["ETag"]]
166 item["properties"].update(properties)
167 api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
168 cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
169 return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
# Log the expected size when a Content-Length was remembered.
171 if "Content-Length" in properties[clean_url]:
172 cl = int(properties[clean_url]["Content-Length"])
173 logger.info("Downloading %s (%s bytes)", url, cl)
176 logger.info("Downloading %s (unknown size)", url)
178 c = arvados.collection.Collection()
# Look for a filename= parameter in Content-Disposition; the last path
# segment of the URL is also used as a name.
# NOTE(review): the lines joining these two cases are not shown; presumably
# the path segment is the fallback — confirm.
180 if req.headers.get("Content-Disposition"):
181 grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))', req.headers["Content-Disposition"])
187 name = parsed.path.split("/")[-1]
# Stream the response body into the collection file in 1024-byte chunks,
# logging progress when more than 20 seconds have passed since `checkpoint`.
# NOTE(review): the "seconds left" log line formats `bps // (1024*1024)`
# with %3.2f; floor division truncates the rate to whole MiB/s, whereas the
# other progress message uses `/` — looks unintended, confirm.
192 with c.open(name, "wb") as f:
193 for chunk in req.iter_content(chunk_size=1024):
196 loopnow = time.time()
197 if (loopnow - checkpoint) > 20:
198 bps = count / (loopnow - start)
200 logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left",
201 ((count * 100) / cl),
202 (bps // (1024*1024)),
205 logger.info("%d downloaded, %3.2f MiB/s", count, (bps / (1024*1024)))
208 logger.info("Download complete")
# The collection name embeds the fully percent-encoded cleaned URL.
210 collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')
212 # max length - space to add a timestamp used by ensure_unique_name
213 max_name_len = 254 - 28
# Over-long names are shortened by cutting characters out of the middle and
# inserting an ellipsis character in their place.
215 if len(collectionname) > max_name_len:
216 over = len(collectionname) - max_name_len
217 split = int(max_name_len/2)
218 collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
# Save the new collection, then attach the remembered headers as its
# properties in a separate update call.
220 c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)
222 api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
224 return "keep:%s/%s" % (c.portable_data_hash(), name)