Merge branch '20257-http-import' refs #20257
authorPeter Amstutz <peter.amstutz@curii.com>
Fri, 21 Apr 2023 16:23:27 +0000 (12:23 -0400)
committerPeter Amstutz <peter.amstutz@curii.com>
Fri, 21 Apr 2023 16:42:10 +0000 (12:42 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/executor.py
sdk/cwl/arvados_cwl/http.py [deleted file]
sdk/cwl/arvados_cwl/pathmapper.py
sdk/python/arvados/_pycurlhelper.py [new file with mode: 0644]
sdk/python/arvados/http_to_keep.py [new file with mode: 0644]
sdk/python/arvados/keep.py
sdk/python/tests/test_http.py [moved from sdk/cwl/tests/test_http.py with 63% similarity]
services/api/Gemfile
services/api/Gemfile.lock

index 74ca9312bf54b1e965984f8bb893525ee2147e05..fe27b91ab2165754dd94f7a85878328db6126b4d 100644 (file)
@@ -353,6 +353,12 @@ def main(args=sys.argv[1:],
 
     # Note that unless in debug mode, some stack traces related to user
     # workflow errors may be suppressed.
+
+    # Set the logging level of most modules to INFO (instead of the default, which is WARNING)
+    logger.setLevel(logging.INFO)
+    logging.getLogger('arvados').setLevel(logging.INFO)
+    logging.getLogger('arvados.keep').setLevel(logging.WARNING)
+
     if arvargs.debug:
         logger.setLevel(logging.DEBUG)
         logging.getLogger('arvados').setLevel(logging.DEBUG)
index ef84dd4983c870d2779a3cf993bcaeff8aa13f6f..22e3f1fdeb124ae6cd0943a18214af73a8f1a1eb 100644 (file)
@@ -266,7 +266,7 @@ The 'jobs' API is no longer supported.
         activity statuses, for example in the RuntimeStatusLoggingHandler.
         """
 
-        if kind not in ('error', 'warning'):
+        if kind not in ('error', 'warning', 'activity'):
             # Ignore any other status kind
             return
 
@@ -281,7 +281,7 @@ The 'jobs' API is no longer supported.
             runtime_status = current.get('runtime_status', {})
 
             original_updatemessage = updatemessage = runtime_status.get(kind, "")
-            if not updatemessage:
+            if kind == "activity" or not updatemessage:
                 updatemessage = message
 
             # Subsequent messages tacked on in detail
@@ -593,6 +593,8 @@ The 'jobs' API is no longer supported.
     def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
         self.debug = runtimeContext.debug
 
+        self.runtime_status_update("activity", "initialization")
+
         git_info = self.get_git_info(updated_tool) if self.git_info else {}
         if git_info:
             logger.info("Git provenance")
@@ -655,6 +657,8 @@ The 'jobs' API is no longer supported.
 
         self.project_uuid = runtimeContext.project_uuid
 
+        self.runtime_status_update("activity", "data transfer")
+
         # Upload local file references in the job order.
         with Perf(metrics, "upload_job_order"):
             job_order, jobmapper = upload_job_order(self, "%s input" % runtimeContext.name,
@@ -823,6 +827,8 @@ The 'jobs' API is no longer supported.
         # We either running the workflow directly, or submitting it
         # and will wait for a final result.
 
+        self.runtime_status_update("activity", "workflow execution")
+
         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
         if current_container:
             logger.info("Running inside container %s", current_container.get("uuid"))
diff --git a/sdk/cwl/arvados_cwl/http.py b/sdk/cwl/arvados_cwl/http.py
deleted file mode 100644 (file)
index f2415bc..0000000
+++ /dev/null
@@ -1,224 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-from __future__ import division
-from future import standard_library
-standard_library.install_aliases()
-
-import requests
-import email.utils
-import time
-import datetime
-import re
-import arvados
-import arvados.collection
-import urllib.parse
-import logging
-import calendar
-import urllib.parse
-
-logger = logging.getLogger('arvados.cwl-runner')
-
-def my_formatdate(dt):
-    return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
-                                  localtime=False, usegmt=True)
-
-def my_parsedate(text):
-    parsed = email.utils.parsedate_tz(text)
-    if parsed:
-        if parsed[9]:
-            # Adjust to UTC
-            return datetime.datetime(*parsed[:6]) + datetime.timedelta(seconds=parsed[9])
-        else:
-            # TZ is zero or missing, assume UTC.
-            return datetime.datetime(*parsed[:6])
-    else:
-        return datetime.datetime(1970, 1, 1)
-
-def fresh_cache(url, properties, now):
-    pr = properties[url]
-    expires = None
-
-    logger.debug("Checking cache freshness for %s using %s", url, pr)
-
-    if "Cache-Control" in pr:
-        if re.match(r"immutable", pr["Cache-Control"]):
-            return True
-
-        g = re.match(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
-        if g:
-            expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))
-
-    if expires is None and "Expires" in pr:
-        expires = my_parsedate(pr["Expires"])
-
-    if expires is None:
-        # Use a default cache time of 24 hours if upstream didn't set
-        # any cache headers, to reduce redundant downloads.
-        expires = my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)
-
-    if not expires:
-        return False
-
-    return (now < expires)
-
-def remember_headers(url, properties, headers, now):
-    properties.setdefault(url, {})
-    for h in ("Cache-Control", "ETag", "Expires", "Date", "Content-Length"):
-        if h in headers:
-            properties[url][h] = headers[h]
-    if "Date" not in headers:
-        properties[url]["Date"] = my_formatdate(now)
-
-
-def changed(url, clean_url, properties, now):
-    req = requests.head(url, allow_redirects=True)
-
-    if req.status_code != 200:
-        # Sometimes endpoints are misconfigured and will deny HEAD but
-        # allow GET so instead of failing here, we'll try GET If-None-Match
-        return True
-
-    etag = properties[url].get("ETag")
-
-    if url in properties:
-        del properties[url]
-    remember_headers(clean_url, properties, req.headers, now)
-
-    if "ETag" in req.headers and etag == req.headers["ETag"]:
-        # Didn't change
-        return False
-
-    return True
-
-def etag_quote(etag):
-    # if it already has leading and trailing quotes, do nothing
-    if etag[0] == '"' and etag[-1] == '"':
-        return etag
-    else:
-        # Add quotes.
-        return '"' + etag + '"'
-
-
-def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow, varying_url_params="", prefer_cached_downloads=False):
-    varying_params = [s.strip() for s in varying_url_params.split(",")]
-
-    parsed = urllib.parse.urlparse(url)
-    query = [q for q in urllib.parse.parse_qsl(parsed.query)
-             if q[0] not in varying_params]
-
-    clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
-                                         urllib.parse.urlencode(query, safe="/"),  parsed.fragment))
-
-    r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()
-
-    if clean_url == url:
-        items = r1["items"]
-    else:
-        r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
-        items = r1["items"] + r2["items"]
-
-    now = utcnow()
-
-    etags = {}
-
-    for item in items:
-        properties = item["properties"]
-
-        if clean_url in properties:
-            cache_url = clean_url
-        elif url in properties:
-            cache_url = url
-        else:
-            return False
-
-        if prefer_cached_downloads or fresh_cache(cache_url, properties, now):
-            # HTTP caching rules say we should use the cache
-            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
-            return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
-
-        if not changed(cache_url, clean_url, properties, now):
-            # ETag didn't change, same content, just update headers
-            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
-            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
-            return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
-
-        if "ETag" in properties[cache_url] and len(properties[cache_url]["ETag"]) > 2:
-            etags[properties[cache_url]["ETag"]] = item
-
-    logger.debug("Found ETags %s", etags)
-
-    properties = {}
-    headers = {}
-    if etags:
-        headers['If-None-Match'] = ', '.join([etag_quote(k) for k,v in etags.items()])
-    logger.debug("Sending GET request with headers %s", headers)
-    req = requests.get(url, stream=True, allow_redirects=True, headers=headers)
-
-    if req.status_code not in (200, 304):
-        raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))
-
-    remember_headers(clean_url, properties, req.headers, now)
-
-    if req.status_code == 304 and "ETag" in req.headers and req.headers["ETag"] in etags:
-        item = etags[req.headers["ETag"]]
-        item["properties"].update(properties)
-        api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
-        cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
-        return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
-
-    if "Content-Length" in properties[clean_url]:
-        cl = int(properties[clean_url]["Content-Length"])
-        logger.info("Downloading %s (%s bytes)", url, cl)
-    else:
-        cl = None
-        logger.info("Downloading %s (unknown size)", url)
-
-    c = arvados.collection.Collection()
-
-    if req.headers.get("Content-Disposition"):
-        grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))', req.headers["Content-Disposition"])
-        if grp.group(2):
-            name = grp.group(2)
-        else:
-            name = grp.group(4)
-    else:
-        name = parsed.path.split("/")[-1]
-
-    count = 0
-    start = time.time()
-    checkpoint = start
-    with c.open(name, "wb") as f:
-        for chunk in req.iter_content(chunk_size=1024):
-            count += len(chunk)
-            f.write(chunk)
-            loopnow = time.time()
-            if (loopnow - checkpoint) > 20:
-                bps = count / (loopnow - start)
-                if cl is not None:
-                    logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left",
-                                ((count * 100) / cl),
-                                (bps // (1024*1024)),
-                                ((cl-count) // bps))
-                else:
-                    logger.info("%d downloaded, %3.2f MiB/s", count, (bps / (1024*1024)))
-                checkpoint = loopnow
-
-    logger.info("Download complete")
-
-    collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')
-
-    # max length - space to add a timestamp used by ensure_unique_name
-    max_name_len = 254 - 28
-
-    if len(collectionname) > max_name_len:
-        over = len(collectionname) - max_name_len
-        split = int(max_name_len/2)
-        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
-
-    c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)
-
-    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
-
-    return "keep:%s/%s" % (c.portable_data_hash(), name)
index e2e287bf1dbd9cbcfbe63275ae40087393bb1d1f..3f54a396bbd535827b8f546dc153d94de2ce9f70 100644 (file)
@@ -26,7 +26,7 @@ from cwltool.utils import adjustFileObjs, adjustDirObjs
 from cwltool.stdfsaccess import abspath
 from cwltool.workflow import WorkflowException
 
-from .http import http_to_keep
+from arvados.http_to_keep import http_to_keep
 
 logger = logging.getLogger('arvados.cwl-runner')
 
@@ -109,9 +109,9 @@ class ArvPathMapper(PathMapper):
                         # passthrough, we'll download it later.
                         self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
                     else:
-                        keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
-                                               varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
-                                               prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
+                        keepref = "keep:%s/%s" % http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
+                                                              varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
+                                                              prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
                         logger.info("%s is %s", src, keepref)
                         self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                 except Exception as e:
diff --git a/sdk/python/arvados/_pycurlhelper.py b/sdk/python/arvados/_pycurlhelper.py
new file mode 100644 (file)
index 0000000..e1153ad
--- /dev/null
@@ -0,0 +1,85 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import collections
+import logging
+import math
+import socket
+
+import pycurl
+
+class PyCurlHelper:
+    """Common pycurl plumbing shared by classes that make HTTP requests.
+
+    Supplies a socket-open callback that turns on TCP keepalive, a
+    helper that applies connect / low-speed timeouts to a curl handle,
+    and a header callback that collects response headers into
+    ``self._headers``.
+    """
+
+    # Timeout tuples are (connect timeout in seconds, low-speed time
+    # in seconds, low-speed limit in bytes per second).
+    #
+    # Default Keep server connection timeout:  2 seconds
+    # Default Keep server read timeout:       256 seconds
+    # Default Keep server bandwidth minimum:  32768 bytes per second
+    # Default Keep proxy connection timeout:  20 seconds
+    # Default Keep proxy read timeout:        256 seconds
+    # Default Keep proxy bandwidth minimum:   32768 bytes per second
+    DEFAULT_TIMEOUT = (2, 256, 32768)
+    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
+
+    def __init__(self, title_case_headers=False):
+        """Set up helper state.
+
+        title_case_headers: when true, header names recorded by
+        _headerfunction are stored Title-Cased; otherwise lowercased.
+        """
+        self._socket = None
+        # Initialised here so _headerfunction is safe to call even if
+        # a subclass has not reset them before a request.
+        self._headers = {}
+        self._lastheadername = None
+        self.title_case_headers = title_case_headers
+
+    def _socket_open(self, *args, **kwargs):
+        """OPENSOCKETFUNCTION callback; dispatch on the pycurl calling convention."""
+        if len(args) + len(kwargs) == 2:
+            return self._socket_open_pycurl_7_21_5(*args, **kwargs)
+        else:
+            return self._socket_open_pycurl_7_19_3(*args, **kwargs)
+
+    def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
+        """Adapt the four-argument callback form to the (purpose, address) form."""
+        return self._socket_open_pycurl_7_21_5(
+            purpose=None,
+            address=collections.namedtuple(
+                'Address', ['family', 'socktype', 'protocol', 'addr'],
+            )(family, socktype, protocol, address))
+
+    def _socket_open_pycurl_7_21_5(self, purpose, address):
+        """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
+        s = socket.socket(address.family, address.socktype, address.protocol)
+        s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+        # Not every platform exposes the TCP-level keepalive options
+        # (setting a missing one throws an invalid protocol error on
+        # mac), so only set the ones the socket module defines.
+        if hasattr(socket, 'TCP_KEEPIDLE'):
+            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
+        if hasattr(socket, 'TCP_KEEPINTVL'):
+            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
+        self._socket = s
+        return s
+
+    def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
+        """Apply connect and low-speed timeouts to a curl handle.
+
+        timeouts may be falsy (no change), a single number (used for
+        both connect and low-speed time), a 2-tuple (connect,
+        low-speed time) or a 3-tuple (connect, low-speed time,
+        bytes per second).  When ignore_bandwidth is true only the
+        connect timeout is set.
+        """
+        if not timeouts:
+            return
+        elif isinstance(timeouts, tuple):
+            if len(timeouts) == 2:
+                conn_t, xfer_t = timeouts
+                bandwidth_bps = self.DEFAULT_TIMEOUT[2]
+            else:
+                conn_t, xfer_t, bandwidth_bps = timeouts
+        else:
+            conn_t, xfer_t = (timeouts, timeouts)
+            bandwidth_bps = self.DEFAULT_TIMEOUT[2]
+        curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
+        if not ignore_bandwidth:
+            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
+
+    def _headerfunction(self, header_line):
+        """HEADERFUNCTION callback: record one raw response header line.
+
+        The status line is stored verbatim under 'x-status-line'.
+        Other headers are stored under their (case-normalized) name.
+        A line that cannot be interpreted is logged and ignored.
+        """
+        if isinstance(header_line, bytes):
+            header_line = header_line.decode('iso-8859-1')
+        if header_line.startswith('HTTP/'):
+            # A status line starts a new response (for example after a
+            # redirect or a "100 Continue"); headers collected so far
+            # belong to the previous response, so start over.
+            self._headers = {}
+            name = 'x-status-line'
+            value = header_line
+        elif not header_line.strip():
+            # Blank line terminating the header block: nothing to
+            # record (and it must not be appended to the last header).
+            return
+        elif ':' in header_line and header_line[0] not in ' \t':
+            name, value = header_line.split(':', 1)
+            if self.title_case_headers:
+                name = name.strip().title()
+            else:
+                name = name.strip().lower()
+            value = value.strip()
+        elif self._lastheadername in self._headers:
+            # Continuation of the previous header's value.
+            name = self._lastheadername
+            value = self._headers[name] + ' ' + header_line.strip()
+        else:
+            logging.getLogger(__name__).error("Unexpected header line: %s", header_line)
+            return
+        self._lastheadername = name
+        self._headers[name] = value
+        # Returning None implies all bytes were written
diff --git a/sdk/python/arvados/http_to_keep.py b/sdk/python/arvados/http_to_keep.py
new file mode 100644 (file)
index 0000000..16c3dc4
--- /dev/null
@@ -0,0 +1,347 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import division
+from future import standard_library
+standard_library.install_aliases()
+
+import email.utils
+import time
+import datetime
+import re
+import arvados
+import arvados.collection
+import urllib.parse
+import logging
+import calendar
+import urllib.parse
+import pycurl
+import dataclasses
+import typing
+from arvados._pycurlhelper import PyCurlHelper
+
+logger = logging.getLogger('arvados.http_import')
+
+def _my_formatdate(dt):
+    """Format datetime *dt* as an HTTP date string ending in "GMT".
+
+    The fields of dt are interpreted as UTC.
+    """
+    epoch_seconds = calendar.timegm(dt.timetuple())
+    return email.utils.formatdate(timeval=epoch_seconds, localtime=False, usegmt=True)
+
+def _my_parsedate(text):
+    """Parse an RFC 2822 / HTTP date string into a naive UTC datetime.
+
+    Returns datetime(1970, 1, 1) when the text cannot be parsed.
+    """
+    parsed = email.utils.parsedate_tz(text)
+    if not parsed:
+        return datetime.datetime(1970, 1, 1)
+    stamp = datetime.datetime(*parsed[:6])
+    if parsed[9]:
+        # parsed[9] is the zone's offset from UTC in seconds (positive
+        # east of UTC), so UTC is the stated time minus the offset.
+        stamp -= datetime.timedelta(seconds=parsed[9])
+    # Otherwise TZ is zero or missing, assume UTC.
+    return stamp
+
+def _fresh_cache(url, properties, now):
+    """Decide whether the cached copy recorded under properties[url] is still fresh.
+
+    properties[url] holds the HTTP headers remembered for the cached
+    copy.  Returns True if Cache-Control says "immutable", or if *now*
+    is earlier than the expiry time derived (in order of preference)
+    from Cache-Control max-age / s-maxage, from Expires, or from a
+    24 hour default counted from the remembered Date.
+    """
+    pr = properties[url]
+    expires = None
+
+    logger.debug("Checking cache freshness for %s using %s", url, pr)
+
+    if "Cache-Control" in pr:
+        # Cache-Control is a comma separated list of directives, so
+        # the one we want is not necessarily the first; search the
+        # whole value instead of only matching at the start.
+        if re.search(r"\bimmutable\b", pr["Cache-Control"], re.IGNORECASE):
+            return True
+
+        g = re.search(r"\b(s-maxage|max-age)=(\d+)", pr["Cache-Control"], re.IGNORECASE)
+        if g:
+            expires = _my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))
+
+    if expires is None and "Expires" in pr:
+        expires = _my_parsedate(pr["Expires"])
+
+    if expires is None:
+        # Use a default cache time of 24 hours if upstream didn't set
+        # any cache headers, to reduce redundant downloads.
+        expires = _my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)
+
+    return (now < expires)
+
+def _remember_headers(url, properties, headers, now):
+    """Copy the caching-related response headers into properties[url].
+
+    Creates properties[url] if needed.  When the response carried no
+    Date header, *now* is recorded as the Date instead.
+    """
+    wanted = ("Cache-Control", "Etag", "Expires", "Date", "Content-Length")
+    remembered = properties.setdefault(url, {})
+    remembered.update({h: headers[h] for h in wanted if h in headers})
+    if "Date" not in headers:
+        remembered["Date"] = _my_formatdate(now)
+
+@dataclasses.dataclass
+class _Response:
+    """Result of a _Downloader.head() or _Downloader.download() request."""
+    # HTTP status code reported by curl once the request has finished.
+    status_code: int
+    # Response headers collected by the header callback, keyed by
+    # header name.
+    headers: typing.Mapping[str, str]
+
+
+class _Downloader(PyCurlHelper):
+    """Issue HEAD and GET requests with pycurl, streaming a GET body into a Collection.
+
+    After download(), ``collection`` is the Collection the body was
+    written into, ``name`` is the file name used inside it, and
+    ``target`` is the open file object (the caller closes it).  All
+    three stay None when no response body handling took place.
+    """
+
+    # (connect timeout, low-speed time, low-speed limit):
+    # Wait up to 60 seconds for connection
+    # How long it can be in "low bandwidth" state before it gives up
+    # Low bandwidth threshold is 32 KiB/s
+    DOWNLOADER_TIMEOUT = (60, 300, 32768)
+
+    def __init__(self, apiclient):
+        """Create the curl handle.  apiclient is passed to the Collection that receives the download."""
+        super(_Downloader, self).__init__(title_case_headers=True)
+        self.curl = pycurl.Curl()
+        self.curl.setopt(pycurl.NOSIGNAL, 1)
+        self.curl.setopt(pycurl.OPENSOCKETFUNCTION,
+                    lambda *args, **kwargs: self._socket_open(*args, **kwargs))
+        self.target = None
+        self.collection = None
+        self.name = None
+        self.contentlength = None
+        self.apiclient = apiclient
+
+    def _prepare(self, url, headers):
+        """Reset collected headers and set the curl options common to HEAD and GET."""
+        self._headers = {}
+
+        get_headers = {'Accept': 'application/octet-stream'}
+        get_headers.update(headers)
+
+        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
+        self.curl.setopt(pycurl.HTTPHEADER, [
+            '{}: {}'.format(k,v) for k,v in get_headers.items()])
+
+        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
+        self.curl.setopt(pycurl.FOLLOWLOCATION, True)
+
+    def _perform(self):
+        """Run the prepared request and return the HTTP status code.
+
+        Any failure in curl is re-raised as arvados.errors.HttpError
+        with status 0.  The socket opened for the request is always
+        closed.
+        """
+        try:
+            self.curl.perform()
+        except Exception as e:
+            raise arvados.errors.HttpError(0, str(e)) from e
+        finally:
+            if self._socket:
+                self._socket.close()
+                self._socket = None
+
+        return self.curl.getinfo(pycurl.RESPONSE_CODE)
+
+    def head(self, url):
+        """Send a HEAD request for url and return a _Response."""
+        self._prepare(url, {})
+        self.curl.setopt(pycurl.NOBODY, True)
+        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, True)
+
+        status = self._perform()
+        return _Response(status, self._headers)
+
+    def download(self, url, headers):
+        """Send a GET request for url and return a _Response.
+
+        headers is a dict of extra request headers.  The body of a
+        200 response is written to a file in a new Collection, see
+        headers_received() and body_write().
+        """
+        self.count = 0
+        self.start = time.time()
+        self.checkpoint = self.start
+        self._first_chunk = True
+        # Forget anything left over from an earlier download().
+        self.collection = None
+        self.target = None
+        self.name = None
+        self.contentlength = None
+        self.parsedurl = urllib.parse.urlparse(url)
+
+        self._prepare(url, headers)
+        self.curl.setopt(pycurl.WRITEFUNCTION, self.body_write)
+        self.curl.setopt(pycurl.HTTPGET, True)
+        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, False)
+
+        status = self._perform()
+
+        if self._first_chunk and status == 200:
+            # A 200 response with an empty body never invokes
+            # body_write(), so the collection and (empty) file have
+            # not been created yet.
+            self.headers_received(status_code=status)
+            self._first_chunk = False
+
+        return _Response(status, self._headers)
+
+    def headers_received(self, status_code=None):
+        """Create the Collection and, for a 200 response, open the file to write into.
+
+        status_code: the response status if already known; when None
+        it is parsed from the recorded status line.
+        """
+        self.collection = arvados.collection.Collection(api_client=self.apiclient)
+
+        if "Content-Length" in self._headers:
+            self.contentlength = int(self._headers["Content-Length"])
+            logger.info("File size is %s bytes", self.contentlength)
+        else:
+            self.contentlength = None
+
+        # Default to the last path component of the URL; a usable
+        # Content-Disposition filename takes precedence.
+        self.name = self.parsedurl.path.split("/")[-1]
+        if self._headers.get("Content-Disposition"):
+            grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))',
+                            self._headers["Content-Disposition"])
+            if grp:
+                self.name = grp.group(2) or grp.group(4)
+
+        if status_code is None:
+            # Can't call curl.getinfo(pycurl.RESPONSE_CODE) until
+            # perform() is done but we need to know the status before that
+            # so we have to parse the status line ourselves.
+            mt = re.match(r'^HTTP\/(\d(\.\d)?) ([1-5]\d\d) ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*)\r\n$',
+                          self._headers.get("x-status-line", ""))
+            if mt:
+                status_code = int(mt.group(3))
+
+        if status_code == 200:
+            self.target = self.collection.open(self.name, "wb")
+
+    def body_write(self, chunk):
+        """WRITEFUNCTION callback: append chunk to the target file and log progress.
+
+        Returns 0 to make curl abort the transfer when there is no
+        file to write to (the response was not a 200); otherwise
+        returns None, which means the whole chunk was consumed.
+        """
+        if self._first_chunk:
+            self.headers_received()
+            self._first_chunk = False
+
+        self.count += len(chunk)
+
+        if self.target is None:
+            # Returning a number different from len(chunk) signals an
+            # error to libcurl, which aborts the request.
+            return 0
+
+        self.target.write(chunk)
+        loopnow = time.time()
+        if (loopnow - self.checkpoint) < 20:
+            return
+
+        bps = self.count / (loopnow - self.start)
+        if self.contentlength:
+            logger.info("%2.1f%% complete, %6.2f MiB/s, %1.0f seconds left",
+                        ((self.count * 100) / self.contentlength),
+                        (bps / (1024.0*1024.0)),
+                        ((self.contentlength-self.count) // bps))
+        else:
+            logger.info("%d downloaded, %6.2f MiB/s", self.count, (bps / (1024.0*1024.0)))
+        self.checkpoint = loopnow
+
+
+def _changed(url, clean_url, properties, now, curldownloader):
+    """Use a HEAD request to check whether url differs from the cached copy.
+
+    Returns True when the content may have changed (or the HEAD
+    request did not return 200), False when the response Etag equals
+    the remembered one.  After a 200 response the entry for url in
+    properties is replaced by the fresh headers, stored under
+    clean_url.
+    """
+    response = curldownloader.head(url)
+
+    if response.status_code != 200:
+        # Sometimes endpoints are misconfigured and will deny HEAD but
+        # allow GET so instead of failing here, we'll try GET If-None-Match
+        return True
+
+    # previous version of this code used "ETag", now we are
+    # normalizing to "Etag", check for both.
+    remembered = properties[url]
+    previous_etag = remembered.get("Etag") or remembered.get("ETag")
+
+    properties.pop(url, None)
+    _remember_headers(clean_url, properties, response.headers, now)
+
+    same_content = ("Etag" in response.headers
+                    and previous_etag == response.headers["Etag"])
+    return not same_content
+
+def _etag_quote(etag):
+    """Return etag in the quoted form used in an If-None-Match header.
+
+    A value that is already a quoted entity tag, either strong
+    ("...") or weak (W/"..."), is returned unchanged; anything else
+    is wrapped in double quotes.
+    """
+    already_strong = len(etag) >= 2 and etag[0] == '"' and etag[-1] == '"'
+    already_weak = len(etag) >= 4 and etag.startswith('W/"') and etag[-1] == '"'
+    if already_strong or already_weak:
+        return etag
+    # Add quotes.
+    return '"' + etag + '"'
+
+
+def http_to_keep(api, project_uuid, url,
+                 utcnow=datetime.datetime.utcnow, varying_url_params="",
+                 prefer_cached_downloads=False):
+    """Download a file over HTTP and upload it to keep, with HTTP headers as metadata.
+
+    Before downloading the URL, checks to see if the URL already
+    exists in Keep and applies HTTP caching policy, the
+    varying_url_params and prefer_cached_downloads flags in order to
+    decide whether to use the version in Keep or re-download it.
+
+    varying_url_params is a comma separated list of query parameter
+    names that are dropped from the URL before it is used as the
+    cache key.
+
+    Returns a (portable_data_hash, file_name) tuple.
+
+    Raises Exception if the server answers with a status other than
+    200 or 304, or if the response cannot be turned into a collection.
+    """
+
+    logger.info("Checking Keep for %s", url)
+
+    varying_params = [s.strip() for s in varying_url_params.split(",")]
+
+    parsed = urllib.parse.urlparse(url)
+    query = [q for q in urllib.parse.parse_qsl(parsed.query)
+             if q[0] not in varying_params]
+
+    clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
+                                         urllib.parse.urlencode(query, safe="/"),  parsed.fragment))
+
+    r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()
+
+    if clean_url == url:
+        items = r1["items"]
+    else:
+        r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
+        items = r1["items"] + r2["items"]
+
+    now = utcnow()
+
+    # Maps a remembered entity tag to the collection record it came from.
+    etags = {}
+
+    curldownloader = _Downloader(api)
+
+    def cached_copy(item):
+        # Result tuple for an existing collection: its PDH and the
+        # first file name in it.
+        cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
+        return (item["portable_data_hash"], next(iter(cr.keys())))
+
+    for item in items:
+        properties = item["properties"]
+
+        if clean_url in properties:
+            cache_url = clean_url
+        elif url in properties:
+            cache_url = url
+        else:
+            raise Exception("Shouldn't happen, got an API result for %s that doesn't have the URL in properties" % item["uuid"])
+
+        if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
+            # HTTP caching rules say we should use the cache
+            return cached_copy(item)
+
+        # _changed() replaces the remembered headers with the ones
+        # from its HEAD request (and moves them to clean_url), so the
+        # entity tag that describes the cached content has to be read
+        # before calling it.
+        cached_etags = [properties[cache_url][key].strip()
+                        for key in ("Etag", "ETag")
+                        if key in properties[cache_url]]
+
+        if not _changed(cache_url, clean_url, properties, now, curldownloader):
+            # Etag didn't change, same content, just update headers
+            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
+            return cached_copy(item)
+
+        for etag in cached_etags:
+            if len(etag) > 2:
+                etags[etag] = item
+
+    logger.debug("Found ETag values %s", etags)
+
+    properties = {}
+    headers = {}
+    if etags:
+        headers['If-None-Match'] = ', '.join([_etag_quote(k) for k in etags])
+    logger.debug("Sending GET request with headers %s", headers)
+
+    logger.info("Beginning download of %s", url)
+
+    req = curldownloader.download(url, headers)
+
+    c = curldownloader.collection
+
+    if curldownloader.target is not None:
+        curldownloader.target.close()
+
+    if req.status_code not in (200, 304):
+        raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))
+
+    _remember_headers(clean_url, properties, req.headers, now)
+
+    if req.status_code == 304:
+        item = etags.get(req.headers.get("Etag", "").strip())
+        if item is None and len(etags) == 1:
+            # Only one entity tag was offered, so "not modified" can
+            # only refer to that one even if the response does not
+            # repeat it.
+            item = next(iter(etags.values()))
+        if item is None:
+            raise Exception("Got status 304 for '%s' but could not tell which cached collection it refers to" % url)
+        item["properties"].update(properties)
+        api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
+        return cached_copy(item)
+
+    if c is None:
+        raise Exception("Got status %s for '%s' but no content was received" % (req.status_code, url))
+
+    logger.info("Download complete")
+
+    collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')
+
+    # max length - space to add a timestamp used by ensure_unique_name
+    max_name_len = 254 - 28
+
+    if len(collectionname) > max_name_len:
+        over = len(collectionname) - max_name_len
+        split = max_name_len // 2
+        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
+
+    c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)
+
+    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
+
+    return (c.portable_data_hash(), curldownloader.name)
index cbe96ffa2f1c8d038e95f6174b3dc44ec739770a..6804f355a8cdc12217e59f7d259fe9852d91aac4 100644 (file)
@@ -44,6 +44,7 @@ import arvados.errors
 import arvados.retry as retry
 import arvados.util
 import arvados.diskcache
