19699: Add --varying-url-params
[arvados.git] / sdk / cwl / arvados_cwl / http.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from future import standard_library
7 standard_library.install_aliases()
8
9 import requests
10 import email.utils
11 import time
12 import datetime
13 import re
14 import arvados
15 import arvados.collection
16 import urllib.parse
17 import logging
18 import calendar
19 import urllib.parse
20
21 logger = logging.getLogger('arvados.cwl-runner')
22
23 def my_formatdate(dt):
24     return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
25                                   localtime=False, usegmt=True)
26
27 def my_parsedate(text):
28     parsed = email.utils.parsedate_tz(text)
29     if parsed:
30         if parsed[9]:
31             # Adjust to UTC
32             return datetime.datetime(*parsed[:6]) + datetime.timedelta(seconds=parsed[9])
33         else:
34             # TZ is zero or missing, assume UTC.
35             return datetime.datetime(*parsed[:6])
36     else:
37         return datetime.datetime(1970, 1, 1)
38
39 def fresh_cache(url, properties, now):
40     pr = properties[url]
41     expires = None
42
43     logger.debug("Checking cache freshness for %s using %s", url, pr)
44
45     if "Cache-Control" in pr:
46         if re.match(r"immutable", pr["Cache-Control"]):
47             return True
48
49         g = re.match(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
50         if g:
51             expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))
52
53     if expires is None and "Expires" in pr:
54         expires = my_parsedate(pr["Expires"])
55
56     if expires is None:
57         # Use a default cache time of 24 hours if upstream didn't set
58         # any cache headers, to reduce redundant downloads.
59         expires = my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)
60
61     if not expires:
62         return False
63
64     return (now < expires)
65
66 def remember_headers(url, properties, headers, now):
67     properties.setdefault(url, {})
68     for h in ("Cache-Control", "ETag", "Expires", "Date", "Content-Length"):
69         if h in headers:
70             properties[url][h] = headers[h]
71     if "Date" not in headers:
72         properties[url]["Date"] = my_formatdate(now)
73
74
75 def changed(url, clean_url, properties, now):
76     req = requests.head(url, allow_redirects=True)
77     remember_headers(url, properties, req.headers, now)
78
79     if req.status_code != 200:
80         # Sometimes endpoints are misconfigured and will deny HEAD but
81         # allow GET so instead of failing here, we'll try GET If-None-Match
82         return True
83
84     pr = properties[clean_url]
85     if "ETag" in pr and "ETag" in req.headers:
86         if pr["ETag"] == req.headers["ETag"]:
87             return False
88
89     return True
90
91 def etag_quote(etag):
92     # if it already has leading and trailing quotes, do nothing
93     if etag[0] == '"' and etag[-1] == '"':
94         return etag
95     else:
96         # Add quotes.
97         return '"' + etag + '"'
98
99
100 def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow, varying_url_params=""):
101     varying_params = [s.strip() for s in varying_url_params.split(",")]
102
103     parsed = urllib.parse.urlparse(url)
104     query = [q for q in urllib.parse.parse_qsl(parsed.query)
105              if q[0] not in varying_params]
106     clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
107                                          urllib.parse.urlencode(query),  parsed.fragment))
108
109     r = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
110
111     now = utcnow()
112
113     etags = {}
114
115     for item in r["items"]:
116         properties = item["properties"]
117         if fresh_cache(clean_url, properties, now):
118             # Do nothing
119             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
120             return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
121
122         if not changed(url, clean_url, properties, now):
123             # ETag didn't change, same content, just update headers
124             api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
125             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
126             return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
127
128         if "ETag" in properties and len(properties["ETag"]) > 2:
129             etags[properties["ETag"]] = item
130
131     logger.debug("Found ETags %s", etags)
132
133     properties = {}
134     headers = {}
135     if etags:
136         headers['If-None-Match'] = ', '.join([etag_quote(k) for k,v in etags.items()])
137     logger.debug("Sending GET request with headers %s", headers)
138     req = requests.get(url, stream=True, allow_redirects=True, headers=headers)
139
140     if req.status_code not in (200, 304):
141         raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))
142
143     remember_headers(clean_url, properties, req.headers, now)
144
145     if req.status_code == 304 and "ETag" in req.headers and req.headers["ETag"] in etags:
146         item = etags[req.headers["ETag"]]
147         item["properties"].update(properties)
148         api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
149         cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
150         return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
151
152     if "Content-Length" in properties[clean_url]:
153         cl = int(properties[clean_url]["Content-Length"])
154         logger.info("Downloading %s (%s bytes)", url, cl)
155     else:
156         cl = None
157         logger.info("Downloading %s (unknown size)", url)
158
159     c = arvados.collection.Collection()
160
161     if req.headers.get("Content-Disposition"):
162         grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))', req.headers["Content-Disposition"])
163         if grp.group(2):
164             name = grp.group(2)
165         else:
166             name = grp.group(4)
167     else:
168         name = parsed.path.split("/")[-1]
169
170     count = 0
171     start = time.time()
172     checkpoint = start
173     with c.open(name, "wb") as f:
174         for chunk in req.iter_content(chunk_size=1024):
175             count += len(chunk)
176             f.write(chunk)
177             loopnow = time.time()
178             if (loopnow - checkpoint) > 20:
179                 bps = count / (loopnow - start)
180                 if cl is not None:
181                     logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left",
182                                 ((count * 100) / cl),
183                                 (bps // (1024*1024)),
184                                 ((cl-count) // bps))
185                 else:
186                     logger.info("%d downloaded, %3.2f MiB/s", count, (bps / (1024*1024)))
187                 checkpoint = loopnow
188
189     logger.info("Download complete")
190
191     collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')
192
193     # max length - space to add a timestamp used by ensure_unique_name
194     max_name_len = 254 - 28
195
196     if len(collectionname) > max_name_len:
197         over = len(collectionname) - max_name_len
198         split = int(max_name_len/2)
199         collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
200
201     c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)
202
203     api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
204
205     return "keep:%s/%s" % (c.portable_data_hash(), name)