Merge branch '14826-cert-path' refs #14826
[arvados.git] / sdk / cwl / arvados_cwl / http.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from future import standard_library
7 standard_library.install_aliases()
8
9 import requests
10 import email.utils
11 import time
12 import datetime
13 import re
14 import arvados
15 import arvados.collection
16 import urllib.parse
17 import logging
18 import calendar
19
20 logger = logging.getLogger('arvados.cwl-runner')
21
def my_formatdate(dt):
    """Render a naive UTC datetime as an RFC 2822 HTTP date string."""
    timestamp = calendar.timegm(dt.timetuple())
    return email.utils.formatdate(timeval=timestamp, localtime=False, usegmt=True)
25
def my_parsedate(text):
    """Parse an RFC 2822 date string into a naive UTC datetime.

    A zero or missing timezone offset is treated as UTC.  Input that
    cannot be parsed yields the Unix epoch (1970-01-01 00:00:00).
    """
    parsed = email.utils.parsedate_tz(text)
    if parsed is None:
        return datetime.datetime(1970, 1, 1)
    if parsed[9]:
        # parsed[9] is the offset from UTC in seconds; local time minus
        # the offset gives UTC (same convention as email.utils.mktime_tz).
        # The previous code added the offset, producing times wrong by
        # twice the zone offset.
        return datetime.datetime(*parsed[:6]) - datetime.timedelta(seconds=parsed[9])
    # TZ is zero or missing, assume UTC.
    return datetime.datetime(*parsed[:6])
37
def fresh_cache(url, properties, now):
    """Return True if the cached copy of url is still fresh at time now.

    Freshness is decided from the response headers previously recorded
    by remember_headers(): an "immutable" Cache-Control directive wins
    outright, then s-maxage/max-age relative to the Date header, then an
    explicit Expires header, and finally a 24 hour default.
    """
    pr = properties[url]
    expires = None

    logger.debug("Checking cache freshness for %s using %s", url, pr)

    if "Cache-Control" in pr:
        # Use re.search, not re.match: directives rarely start the header
        # value (e.g. "public, max-age=3600" or "public, immutable"), and
        # re.match only matches at the beginning of the string.
        if re.search(r"immutable", pr["Cache-Control"]):
            return True

        g = re.search(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
        if g:
            expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))

    if expires is None and "Expires" in pr:
        expires = my_parsedate(pr["Expires"])

    if expires is None:
        # Use a default cache time of 24 hours if upstream didn't set
        # any cache headers, to reduce redundant downloads.
        # (remember_headers guarantees a "Date" entry exists.)
        expires = my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)

    # Note: the old "if not expires: return False" guard was dead code;
    # expires is always a (truthy) datetime at this point.
    return now < expires
64
def remember_headers(url, properties, headers, now):
    """Record the cache-relevant response headers for url in properties."""
    entry = properties.setdefault(url, {})
    for field in ("Cache-Control", "ETag", "Expires", "Date", "Content-Length"):
        if field in headers:
            entry[field] = headers[field]
    # Synthesize a Date entry when the server omitted one so later
    # freshness checks always have a reference timestamp.
    if "Date" not in headers:
        entry["Date"] = my_formatdate(now)
72
73
def changed(url, properties, now):
    """HEAD the url and report whether its content appears to have changed.

    Refreshes the stored headers for url as a side effect.  Returns False
    when the previously recorded ETag matches the server's current ETag,
    True otherwise.  Raises Exception on a non-200 response.
    """
    req = requests.head(url, allow_redirects=True)

    # Capture the previously cached ETag *before* remember_headers
    # overwrites it with the fresh response value; comparing afterwards
    # (as the old code did) always compared the new ETag to itself and
    # reported "unchanged" for any server that sends ETags.
    old_etag = properties.get(url, {}).get("ETag")

    remember_headers(url, properties, req.headers, now)

    if req.status_code != 200:
        raise Exception("Got status %s" % req.status_code)

    if old_etag is not None and "ETag" in req.headers:
        if old_etag == req.headers["ETag"]:
            return False

    return True
87
def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
    """Download a file over HTTP and store it in Keep, with caching.

    Searches existing collections whose properties record this url.  If a
    cached copy is still fresh (or the server's ETag shows the content is
    unchanged), it is reused; otherwise the url is downloaded into a new
    collection owned by project_uuid.

    :param api: Arvados API client.
    :param project_uuid: owner UUID for a newly created collection.
    :param url: the HTTP(S) URL to fetch.
    :param utcnow: clock function, injectable for testing.
    :returns: a "keep:<pdh>/<filename>" URI referencing the stored file.
    :raises Exception: on a non-200 download response.
    """
    r = api.collections().list(filters=[["properties", "exists", url]]).execute()

    now = utcnow()

    for item in r["items"]:
        properties = item["properties"]
        if fresh_cache(url, properties, now):
            # Cached copy is still fresh; reuse it without contacting the server.
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])

        if not changed(url, properties, now):
            # ETag didn't change, same content, just update headers
            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])

    properties = {}
    req = requests.get(url, stream=True, allow_redirects=True)

    if req.status_code != 200:
        raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))

    remember_headers(url, properties, req.headers, now)

    if "Content-Length" in properties[url]:
        cl = int(properties[url]["Content-Length"])
        logger.info("Downloading %s (%s bytes)", url, cl)
    else:
        cl = None
        logger.info("Downloading %s (unknown size)", url)

    c = arvados.collection.Collection()

    # Prefer the server-suggested filename from Content-Disposition,
    # falling back to the last path segment of the URL.  Guard against a
    # malformed header whose filename= clause doesn't match the pattern
    # (the old code dereferenced a None match object and crashed).
    name = None
    if req.headers.get("Content-Disposition"):
        grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))', req.headers["Content-Disposition"])
        if grp:
            # group(2) is a quoted filename, group(4) an unquoted token.
            name = grp.group(2) or grp.group(4)
    if not name:
        name = urllib.parse.urlparse(url).path.split("/")[-1]

    count = 0
    start = time.time()
    checkpoint = start
    with c.open(name, "wb") as f:
        for chunk in req.iter_content(chunk_size=1024):
            count += len(chunk)
            f.write(chunk)
            loopnow = time.time()
            # Log progress at most once every 20 seconds.
            if (loopnow - checkpoint) > 20:
                bps = count / (loopnow - start)
                if cl is not None:
                    # True division here: the old floor division reported
                    # 0.00 MiB/s for any rate below 1 MiB/s despite the
                    # %3.2f format.
                    logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left",
                                ((count * 100) / cl),
                                (bps / (1024*1024)),
                                ((cl-count) / bps))
                else:
                    logger.info("%d downloaded, %3.2f MiB/s", count, (bps / (1024*1024)))
                checkpoint = loopnow

    c.save_new(name="Downloaded from %s" % url, owner_uuid=project_uuid, ensure_unique_name=True)

    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()

    return "keep:%s/%s" % (c.portable_data_hash(), name)