7 import arvados.collection
# Logger for this module; records are emitted under the
# 'arvados.cwl-runner' logger name.
logger = logging.getLogger("arvados.cwl-runner")
def my_formatdate(dt):
    """Format a naive UTC datetime as an HTTP date string.

    dt is interpreted as UTC (the value passed down from utcnow()),
    so the timestamp is computed with calendar.timegm() rather than
    time.mktime(); mktime() would treat dt as local time and skew the
    result by the host's UTC offset.

    Returns a string such as "Sat, 01 Jan 2000 00:00:00 GMT".
    """
    import calendar  # local import: only this function needs it

    return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
                                  localtime=False, usegmt=True)
def my_parsedate(text):
    """Parse an HTTP date header value into a naive datetime.

    The date/time fields are used as written; no timezone offset is
    applied. When text cannot be parsed, or parses to out-of-range
    fields, the Unix epoch (1970-01-01 00:00:00) is returned instead
    of raising.
    """
    epoch = datetime.datetime(1970, 1, 1)
    parsed = email.utils.parsedate(text)
    if parsed is None:
        # parsedate() signals failure with None; unpacking it would raise.
        return epoch
    try:
        return datetime.datetime(*parsed[:6])
    except ValueError:
        # Syntactically valid but impossible date (e.g. 31 Feb).
        return epoch
def fresh_cache(url, properties, now):
    """Decide whether the cached copy of url can be reused as-is.

    properties maps a URL to the response headers remembered for it;
    now is the datetime that the expiry time is compared against.

    Freshness is decided, in order, by:
      1. Cache-Control "immutable" (always fresh),
      2. Cache-Control "s-maxage"/"max-age" added to the Date header,
      3. the Expires header,
      4. a default of 24 hours after the Date header.

    Returns True when now is earlier than the computed expiry time.
    """
    pr = properties[url]
    expires = None

    logger.debug("Checking cache freshness for %s using %s", url, pr)

    if "Cache-Control" in pr:
        cache_control = pr["Cache-Control"]
        # Directives may appear anywhere in the comma-separated header
        # (e.g. "public, max-age=3600"), so search instead of anchoring
        # at the start of the value.
        if re.search(r"immutable", cache_control):
            return True

        g = re.search(r"(s-maxage|max-age)=(\d+)", cache_control)
        if g:
            expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))

    if expires is None and "Expires" in pr:
        expires = my_parsedate(pr["Expires"])

    if expires is None:
        # Use a default cache time of 24 hours if upstream didn't set
        # any cache headers, to reduce redundant downloads.
        expires = my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)

    return (now < expires)
def remember_headers(url, properties, headers, now):
    """Record the cache-related response headers for url in properties.

    properties[url] is created if needed and updated in place with
    whichever of Cache-Control, ETag, Expires, Date and Content-Length
    are present in headers; headers that are absent are left untouched
    rather than raising. If the response has no Date header, now is
    formatted and stored as the Date.
    """
    remembered = properties.setdefault(url, {})
    for h in ("Cache-Control", "ETag", "Expires", "Date", "Content-Length"):
        if h in headers:
            remembered[h] = headers[h]
    if "Date" not in headers:
        remembered["Date"] = my_formatdate(now)
def changed(url, properties, now):
    """Send a HEAD request for url and report whether its content changed.

    The headers of the HEAD response are recorded in properties via
    remember_headers(). The content is considered unchanged only when
    an ETag was previously remembered for url and the response carries
    the same ETag.

    Returns False if the ETag is unchanged, True otherwise.
    Raises Exception if the HEAD request does not return status 200.
    """
    # Capture the remembered ETag before remember_headers() overwrites
    # it; comparing afterwards would always see the new value and
    # report "unchanged".
    previous_etag = properties.get(url, {}).get("ETag")

    req = requests.head(url, allow_redirects=True)
    remember_headers(url, properties, req.headers, now)

    if req.status_code != 200:
        raise Exception("Got status %s" % req.status_code)

    if previous_etag is not None and "ETag" in req.headers:
        if previous_etag == req.headers["ETag"]:
            return False

    return True
def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
    """Return a keep: reference for the content at url, downloading if needed.

    Collections whose properties contain url are looked up first. A
    collection is reused when fresh_cache() says its headers are still
    fresh, or when changed() reports the content is unchanged (in which
    case the refreshed headers are saved back to the collection).
    Otherwise the URL is downloaded into a new collection owned by
    project_uuid and the response headers are stored as its properties.

    utcnow is a callable returning the current time; it is a parameter
    so the clock can be substituted.

    Returns a string of the form "keep:<portable data hash>/<name>".
    Raises Exception if the GET request does not return status 200.
    """
    r = api.collections().list(filters=[["properties", "exists", url]]).execute()

    now = utcnow()

    for item in r["items"]:
        properties = item["properties"]
        if fresh_cache(url, properties, now):
            return _cached_keep_ref(api, item)

        if not changed(url, properties, now):
            # ETag didn't change, same content, just update headers
            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
            return _cached_keep_ref(api, item)

    # Start from empty properties so nothing from a stale cached
    # collection examined above leaks into the new one.
    properties = {}
    req = requests.get(url, stream=True, allow_redirects=True)

    if req.status_code != 200:
        raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))

    remember_headers(url, properties, req.headers, now)

    if "Content-Length" in properties[url]:
        cl = int(properties[url]["Content-Length"])
        logger.info("Downloading %s (%s bytes)", url, cl)
    else:
        cl = None
        logger.info("Downloading %s (unknown size)", url)

    c = arvados.collection.Collection()
    name = _download_filename(url, req.headers)

    count = 0
    start = time.time()
    checkpoint = start
    with c.open(name, "w") as f:
        for chunk in req.iter_content(chunk_size=1024):
            count += len(chunk)
            f.write(chunk)
            loopnow = time.time()
            # Report progress at most once every 20 seconds.
            if (loopnow - checkpoint) > 20:
                bps = (float(count)/float(loopnow - start))
                # Percentage and ETA need a non-zero total size and a
                # non-zero rate; otherwise only report the byte count.
                if cl and bps > 0:
                    logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left",
                                float(count * 100) / float(cl),
                                bps/(1024*1024),
                                (cl-count)/bps)
                else:
                    logger.info("%d downloaded, %3.2f MiB/s", count, bps/(1024*1024))
                checkpoint = loopnow

    c.save_new(name="Downloaded from %s" % url, owner_uuid=project_uuid, ensure_unique_name=True)

    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()

    return "keep:%s/%s" % (c.portable_data_hash(), name)


def _cached_keep_ref(api, item):
    """Build the keep: reference to the first entry of a cached collection."""
    cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
    # list() keeps the indexing valid whether keys() returns a list or a view.
    return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])


def _download_filename(url, headers):
    """Choose the stored file name from Content-Disposition, else the URL path."""
    disposition = headers.get("Content-Disposition")
    if disposition:
        grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))', disposition)
        # The header may carry no usable filename parameter; fall back
        # to the URL in that case instead of failing on a None match.
        if grp:
            return grp.group(2) or grp.group(4)
    return urlparse.urlparse(url).path.split("/")[-1]