11162: Support public http and https file references
author Peter Amstutz <pamstutz@veritasgenetics.com>
Fri, 11 May 2018 20:36:54 +0000 (16:36 -0400)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
Thu, 24 May 2018 19:35:42 +0000 (15:35 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/http.py [new file with mode: 0644]
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/setup.py

index d509f400f1058396f2fc91e6ef320a2bbebe92e1..4eda886d1b4a12899e888feef8a5949db09b6e6f 100644 (file)
@@ -37,7 +37,7 @@ import arvados.commands._util as arv_cmd
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
+from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
diff --git a/sdk/cwl/arvados_cwl/http.py b/sdk/cwl/arvados_cwl/http.py
new file mode 100644 (file)
index 0000000..de7bfe6
--- /dev/null
@@ -0,0 +1,91 @@
+import requests
+import email.utils
+import time
+import datetime
+import re
+import arvados
+import arvados.collection
+import urlparse
+
+def my_formatdate(dt):
+    """Format a naive UTC datetime as an HTTP date string.
+
+    :param dt: naive ``datetime.datetime`` whose fields are in UTC.
+    :returns: a date such as ``Fri, 11 May 2018 20:36:54 GMT``.
+    """
+    # Derive seconds-since-epoch straight from the UTC value.  time.mktime()
+    # would reinterpret the fields as local time and shift the result on any
+    # host whose timezone is not UTC.
+    epoch_seconds = (dt - datetime.datetime(1970, 1, 1)).total_seconds()
+    return email.utils.formatdate(timeval=epoch_seconds, localtime=False, usegmt=True)
+
+def my_parsedate(text):
+    """Parse an HTTP date string into a naive ``datetime.datetime``.
+
+    Only the year-through-second fields produced by
+    ``email.utils.parsedate`` are used.
+    """
+    fields = email.utils.parsedate(text)
+    year, month, day, hour, minute, second = fields[:6]
+    return datetime.datetime(year, month, day, hour, minute, second)
+
+def fresh_cache(url, properties):
+    """Report whether the cached copy of ``url`` is still fresh.
+
+    :param url: the URL whose saved headers should be checked.
+    :param properties: dict mapping URLs to the header dict saved by
+        ``remember_headers``.
+    :returns: True if the saved ``Cache-Control`` / ``Expires`` headers say
+        the copy may be used without contacting the server, False otherwise.
+    :raises KeyError: if ``url`` has no entry in ``properties``.
+    """
+    pr = properties[url]
+    expires = None
+
+    if "Cache-Control" in pr:
+        # Cache-Control is a comma-separated list of directives, so look
+        # anywhere in the value rather than only at its start.
+        if re.search(r"immutable", pr["Cache-Control"]):
+            return True
+
+        g = re.search(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
+        if g:
+            expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))
+
+    if expires is None and "Expires" in pr:
+        try:
+            expires = my_parsedate(pr["Expires"])
+        except (TypeError, ValueError):
+            # An unparseable Expires value (commonly "0") means the
+            # response is already expired.
+            return False
+
+    if expires is None:
+        return False
+
+    return (datetime.datetime.utcnow() < expires)
+
+def remember_headers(url, properties, headers):
+    """Copy the caching-related response headers for ``url`` into ``properties``.
+
+    The entry ``properties[url]`` is created if missing and updated in
+    place.  When the response carries no ``Date`` header, the current UTC
+    time is recorded in its place.
+    """
+    saved = properties.setdefault(url, {})
+    saved.update((key, headers[key])
+                 for key in ("Cache-Control", "ETag", "Expires", "Date")
+                 if key in headers)
+    if "Date" not in headers:
+        saved["Date"] = my_formatdate(datetime.datetime.utcnow())
+
+
+
+def changed(url, properties):
+    """Check with a HEAD request whether the content at ``url`` has changed.
+
+    :param url: the URL to revalidate.
+    :param properties: dict mapping URLs to saved header dicts; on a
+        successful request the entry for ``url`` is refreshed in place with
+        the new response headers.
+    :returns: False if both the saved and the new response carry the same
+        ETag, True otherwise (including when either side has no ETag).
+    :raises Exception: if the server does not answer with status 200.
+    """
+    # requests.head() does not follow redirects unless asked to, whereas the
+    # requests.get() used for the download does; keep the two consistent.
+    req = requests.head(url, allow_redirects=True)
+
+    # Check the status first so headers from an error response are never
+    # recorded.
+    if req.status_code != 200:
+        raise Exception("Got status %s" % req.status_code)
+
+    # Capture the saved ETag before remember_headers() overwrites it;
+    # otherwise the comparison below is always "equal".
+    previous_etag = properties.get(url, {}).get("ETag")
+    remember_headers(url, properties, req.headers)
+
+    if previous_etag is not None and "ETag" in req.headers:
+        if previous_etag == req.headers["ETag"]:
+            return False
+    return True
+
+def http_to_keep(api, project_uuid, url):
+    """Return a ``keep:`` reference for the file at ``url``, downloading if needed.
+
+    Collections whose properties contain ``url`` are looked up first.  One
+    is reused if its saved headers say it is still fresh, or if a HEAD
+    request shows the ETag is unchanged (its saved headers are then
+    updated).  Otherwise the URL is downloaded into a new collection owned
+    by ``project_uuid`` and the response headers are saved as that
+    collection's properties.
+
+    :param api: Arvados API client object.
+    :param project_uuid: owner for a newly created collection.
+    :param url: the http or https URL to fetch.
+    :returns: a string of the form ``keep:<portable data hash>/<file name>``.
+    :raises Exception: if the server does not answer with status 200.
+    """
+    r = api.collections().list(filters=[["properties", "exists", url]]).execute()
+    # The file name inside the collection is the last path component of the URL.
+    name = urlparse.urlparse(url).path.split("/")[-1]
+
+    for item in r["items"]:
+        properties = item["properties"]
+        if fresh_cache(url, properties):
+            # Do nothing
+            return "keep:%s/%s" % (item["portable_data_hash"], name)
+
+        if not changed(url, properties):
+            # ETag didn't change, same content, just update headers
+            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
+            return "keep:%s/%s" % (item["portable_data_hash"], name)
+
+    properties = {}
+    req = requests.get(url, stream=True)
+
+    if req.status_code != 200:
+        raise Exception("Got status %s" % req.status_code)
+
+    remember_headers(url, properties, req.headers)
+
+    c = arvados.collection.Collection()
+
+    with c.open(name, "w") as f:
+        # Stream in reasonably large chunks; tiny chunks make a large
+        # download pay for a very large number of write calls.
+        for chunk in req.iter_content(chunk_size=128*1024):
+            f.write(chunk)
+
+    # The same URL may be downloaded again after its content changes, so let
+    # the API pick a unique name instead of failing on a name collision.
+    c.save_new(name="Downloaded from %s" % url, owner_uuid=project_uuid, ensure_unique_name=True)
+
+    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
+
+    return "keep:%s/%s" % (c.portable_data_hash(), name)
index 6fedb120300b2bdb575a663614840c5ba765b7ec..bd4b5283fbe4ff3de751110e9a15fd5250697008 100644 (file)
@@ -16,6 +16,8 @@ from schema_salad.sourceline import SourceLine
 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
 from cwltool.workflow import WorkflowException
 
+from .http import http_to_keep
+
 logger = logging.getLogger('arvados.cwl-runner')
 
 def trim_listing(obj):
@@ -81,6 +83,10 @@ class ArvPathMapper(PathMapper):
                     raise WorkflowException("File literal '%s' is missing `contents`" % src)
                 if srcobj["class"] == "Directory" and "listing" not in srcobj:
                     raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
+            elif src.startswith("http:") or src.startswith("https:"):
+                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
+                logger.info("%s is %s", src, keepref)
+                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
             else:
                 self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
 
index 4df89ee75583f55a4c7d7cafb5b404dfe2c08467..05ff8a244ecd1166884284b45c605c87d23a1a57 100644 (file)
@@ -35,7 +35,7 @@ setup(name='arvados-cwl-runner',
       install_requires=[
           'cwltool==1.0.20180522135731',
           'schema-salad==2.7.20180501211602',
-          'typing==3.5.3.0',
+          'typing >= 3.5.3',
           'ruamel.yaml >=0.13.11, <0.15',
           'arvados-python-client>=1.1.4.20180507184611',
           'setuptools',