Merge branch 'master' into 13822-nm-delayed-daemon
[arvados.git] / sdk / cwl / arvados_cwl / http.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import requests
6 import email.utils
7 import time
8 import datetime
9 import re
10 import arvados
11 import arvados.collection
12 import urlparse
13 import logging
14 import calendar
15
16 logger = logging.getLogger('arvados.cwl-runner')
17
def my_formatdate(dt):
    # Render a naive UTC datetime as an RFC 2822 date string; usegmt
    # produces the "GMT" suffix that HTTP date headers require.
    seconds_since_epoch = calendar.timegm(dt.timetuple())
    return email.utils.formatdate(timeval=seconds_since_epoch,
                                  localtime=False,
                                  usegmt=True)
21
def my_parsedate(text):
    """Parse an RFC 2822 date string into a naive UTC datetime.

    Returns the Unix epoch (1970-01-01) when the string cannot be
    parsed, so callers always get back a comparable datetime.
    """
    parsed = email.utils.parsedate_tz(text)
    if parsed:
        if parsed[9]:
            # parsed[9] is the offset from UTC in seconds (e.g. +0200
            # -> 7200).  Local time = UTC + offset, so SUBTRACT the
            # offset to normalize to UTC (this matches what
            # email.utils.mktime_tz does; the previous code added it,
            # shifting timestamps the wrong way).
            return datetime.datetime(*parsed[:6]) - datetime.timedelta(seconds=parsed[9])
        else:
            # TZ is zero or missing, assume UTC.
            return datetime.datetime(*parsed[:6])
    else:
        return datetime.datetime(1970, 1, 1)
33
def fresh_cache(url, properties, now):
    """Return True if the cached copy of `url` is still fresh at `now`.

    `properties[url]` holds the response headers recorded by
    remember_headers().  Freshness follows HTTP caching rules:
    Cache-Control "immutable" is always fresh, "max-age"/"s-maxage"
    set an expiry relative to the Date header, then an explicit
    Expires header, then a 24-hour default.
    """
    pr = properties[url]
    expires = None

    logger.debug("Checking cache freshness for %s using %s", url, pr)

    if "Cache-Control" in pr:
        # Use re.search, not re.match: Cache-Control directives rarely
        # sit at the start of the header value (e.g. "public,
        # max-age=3600" or "max-age=3600, immutable"), so an anchored
        # match would silently miss them.
        if re.search(r"immutable", pr["Cache-Control"]):
            return True

        g = re.search(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
        if g:
            expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))

    if expires is None and "Expires" in pr:
        expires = my_parsedate(pr["Expires"])

    if expires is None:
        # Use a default cache time of 24 hours if upstream didn't set
        # any cache headers, to reduce redundant downloads.
        expires = my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)

    if not expires:
        return False

    return (now < expires)
60
def remember_headers(url, properties, headers, now):
    # Record the subset of response headers relevant to cache
    # validation under properties[url].  If the server omitted a Date
    # header, stamp one from `now` so later freshness math always has
    # a reference point.
    cached = properties.setdefault(url, {})
    for header_name in ("Cache-Control", "ETag", "Expires", "Date", "Content-Length"):
        if header_name in headers:
            cached[header_name] = headers[header_name]
    if "Date" not in headers:
        cached["Date"] = my_formatdate(now)
68
69
def changed(url, properties, now):
    """HEAD `url` and report whether its content appears to have changed.

    Refreshes the recorded headers for `url` as a side effect.  Raises
    Exception on any non-200 response.  Returns False only when both
    the previously recorded headers and the new response carry the
    same ETag.
    """
    req = requests.head(url, allow_redirects=True)

    # Snapshot the ETag we had on record BEFORE remember_headers
    # overwrites it with the new response's value.  The previous code
    # compared properties[url]["ETag"] after the update, which always
    # compared the new ETag against itself and so could never detect a
    # change.
    old_etag = properties.get(url, {}).get("ETag")

    remember_headers(url, properties, req.headers, now)

    if req.status_code != 200:
        raise Exception("Got status %s" % req.status_code)

    new_etag = req.headers.get("ETag")
    if old_etag is not None and new_etag is not None and old_etag == new_etag:
        return False

    return True
83
def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
    """Download `url` into a Keep collection and return a "keep:pdh/name" URI.

    First looks for an existing collection whose properties recorded
    cache headers for this URL: if that cached copy is still fresh, or
    a HEAD request shows the ETag is unchanged, the existing
    collection is reused instead of re-downloading.  Otherwise the
    content is streamed into a new collection owned by `project_uuid`
    and the response's cache headers are saved in the collection
    properties for future freshness checks.

    `utcnow` is injectable so tests can pin the clock.
    """
    # Collections that have cached this URL store it as a property key.
    r = api.collections().list(filters=[["properties", "exists", url]]).execute()

    now = utcnow()

    for item in r["items"]:
        properties = item["properties"]
        if fresh_cache(url, properties, now):
            # Do nothing
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return "keep:%s/%s" % (item["portable_data_hash"], cr.keys()[0])

        if not changed(url, properties, now):
            # ETag didn't change, same content, just update headers
            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return "keep:%s/%s" % (item["portable_data_hash"], cr.keys()[0])

    # No usable cached copy: download the content for real.
    properties = {}
    req = requests.get(url, stream=True, allow_redirects=True)

    if req.status_code != 200:
        raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))

    remember_headers(url, properties, req.headers, now)

    if "Content-Length" in properties[url]:
        cl = int(properties[url]["Content-Length"])
        logger.info("Downloading %s (%s bytes)", url, cl)
    else:
        cl = None
        logger.info("Downloading %s (unknown size)", url)

    c = arvados.collection.Collection()

    # Prefer the server-suggested filename from Content-Disposition;
    # fall back to the last path segment of the URL.
    if req.headers.get("Content-Disposition"):
        # NOTE(review): re.search may return None for a malformed
        # Content-Disposition header, which would raise AttributeError
        # on grp.group() below -- confirm whether that case needs
        # explicit handling.
        grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))', req.headers["Content-Disposition"])
        if grp.group(2):
            name = grp.group(2)
        else:
            name = grp.group(4)
    else:
        name = urlparse.urlparse(url).path.split("/")[-1]

    count = 0
    start = time.time()
    checkpoint = start
    with c.open(name, "w") as f:
        for chunk in req.iter_content(chunk_size=1024):
            count += len(chunk)
            f.write(chunk)
            loopnow = time.time()
            # Emit a progress log at most every 20 seconds.
            if (loopnow - checkpoint) > 20:
                bps = (float(count)/float(loopnow - start))
                if cl is not None:
                    logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left",
                                float(count * 100) / float(cl),
                                bps/(1024*1024),
                                (cl-count)/bps)
                else:
                    logger.info("%d downloaded, %3.2f MiB/s", count, bps/(1024*1024))
                checkpoint = loopnow

    c.save_new(name="Downloaded from %s" % url, owner_uuid=project_uuid, ensure_unique_name=True)

    # Attach the cache headers so a future call can reuse this download.
    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()

    return "keep:%s/%s" % (c.portable_data_hash(), name)