11162: Add progress for http data download.
author Peter Amstutz <pamstutz@veritasgenetics.com>
Wed, 23 May 2018 00:45:58 +0000 (20:45 -0400)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
Thu, 24 May 2018 19:35:42 +0000 (15:35 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

sdk/cwl/arvados_cwl/http.py

index de7bfe6a4550b386bf541e9911956c372e16da86..064aa8d5a9637fdabf013f0e12f6717630390a78 100644 (file)
@@ -6,6 +6,9 @@ import re
 import arvados
 import arvados.collection
 import urlparse
+import logging
+
+logger = logging.getLogger('arvados.cwl-runner')
 
 def my_formatdate(dt):
     return email.utils.formatdate(timeval=time.mktime(now.timetuple()), localtime=False, usegmt=True)
@@ -35,7 +38,7 @@ def fresh_cache(url, properties):
 
 def remember_headers(url, properties, headers):
     properties.setdefault(url, {})
-    for h in ("Cache-Control", "ETag", "Expires", "Date"):
+    for h in ("Cache-Control", "ETag", "Expires", "Date", "Content-Length"):
         if h in headers:
             properties[url][h] = headers[h]
     if "Date" not in headers:
@@ -74,17 +77,31 @@ def http_to_keep(api, project_uuid, url):
     req = requests.get(url, stream=True)
 
     if req.status_code != 200:
-        raise Exception("Got status %s" % req.status_code)
+        raise Exception("Failed to download '%s' got status %s " % (req.status_code, url))
 
     remember_headers(url, properties, req.headers)
 
+    logger.info("Downloading %s (%s bytes)", url, properties[url]["Content-Length"])
+
     c = arvados.collection.Collection()
 
+    count = 0
+    start = time.time()
+    checkpoint = start
     with c.open(name, "w") as f:
-        for chunk in req.iter_content(chunk_size=128):
+        for chunk in req.iter_content(chunk_size=1024):
+            count += len(chunk)
             f.write(chunk)
-
-    c.save_new(name="Downloaded from %s" % url, owner_uuid=project_uuid)
+            now = time.time()
+            if (now - checkpoint) > 20:
+                bps = (float(count)/float(now - start))
+                logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left",
+                            float(count * 100) / float(properties[url]["Content-Length"]),
+                            bps/(1024*1024),
+                            (int(properties[url]["Content-Length"])-count)/bps)
+                checkpoint = now
+
+    c.save_new(name="Downloaded from %s" % url, owner_uuid=project_uuid, ensure_unique_name=True)
 
     api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()