7 import arvados.collection
11 logger = logging.getLogger('arvados.cwl-runner')
def my_formatdate(dt):
    """Format a naive datetime as an RFC 2822 date string with a GMT suffix.

    Used to synthesize a "Date" header when the upstream server didn't send one.

    Bug fix: the original body referenced an undefined name ``now`` instead of
    the ``dt`` parameter, so every call raised NameError.

    NOTE(review): time.mktime interprets the time tuple as *local* time, while
    callers appear to pass datetime.datetime.utcnow() — confirm the intended
    timezone handling.
    """
    return email.utils.formatdate(timeval=time.mktime(dt.timetuple()),
                                  localtime=False, usegmt=True)
def my_parsedate(text):
    """Parse an RFC 2822 date string (e.g. an HTTP "Date" header) into a naive datetime.

    Raises TypeError if ``text`` is not a parseable date (parsedate returns None).
    """
    parsed = email.utils.parsedate(text)
    year, month, day, hour, minute, second = parsed[:6]
    return datetime.datetime(year, month, day, hour, minute, second)
def fresh_cache(url, properties):
    """Return True if the copy of ``url`` recorded in ``properties`` is still fresh.

    ``properties`` maps url -> dict of response headers previously captured by
    remember_headers().  Freshness is decided, in order, by: an "immutable"
    Cache-Control directive, an (s-)max-age directive relative to the recorded
    "Date", an explicit "Expires" header, or a 24-hour default.
    """
    pr = properties[url]
    expires = None

    logger.debug("Checking cache freshness for %s using %s", url, pr)

    if "Cache-Control" in pr:
        if re.search(r"immutable", pr["Cache-Control"]):
            # Content declared immutable: never re-fetch.
            return True

        # Bug fix: re.match only matches at the start of the string, so
        # common values like "public, max-age=3600" were silently ignored;
        # re.search finds the directive anywhere in the header.
        g = re.search(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
        if g:
            expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))

    if expires is None and "Expires" in pr:
        expires = my_parsedate(pr["Expires"])

    if expires is None:
        # Use a default cache time of 24 hours if upstream didn't set
        # any cache headers, to reduce redundant downloads.
        expires = my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)

    return (datetime.datetime.utcnow() < expires)
def remember_headers(url, properties, headers):
    """Record the cache-relevant response headers for ``url`` in ``properties``.

    Only a fixed whitelist of headers is copied.  If the server sent no "Date"
    header, the current UTC time is recorded in its place so that later
    freshness checks always have a reference timestamp.
    """
    cached = properties.setdefault(url, {})
    for header_name in ("Cache-Control", "ETag", "Expires", "Date", "Content-Length"):
        if header_name in headers:
            cached[header_name] = headers[header_name]
    if "Date" not in headers:
        cached["Date"] = my_formatdate(datetime.datetime.utcnow())
def changed(url, properties):
    """HEAD ``url`` and report whether its content differs from the cached copy.

    Side effect: refreshes the recorded headers for ``url`` in ``properties``.
    Returns False only when both the cache and the response carry an ETag and
    they match; otherwise assumes the content may have changed and returns True.
    Raises Exception on a non-200 response.
    """
    response = requests.head(url)
    remember_headers(url, properties, response.headers)

    if response.status_code != 200:
        raise Exception("Got status %s" % response.status_code)

    cached = properties[url]
    if "ETag" in cached and "ETag" in response.headers:
        if cached["ETag"] == response.headers["ETag"]:
            # Identical ETag: same content, no need to re-download.
            return False

    return True
def http_to_keep(api, project_uuid, url):
    """Fetch ``url`` into a Keep collection and return a "keep:pdh/name" reference.

    Reuses an existing collection when cached HTTP headers say the copy is
    still fresh, or when a conditional HEAD shows the ETag is unchanged;
    otherwise streams a fresh download into a new collection owned by
    ``project_uuid``.

    NOTE(review): assumes the server always sends Content-Length (used for
    progress logging) — confirm behavior for chunked responses.
    """
    # Look for existing collections tagged with this URL in their properties.
    r = api.collections().list(filters=[["properties", "exists", url]]).execute()

    name = urlparse.urlparse(url).path.split("/")[-1]

    for item in r["items"]:
        properties = item["properties"]
        if fresh_cache(url, properties):
            # Cache headers say the copy is still fresh; reuse it as-is.
            return "keep:%s/%s" % (item["portable_data_hash"], name)

        if not changed(url, properties):
            # ETag didn't change, same content, just update headers
            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
            return "keep:%s/%s" % (item["portable_data_hash"], name)

    # No usable cached copy: download the content.
    properties = {}
    req = requests.get(url, stream=True)

    if req.status_code != 200:
        # Bug fix: the format arguments were swapped (status rendered where
        # the URL belongs and vice versa).
        raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))

    remember_headers(url, properties, req.headers)

    logger.info("Downloading %s (%s bytes)", url, properties[url]["Content-Length"])

    c = arvados.collection.Collection()

    count = 0
    start = time.time()
    checkpoint = start

    with c.open(name, "w") as f:
        for chunk in req.iter_content(chunk_size=1024):
            count += len(chunk)
            f.write(chunk)
            now = time.time()
            # Log progress at most every 20 seconds.
            if (now - checkpoint) > 20:
                bps = (float(count)/float(now - start))
                logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left",
                            float(count * 100) / float(properties[url]["Content-Length"]),
                            bps / (1024.0*1024.0),
                            (int(properties[url]["Content-Length"])-count)/bps)
                checkpoint = now

    c.save_new(name="Downloaded from %s" % url, owner_uuid=project_uuid, ensure_unique_name=True)

    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()

    return "keep:%s/%s" % (c.portable_data_hash(), name)