+from arvados._pycurlhelper import PyCurlHelper
 
 _logger = logging.getLogger('arvados.keep')
 global_client_object = None
@@ -405,18 +406,10 @@ class Counter(object):
 
 
 class KeepClient(object):
+    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
+    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
 
-    # Default Keep server connection timeout:  2 seconds
-    # Default Keep server read timeout:       256 seconds
-    # Default Keep server bandwidth minimum:  32768 bytes per second
-    # Default Keep proxy connection timeout:  20 seconds
-    # Default Keep proxy read timeout:        256 seconds
-    # Default Keep proxy bandwidth minimum:   32768 bytes per second
-    DEFAULT_TIMEOUT = (2, 256, 32768)
-    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
-
-
-    class KeepService(object):
+    class KeepService(PyCurlHelper):
         """Make requests to a single Keep service, and track results.
 
         A KeepService is intended to last long enough to perform one
@@ -439,6 +432,7 @@ class KeepClient(object):
                      download_counter=None,
                      headers={},
                      insecure=False):
+            super(KeepClient.KeepService, self).__init__()
             self.root = root
             self._user_agent_pool = user_agent_pool
             self._result = {'error': None}
@@ -476,30 +470,6 @@ class KeepClient(object):
             except:
                 ua.close()
 
-        def _socket_open(self, *args, **kwargs):
-            if len(args) + len(kwargs) == 2:
-                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
-            else:
-                return self._socket_open_pycurl_7_19_3(*args, **kwargs)
-
-        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
-            return self._socket_open_pycurl_7_21_5(
-                purpose=None,
-                address=collections.namedtuple(
-                    'Address', ['family', 'socktype', 'protocol', 'addr'],
-                )(family, socktype, protocol, address))
-
-        def _socket_open_pycurl_7_21_5(self, purpose, address):
-            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
-            s = socket.socket(address.family, address.socktype, address.protocol)
-            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
-            # Will throw invalid protocol error on mac. This test prevents that.
-            if hasattr(socket, 'TCP_KEEPIDLE'):
-                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
-            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
-            self._socket = s
-            return s
-
         def get(self, locator, method="GET", timeout=None):
             # locator is a KeepLocator object.
             url = self.root + str(locator)
@@ -525,6 +495,8 @@ class KeepClient(object):
                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     if method == "HEAD":
                         curl.setopt(pycurl.NOBODY, True)
+                    else:
+                        curl.setopt(pycurl.HTTPGET, True)
                     self._setcurltimeouts(curl, timeout, method=="HEAD")
 
                     try:
@@ -669,43 +641,6 @@ class KeepClient(object):
                 self.upload_counter.add(len(body))
             return True
 
-        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
-            if not timeouts:
-                return
-            elif isinstance(timeouts, tuple):
-                if len(timeouts) == 2:
-                    conn_t, xfer_t = timeouts
-                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
-                else:
-                    conn_t, xfer_t, bandwidth_bps = timeouts
-            else:
-                conn_t, xfer_t = (timeouts, timeouts)
-                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
-            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
-            if not ignore_bandwidth:
-                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
-                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
-
-        def _headerfunction(self, header_line):
-            if isinstance(header_line, bytes):
-                header_line = header_line.decode('iso-8859-1')
-            if ':' in header_line:
-                name, value = header_line.split(':', 1)
-                name = name.strip().lower()
-                value = value.strip()
-            elif self._headers:
-                name = self._lastheadername
-                value = self._headers[name] + ' ' + header_line.strip()
-            elif header_line.startswith('HTTP/'):
-                name = 'x-status-line'
-                value = header_line
-            else:
-                _logger.error("Unexpected header line: %s", header_line)
-                return
-            self._lastheadername = name
-            self._headers[name] = value
-            # Returning None implies all bytes were written
-
 
     class KeepWriterQueue(queue.Queue):
         def __init__(self, copies, classes=[]):
similarity index 63%
rename from sdk/cwl/tests/test_http.py
rename to sdk/python/tests/test_http.py
index 5598b1f1387a33a4c53d45eac5fe7dbc042dbeef..381a61e2aa044103d53470d4bfe84fa99569344a 100644 (file)
@@ -18,23 +18,64 @@ import datetime
 
 import arvados
 import arvados.collection
-import arvados_cwl
-import arvados_cwl.runner
 import arvados.keep
+import pycurl
 
-from .matcher import JsonDiffMatcher, StripYAMLComments
-from .mock_discovery import get_rootDesc
-
-import arvados_cwl.http
+from arvados.http_to_keep import http_to_keep
 
 import ruamel.yaml as yaml
 
+# NOTE: a "FakeCurl" helper that serves the same purpose already existed when
+# this mock was written; the two could be consolidated.
+class CurlMock:
+    """Minimal stand-in for ``pycurl.Curl`` used by the http_to_keep tests.
+
+    It records the options passed to ``setopt`` and, when ``perform`` is
+    called, replays a canned status line, the configured response headers
+    and (for a 200 GET) the configured body chunk through the callbacks
+    registered with ``setopt``.
+
+    Attributes a test may adjust before the code under test runs:
+      headers        dict of response headers replayed by ``perform``
+      get_response   status code reported for GET requests (default 200)
+      head_response  status code reported for HEAD requests (default 200)
+      chunk          body handed to the write callback on a 200 GET
+
+    Attributes a test may inspect afterwards:
+      url, req_headers, perform_was_called
+    """
+
+    def __init__(self, headers=None):
+        self.perform_was_called = False
+        # Avoid a shared mutable default: each mock gets its own dict
+        # unless the caller supplies one.
+        self.headers = {} if headers is None else headers
+        self.get_response = 200
+        self.head_response = 200
+        self.req_headers = []
+        # State that is otherwise only created by setopt() or by the test.
+        # Initialising it here means getinfo()/perform() never fail on a
+        # missing attribute; a fresh handle is treated as a GET.
+        self.head = False
+        self.chunk = None
+        self.url = None
+        self.writefn = None
+        self.headerfn = None
+
+    def setopt(self, op, *args):
+        """Record the subset of curl options the tests care about.
+
+        Any other option is accepted and ignored.
+        """
+        if op == pycurl.URL:
+            self.url = args[0]
+        elif op == pycurl.WRITEFUNCTION:
+            self.writefn = args[0]
+        elif op == pycurl.HEADERFUNCTION:
+            self.headerfn = args[0]
+        elif op == pycurl.NOBODY:
+            # NOBODY switches the mock into HEAD mode...
+            self.head = True
+        elif op == pycurl.HTTPGET:
+            # ...and HTTPGET switches it back to GET mode.
+            self.head = False
+        elif op == pycurl.HTTPHEADER:
+            self.req_headers = args[0]
+
+    def getinfo(self, op):
+        """Return the canned status code for RESPONSE_CODE, else None."""
+        if op == pycurl.RESPONSE_CODE:
+            return self.head_response if self.head else self.get_response
+        return None
+
+    def perform(self):
+        """Replay the canned response through the registered callbacks."""
+        self.perform_was_called = True
+
+        status = self.head_response if self.head else self.get_response
+        self.headerfn("HTTP/1.1 {} Status\r\n".format(status))
+
+        for k, v in self.headers.items():
+            self.headerfn("%s: %s" % (k, v))
+
+        # Only a successful GET delivers a body, and only if the test
+        # configured one.
+        if not self.head and self.get_response == 200 and self.chunk is not None:
+            self.writefn(self.chunk)
+
 
 class TestHttpToKeep(unittest.TestCase):
 
-    @mock.patch("requests.get")
+    @mock.patch("pycurl.Curl")
     @mock.patch("arvados.collection.Collection")
-    def test_http_get(self, collectionmock, getmock):
+    def test_http_get(self, collectionmock, curlmock):
         api = mock.MagicMock()
 
         api.collections().list().execute.return_value = {
@@ -46,19 +87,20 @@ class TestHttpToKeep(unittest.TestCase):
         cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
         collectionmock.return_value = cm
 
-        req = mock.MagicMock()
-        req.status_code = 200
-        req.headers = {}
-        req.iter_content.return_value = ["abc"]
-        getmock.return_value = req
+        mockobj = CurlMock()
+        mockobj.chunk = b'abc'
+        def init():
+            return mockobj
+        curlmock.side_effect = init
 
         utcnow = mock.MagicMock()
         utcnow.return_value = datetime.datetime(2018, 5, 15)
 
-        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
-        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+        r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
 
-        getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True, headers={})
+        assert mockobj.url == b"http://example.com/file1.txt"
+        assert mockobj.perform_was_called is True
 
         cm.open.assert_called_with("file1.txt", "wb")
         cm.save_new.assert_called_with(name="Downloaded from http%3A%2F%2Fexample.com%2Ffile1.txt",
@@ -70,9 +112,9 @@ class TestHttpToKeep(unittest.TestCase):
         ])
 
 
-    @mock.patch("requests.get")
+    @mock.patch("pycurl.Curl")
     @mock.patch("arvados.collection.CollectionReader")
-    def test_http_expires(self, collectionmock, getmock):
+    def test_http_expires(self, collectionmock, curlmock):
         api = mock.MagicMock()
 
         api.collections().list().execute.return_value = {
@@ -94,24 +136,24 @@ class TestHttpToKeep(unittest.TestCase):
         cm.keys.return_value = ["file1.txt"]
         collectionmock.return_value = cm
 
-        req = mock.MagicMock()
-        req.status_code = 200
-        req.headers = {}
-        req.iter_content.return_value = ["abc"]
-        getmock.return_value = req
+        mockobj = CurlMock()
+        mockobj.chunk = b'abc'
+        def init():
+            return mockobj
+        curlmock.side_effect = init
 
         utcnow = mock.MagicMock()
         utcnow.return_value = datetime.datetime(2018, 5, 16)
 
-        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
-        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+        r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
 
-        getmock.assert_not_called()
+        assert mockobj.perform_was_called is False
 
 
-    @mock.patch("requests.get")
+    @mock.patch("pycurl.Curl")
     @mock.patch("arvados.collection.CollectionReader")
-    def test_http_cache_control(self, collectionmock, getmock):
+    def test_http_cache_control(self, collectionmock, curlmock):
         api = mock.MagicMock()
 
         api.collections().list().execute.return_value = {
@@ -133,25 +175,24 @@ class TestHttpToKeep(unittest.TestCase):
         cm.keys.return_value = ["file1.txt"]
         collectionmock.return_value = cm
 
-        req = mock.MagicMock()
-        req.status_code = 200
-        req.headers = {}
-        req.iter_content.return_value = ["abc"]
-        getmock.return_value = req
+        mockobj = CurlMock()
+        mockobj.chunk = b'abc'
+        def init():
+            return mockobj
+        curlmock.side_effect = init
 
         utcnow = mock.MagicMock()
         utcnow.return_value = datetime.datetime(2018, 5, 16)
 
-        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
-        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+        r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
 
-        getmock.assert_not_called()
+        assert mockobj.perform_was_called is False
 
 
-    @mock.patch("requests.get")
-    @mock.patch("requests.head")
+    @mock.patch("pycurl.Curl")
     @mock.patch("arvados.collection.Collection")
-    def test_http_expired(self, collectionmock, headmock, getmock):
+    def test_http_expired(self, collectionmock, curlmock):
         api = mock.MagicMock()
 
         api.collections().list().execute.return_value = {
@@ -161,7 +202,7 @@ class TestHttpToKeep(unittest.TestCase):
                 "properties": {
                     'http://example.com/file1.txt': {
                         'Date': 'Tue, 15 May 2018 00:00:00 GMT',
-                        'Expires': 'Tue, 16 May 2018 00:00:00 GMT'
+                        'Expires': 'Wed, 16 May 2018 00:00:00 GMT'
                     }
                 }
             }]
@@ -173,20 +214,20 @@ class TestHttpToKeep(unittest.TestCase):
         cm.keys.return_value = ["file1.txt"]
         collectionmock.return_value = cm
 
-        req = mock.MagicMock()
-        req.status_code = 200
-        req.headers = {'Date': 'Tue, 17 May 2018 00:00:00 GMT'}
-        req.iter_content.return_value = ["def"]
-        getmock.return_value = req
-        headmock.return_value = req
+        mockobj = CurlMock({'Date': 'Thu, 17 May 2018 00:00:00 GMT'})
+        mockobj.chunk = b'def'
+        def init():
+            return mockobj
+        curlmock.side_effect = init
 
         utcnow = mock.MagicMock()
         utcnow.return_value = datetime.datetime(2018, 5, 17)
 
-        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
-        self.assertEqual(r, "keep:99999999999999999999999999999997+99/file1.txt")
+        r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+        self.assertEqual(r, ("99999999999999999999999999999997+99", "file1.txt"))
 
-        getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True, headers={})
+        assert mockobj.url == b"http://example.com/file1.txt"
+        assert mockobj.perform_was_called is True
 
         cm.open.assert_called_with("file1.txt", "wb")
         cm.save_new.assert_called_with(name="Downloaded from http%3A%2F%2Fexample.com%2Ffile1.txt",
@@ -194,14 +235,13 @@ class TestHttpToKeep(unittest.TestCase):
 
         api.collections().update.assert_has_calls([
             mock.call(uuid=cm.manifest_locator(),
-                      body={"collection":{"properties": {'http://example.com/file1.txt': {'Date': 'Tue, 17 May 2018 00:00:00 GMT'}}}})
+                      body={"collection":{"properties": {'http://example.com/file1.txt': {'Date': 'Thu, 17 May 2018 00:00:00 GMT'}}}})
         ])
 
 
-    @mock.patch("requests.get")
-    @mock.patch("requests.head")
+    @mock.patch("pycurl.Curl")
     @mock.patch("arvados.collection.CollectionReader")
-    def test_http_etag(self, collectionmock, headmock, getmock):
+    def test_http_etag(self, collectionmock, curlmock):
         api = mock.MagicMock()
 
         api.collections().list().execute.return_value = {
@@ -211,8 +251,8 @@ class TestHttpToKeep(unittest.TestCase):
                 "properties": {
                     'http://example.com/file1.txt': {
                         'Date': 'Tue, 15 May 2018 00:00:00 GMT',
-                        'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
-                        'ETag': '"123456"'
+                        'Expires': 'Wed, 16 May 2018 00:00:00 GMT',
+                        'Etag': '"123456"'
                     }
                 }
             }]
@@ -224,36 +264,36 @@ class TestHttpToKeep(unittest.TestCase):
         cm.keys.return_value = ["file1.txt"]
         collectionmock.return_value = cm
 
-        req = mock.MagicMock()
-        req.status_code = 200
-        req.headers = {
-            'Date': 'Tue, 17 May 2018 00:00:00 GMT',
-            'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
-            'ETag': '"123456"'
-        }
-        headmock.return_value = req
+        mockobj = CurlMock({
+            'Date': 'Thu, 17 May 2018 00:00:00 GMT',
+            'Expires': 'Sat, 19 May 2018 00:00:00 GMT',
+            'Etag': '"123456"'
+        })
+        mockobj.chunk = None
+        def init():
+            return mockobj
+        curlmock.side_effect = init
 
         utcnow = mock.MagicMock()
         utcnow.return_value = datetime.datetime(2018, 5, 17)
 
-        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
-        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+        r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
 
-        getmock.assert_not_called()
         cm.open.assert_not_called()
 
         api.collections().update.assert_has_calls([
             mock.call(uuid=cm.manifest_locator(),
                       body={"collection":{"properties": {'http://example.com/file1.txt': {
-                          'Date': 'Tue, 17 May 2018 00:00:00 GMT',
-                          'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
-                          'ETag': '"123456"'
+                          'Date': 'Thu, 17 May 2018 00:00:00 GMT',
+                          'Expires': 'Sat, 19 May 2018 00:00:00 GMT',
+                          'Etag': '"123456"'
                       }}}})
                       ])
 
-    @mock.patch("requests.get")
+    @mock.patch("pycurl.Curl")
     @mock.patch("arvados.collection.Collection")
-    def test_http_content_disp(self, collectionmock, getmock):
+    def test_http_content_disp(self, collectionmock, curlmock):
         api = mock.MagicMock()
 
         api.collections().list().execute.return_value = {
@@ -265,19 +305,19 @@ class TestHttpToKeep(unittest.TestCase):
         cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
         collectionmock.return_value = cm
 
-        req = mock.MagicMock()
-        req.status_code = 200
-        req.headers = {"Content-Disposition": "attachment; filename=file1.txt"}
-        req.iter_content.return_value = ["abc"]
-        getmock.return_value = req
+        mockobj = CurlMock({"Content-Disposition": "attachment; filename=file1.txt"})
+        mockobj.chunk = "abc"
+        def init():
+            return mockobj
+        curlmock.side_effect = init
 
         utcnow = mock.MagicMock()
         utcnow.return_value = datetime.datetime(2018, 5, 15)
 
-        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/download?fn=/file1.txt", utcnow=utcnow)
-        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+        r = http_to_keep(api, None, "http://example.com/download?fn=/file1.txt", utcnow=utcnow)
+        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
 
-        getmock.assert_called_with("http://example.com/download?fn=/file1.txt", stream=True, allow_redirects=True, headers={})
+        assert mockobj.url == b"http://example.com/download?fn=/file1.txt"
 
         cm.open.assert_called_with("file1.txt", "wb")
         cm.save_new.assert_called_with(name="Downloaded from http%3A%2F%2Fexample.com%2Fdownload%3Ffn%3D%2Ffile1.txt",
@@ -288,10 +328,9 @@ class TestHttpToKeep(unittest.TestCase):
                       body={"collection":{"properties": {"http://example.com/download?fn=/file1.txt": {'Date': 'Tue, 15 May 2018 00:00:00 GMT'}}}})
         ])
 
-    @mock.patch("requests.get")
-    @mock.patch("requests.head")
+    @mock.patch("pycurl.Curl")
     @mock.patch("arvados.collection.CollectionReader")
-    def test_http_etag_if_none_match(self, collectionmock, headmock, getmock):
+    def test_http_etag_if_none_match(self, collectionmock, curlmock):
         api = mock.MagicMock()
 
         api.collections().list().execute.return_value = {
@@ -302,7 +341,7 @@ class TestHttpToKeep(unittest.TestCase):
                     'http://example.com/file1.txt': {
                         'Date': 'Tue, 15 May 2018 00:00:00 GMT',
                         'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
-                        'ETag': '"123456"'
+                        'Etag': '"123456"'
                     }
                 }
             }]
@@ -314,29 +353,26 @@ class TestHttpToKeep(unittest.TestCase):
         cm.keys.return_value = ["file1.txt"]
         collectionmock.return_value = cm
 
-        # Head request fails, will try a conditional GET instead
-        req = mock.MagicMock()
-        req.status_code = 403
-        req.headers = {
-        }
-        headmock.return_value = req
+        mockobj = CurlMock({
+            'Date': 'Tue, 17 May 2018 00:00:00 GMT',
+            'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
+            'Etag': '"123456"'
+        })
+        mockobj.chunk = None
+        mockobj.head_response = 403
+        mockobj.get_response = 304
+        def init():
+            return mockobj
+        curlmock.side_effect = init
 
         utcnow = mock.MagicMock()
         utcnow.return_value = datetime.datetime(2018, 5, 17)
 
-        req = mock.MagicMock()
-        req.status_code = 304
-        req.headers = {
-            'Date': 'Tue, 17 May 2018 00:00:00 GMT',
-            'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
-            'ETag': '"123456"'
-        }
-        getmock.return_value = req
+        r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
 
-        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
-        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
-
-        getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True, headers={"If-None-Match": '"123456"'})
+        print(mockobj.req_headers)
+        assert mockobj.req_headers == ["Accept: application/octet-stream", "If-None-Match: \"123456\""]
         cm.open.assert_not_called()
 
         api.collections().update.assert_has_calls([
@@ -344,15 +380,13 @@ class TestHttpToKeep(unittest.TestCase):
                       body={"collection":{"properties": {'http://example.com/file1.txt': {
                           'Date': 'Tue, 17 May 2018 00:00:00 GMT',
                           'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
-                          'ETag': '"123456"'
+                          'Etag': '"123456"'
                       }}}})
                       ])
 
-
-    @mock.patch("requests.get")
-    @mock.patch("requests.head")
+    @mock.patch("pycurl.Curl")
     @mock.patch("arvados.collection.CollectionReader")
-    def test_http_prefer_cached_downloads(self, collectionmock, headmock, getmock):
+    def test_http_prefer_cached_downloads(self, collectionmock, curlmock):
         api = mock.MagicMock()
 
         api.collections().list().execute.return_value = {
@@ -363,7 +397,7 @@ class TestHttpToKeep(unittest.TestCase):
                     'http://example.com/file1.txt': {
                         'Date': 'Tue, 15 May 2018 00:00:00 GMT',
                         'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
-                        'ETag': '"123456"'
+                        'Etag': '"123456"'
                     }
                 }
             }]
@@ -375,21 +409,24 @@ class TestHttpToKeep(unittest.TestCase):
         cm.keys.return_value = ["file1.txt"]
         collectionmock.return_value = cm
 
+        mockobj = CurlMock()
+        def init():
+            return mockobj
+        curlmock.side_effect = init
+
         utcnow = mock.MagicMock()
         utcnow.return_value = datetime.datetime(2018, 5, 17)
 
-        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow, prefer_cached_downloads=True)
-        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+        r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow, prefer_cached_downloads=True)
+        self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
 
-        headmock.assert_not_called()
-        getmock.assert_not_called()
+        assert mockobj.perform_was_called is False
         cm.open.assert_not_called()
         api.collections().update.assert_not_called()
 
-    @mock.patch("requests.get")
-    @mock.patch("requests.head")
+    @mock.patch("pycurl.Curl")
     @mock.patch("arvados.collection.CollectionReader")
-    def test_http_varying_url_params(self, collectionmock, headmock, getmock):
+    def test_http_varying_url_params(self, collectionmock, curlmock):
         for prurl in ("http://example.com/file1.txt", "http://example.com/file1.txt?KeyId=123&Signature=456&Expires=789"):
             api = mock.MagicMock()
 
@@ -401,7 +438,7 @@ class TestHttpToKeep(unittest.TestCase):
                         prurl: {
                             'Date': 'Tue, 15 May 2018 00:00:00 GMT',
                             'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
-                            'ETag': '"123456"'
+                            'Etag': '"123456"'
                         }
                     }
                 }]
@@ -413,23 +450,24 @@ class TestHttpToKeep(unittest.TestCase):
             cm.keys.return_value = ["file1.txt"]
             collectionmock.return_value = cm
 
-            req = mock.MagicMock()
-            req.status_code = 200
-            req.headers = {
+            mockobj = CurlMock({
                 'Date': 'Tue, 17 May 2018 00:00:00 GMT',
                 'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
-                'ETag': '"123456"'
-            }
-            headmock.return_value = req
+                'Etag': '"123456"'
+            })
+            mockobj.chunk = None
+            def init():
+                return mockobj
+            curlmock.side_effect = init
 
             utcnow = mock.MagicMock()
             utcnow.return_value = datetime.datetime(2018, 5, 17)
 
-            r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt?KeyId=123&Signature=456&Expires=789",
+            r = http_to_keep(api, None, "http://example.com/file1.txt?KeyId=123&Signature=456&Expires=789",
                                               utcnow=utcnow, varying_url_params="KeyId,Signature,Expires")
-            self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+            self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
 
-            getmock.assert_not_called()
+            assert mockobj.perform_was_called is True
             cm.open.assert_not_called()
 
             api.collections().update.assert_has_calls([
@@ -437,6 +475,6 @@ class TestHttpToKeep(unittest.TestCase):
                           body={"collection":{"properties": {'http://example.com/file1.txt': {
                               'Date': 'Tue, 17 May 2018 00:00:00 GMT',
                               'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
-                              'ETag': '"123456"'
+                              'Etag': '"123456"'
                           }}}})
                           ])
index 9b401cc6ac86a1e9dd6e1af147c1b97565e484aa..3834690078a13ca5821e6d141a8c05b6c9ec10d3 100644 (file)
@@ -63,6 +63,8 @@ gem 'rails-observers'
 gem 'rails-perftest'
 gem 'rails-controller-testing'
 
+gem 'mini_portile2', '~> 2.8', '>= 2.8.1'
+
 # arvados-google-api-client and googleauth depend on signet, but
 # signet 0.12 is incompatible with ruby 2.3.
 gem 'signet', '< 0.12'
index bdfaaf4efeedb65e753b87ad4c7dc8d8a3659e0c..21d0ad471208b32987144156d42d16a5ddc5683f 100644 (file)
@@ -242,6 +242,7 @@ DEPENDENCIES
   listen
   lograge
   logstash-event
+  mini_portile2 (~> 2.8, >= 2.8.1)
   minitest (= 5.10.3)
   mocha
   multi_json
@@ -265,4 +266,4 @@ DEPENDENCIES
   themes_for_rails!
 
 BUNDLED WITH
-   2.2.19
+   2.3.26