# Note that unless in debug mode, some stack traces related to user
# workflow errors may be suppressed.
+
+ # Set the logging on most modules INFO (instead of default which is WARNING)
+ logger.setLevel(logging.INFO)
+ logging.getLogger('arvados').setLevel(logging.INFO)
+ logging.getLogger('arvados.keep').setLevel(logging.WARNING)
+
if arvargs.debug:
logger.setLevel(logging.DEBUG)
logging.getLogger('arvados').setLevel(logging.DEBUG)
activity statuses, for example in the RuntimeStatusLoggingHandler.
"""
- if kind not in ('error', 'warning'):
+ if kind not in ('error', 'warning', 'activity'):
# Ignore any other status kind
return
runtime_status = current.get('runtime_status', {})
original_updatemessage = updatemessage = runtime_status.get(kind, "")
- if not updatemessage:
+ if kind == "activity" or not updatemessage:
updatemessage = message
# Subsequent messages tacked on in detail
def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
self.debug = runtimeContext.debug
+ self.runtime_status_update("activity", "initialization")
+
git_info = self.get_git_info(updated_tool) if self.git_info else {}
if git_info:
logger.info("Git provenance")
self.project_uuid = runtimeContext.project_uuid
+ self.runtime_status_update("activity", "data transfer")
+
# Upload local file references in the job order.
with Perf(metrics, "upload_job_order"):
job_order, jobmapper = upload_job_order(self, "%s input" % runtimeContext.name,
# We either running the workflow directly, or submitting it
# and will wait for a final result.
+ self.runtime_status_update("activity", "workflow execution")
+
current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
if current_container:
logger.info("Running inside container %s", current_container.get("uuid"))
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-from __future__ import division
-from future import standard_library
-standard_library.install_aliases()
-
-import requests
-import email.utils
-import time
-import datetime
-import re
-import arvados
-import arvados.collection
-import urllib.parse
-import logging
-import calendar
-import urllib.parse
-
-logger = logging.getLogger('arvados.cwl-runner')
-
-def my_formatdate(dt):
- return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
- localtime=False, usegmt=True)
-
-def my_parsedate(text):
- parsed = email.utils.parsedate_tz(text)
- if parsed:
- if parsed[9]:
- # Adjust to UTC
- return datetime.datetime(*parsed[:6]) + datetime.timedelta(seconds=parsed[9])
- else:
- # TZ is zero or missing, assume UTC.
- return datetime.datetime(*parsed[:6])
- else:
- return datetime.datetime(1970, 1, 1)
-
-def fresh_cache(url, properties, now):
- pr = properties[url]
- expires = None
-
- logger.debug("Checking cache freshness for %s using %s", url, pr)
-
- if "Cache-Control" in pr:
- if re.match(r"immutable", pr["Cache-Control"]):
- return True
-
- g = re.match(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
- if g:
- expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))
-
- if expires is None and "Expires" in pr:
- expires = my_parsedate(pr["Expires"])
-
- if expires is None:
- # Use a default cache time of 24 hours if upstream didn't set
- # any cache headers, to reduce redundant downloads.
- expires = my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)
-
- if not expires:
- return False
-
- return (now < expires)
-
-def remember_headers(url, properties, headers, now):
- properties.setdefault(url, {})
- for h in ("Cache-Control", "ETag", "Expires", "Date", "Content-Length"):
- if h in headers:
- properties[url][h] = headers[h]
- if "Date" not in headers:
- properties[url]["Date"] = my_formatdate(now)
-
-
-def changed(url, clean_url, properties, now):
- req = requests.head(url, allow_redirects=True)
-
- if req.status_code != 200:
- # Sometimes endpoints are misconfigured and will deny HEAD but
- # allow GET so instead of failing here, we'll try GET If-None-Match
- return True
-
- etag = properties[url].get("ETag")
-
- if url in properties:
- del properties[url]
- remember_headers(clean_url, properties, req.headers, now)
-
- if "ETag" in req.headers and etag == req.headers["ETag"]:
- # Didn't change
- return False
-
- return True
-
-def etag_quote(etag):
- # if it already has leading and trailing quotes, do nothing
- if etag[0] == '"' and etag[-1] == '"':
- return etag
- else:
- # Add quotes.
- return '"' + etag + '"'
-
-
-def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow, varying_url_params="", prefer_cached_downloads=False):
- varying_params = [s.strip() for s in varying_url_params.split(",")]
-
- parsed = urllib.parse.urlparse(url)
- query = [q for q in urllib.parse.parse_qsl(parsed.query)
- if q[0] not in varying_params]
-
- clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
- urllib.parse.urlencode(query, safe="/"), parsed.fragment))
-
- r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()
-
- if clean_url == url:
- items = r1["items"]
- else:
- r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
- items = r1["items"] + r2["items"]
-
- now = utcnow()
-
- etags = {}
-
- for item in items:
- properties = item["properties"]
-
- if clean_url in properties:
- cache_url = clean_url
- elif url in properties:
- cache_url = url
- else:
- return False
-
- if prefer_cached_downloads or fresh_cache(cache_url, properties, now):
- # HTTP caching rules say we should use the cache
- cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
- return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
-
- if not changed(cache_url, clean_url, properties, now):
- # ETag didn't change, same content, just update headers
- api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
- cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
- return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
-
- if "ETag" in properties[cache_url] and len(properties[cache_url]["ETag"]) > 2:
- etags[properties[cache_url]["ETag"]] = item
-
- logger.debug("Found ETags %s", etags)
-
- properties = {}
- headers = {}
- if etags:
- headers['If-None-Match'] = ', '.join([etag_quote(k) for k,v in etags.items()])
- logger.debug("Sending GET request with headers %s", headers)
- req = requests.get(url, stream=True, allow_redirects=True, headers=headers)
-
- if req.status_code not in (200, 304):
- raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))
-
- remember_headers(clean_url, properties, req.headers, now)
-
- if req.status_code == 304 and "ETag" in req.headers and req.headers["ETag"] in etags:
- item = etags[req.headers["ETag"]]
- item["properties"].update(properties)
- api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
- cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
- return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
-
- if "Content-Length" in properties[clean_url]:
- cl = int(properties[clean_url]["Content-Length"])
- logger.info("Downloading %s (%s bytes)", url, cl)
- else:
- cl = None
- logger.info("Downloading %s (unknown size)", url)
-
- c = arvados.collection.Collection()
-
- if req.headers.get("Content-Disposition"):
- grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))', req.headers["Content-Disposition"])
- if grp.group(2):
- name = grp.group(2)
- else:
- name = grp.group(4)
- else:
- name = parsed.path.split("/")[-1]
-
- count = 0
- start = time.time()
- checkpoint = start
- with c.open(name, "wb") as f:
- for chunk in req.iter_content(chunk_size=1024):
- count += len(chunk)
- f.write(chunk)
- loopnow = time.time()
- if (loopnow - checkpoint) > 20:
- bps = count / (loopnow - start)
- if cl is not None:
- logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left",
- ((count * 100) / cl),
- (bps // (1024*1024)),
- ((cl-count) // bps))
- else:
- logger.info("%d downloaded, %3.2f MiB/s", count, (bps / (1024*1024)))
- checkpoint = loopnow
-
- logger.info("Download complete")
-
- collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')
-
- # max length - space to add a timestamp used by ensure_unique_name
- max_name_len = 254 - 28
-
- if len(collectionname) > max_name_len:
- over = len(collectionname) - max_name_len
- split = int(max_name_len/2)
- collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
-
- c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)
-
- api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
-
- return "keep:%s/%s" % (c.portable_data_hash(), name)
from cwltool.stdfsaccess import abspath
from cwltool.workflow import WorkflowException
-from .http import http_to_keep
+from arvados.http_to_keep import http_to_keep
logger = logging.getLogger('arvados.cwl-runner')
# passthrough, we'll download it later.
self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
else:
- keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
- varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
- prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
+ keepref = "keep:%s/%s" % http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
+ varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
+ prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
logger.info("%s is %s", src, keepref)
self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
except Exception as e:
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import collections
+import logging
+import math
+import socket
+
+import pycurl
+
+# Same logger name keep.py used when this code lived there.
+_logger = logging.getLogger('arvados.keep')
+
+
+class PyCurlHelper:
+    """Shared pycurl plumbing for Keep services and HTTP downloaders.
+
+    Provides a socket factory that enables TCP keepalive, translation of
+    timeout tuples into curl options, and a HEADERFUNCTION callback that
+    collects response headers into ``self._headers``.
+    """
+
+    # Default Keep server connection timeout: 2 seconds
+    # Default Keep server read timeout: 256 seconds
+    # Default Keep server bandwidth minimum: 32768 bytes per second
+    # Default Keep proxy connection timeout: 20 seconds
+    # Default Keep proxy read timeout: 256 seconds
+    # Default Keep proxy bandwidth minimum: 32768 bytes per second
+    DEFAULT_TIMEOUT = (2, 256, 32768)
+    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
+
+    def __init__(self, title_case_headers=False):
+        """Initialise helper state.
+
+        title_case_headers: if true, header names are stored Title-Cased
+        (e.g. "Content-Length", "Etag"); otherwise they are lower-cased.
+        """
+        self._socket = None
+        self.title_case_headers = title_case_headers
+        # Filled in by _headerfunction; users reset _headers per request.
+        self._headers = {}
+        self._lastheadername = None
+
+    def _socket_open(self, *args, **kwargs):
+        """OPENSOCKETFUNCTION callback; dispatches on the pycurl call signature."""
+        if len(args) + len(kwargs) == 2:
+            return self._socket_open_pycurl_7_21_5(*args, **kwargs)
+        else:
+            return self._socket_open_pycurl_7_19_3(*args, **kwargs)
+
+    def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
+        return self._socket_open_pycurl_7_21_5(
+            purpose=None,
+            address=collections.namedtuple(
+                'Address', ['family', 'socktype', 'protocol', 'addr'],
+            )(family, socktype, protocol, address))
+
+    def _socket_open_pycurl_7_21_5(self, purpose, address):
+        """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
+        s = socket.socket(address.family, address.socktype, address.protocol)
+        s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+        # Will throw invalid protocol error on mac. This test prevents that.
+        if hasattr(socket, 'TCP_KEEPIDLE'):
+            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
+        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
+        self._socket = s
+        return s
+
+    def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
+        """Apply a timeout spec to *curl*.
+
+        *timeouts* is a scalar (used for both connect and transfer), a
+        (connect, transfer) pair, or a (connect, transfer, min_bps) triple.
+        A falsy value leaves curl's settings untouched.
+        """
+        if not timeouts:
+            return
+        elif isinstance(timeouts, tuple):
+            if len(timeouts) == 2:
+                conn_t, xfer_t = timeouts
+                bandwidth_bps = self.DEFAULT_TIMEOUT[2]
+            else:
+                conn_t, xfer_t, bandwidth_bps = timeouts
+        else:
+            conn_t, xfer_t = (timeouts, timeouts)
+            bandwidth_bps = self.DEFAULT_TIMEOUT[2]
+        curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
+        if not ignore_bandwidth:
+            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
+
+    def _headerfunction(self, header_line):
+        """HEADERFUNCTION callback: record one raw response header line.
+
+        The status line is stored under ``x-status-line``. A new status
+        line (after a redirect or an interim ``100 Continue``) discards the
+        previous response's headers, so ``_headers`` describes the final
+        response. The blank line ending the header block is ignored, and
+        whitespace-prefixed lines are folded into the previous header.
+        """
+        if isinstance(header_line, bytes):
+            header_line = header_line.decode('iso-8859-1')
+        if header_line.startswith('HTTP/'):
+            # Start of a (possibly subsequent) response.
+            self._headers = {}
+            name = 'x-status-line'
+            value = header_line
+        elif not header_line.strip():
+            # End-of-headers separator; previously this appended a stray
+            # space to the last header value.
+            return
+        elif header_line[0] in ' \t' and self._lastheadername in self._headers:
+            # Folded continuation of the previous header.
+            name = self._lastheadername
+            value = self._headers[name] + ' ' + header_line.strip()
+        elif ':' in header_line:
+            name, value = header_line.split(':', 1)
+            if self.title_case_headers:
+                name = name.strip().title()
+            else:
+                name = name.strip().lower()
+            value = value.strip()
+        else:
+            _logger.error("Unexpected header line: %s", header_line)
+            return
+        self._lastheadername = name
+        self._headers[name] = value
+        # Returning None implies all bytes were written
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import division
+from future import standard_library
+standard_library.install_aliases()
+
+import email.utils
+import time
+import datetime
+import re
+import arvados
+import arvados.collection
+import urllib.parse
+import logging
+import calendar
+import urllib.parse
+import pycurl
+import dataclasses
+import typing
+from arvados._pycurlhelper import PyCurlHelper
+
+logger = logging.getLogger('arvados.http_import')
+
+def _my_formatdate(dt):
+    """Render naive-UTC datetime *dt* as an HTTP date string ending in "GMT"."""
+    epoch_seconds = calendar.timegm(dt.timetuple())
+    return email.utils.formatdate(epoch_seconds, localtime=False, usegmt=True)
+
+def _my_parsedate(text):
+    """Parse an RFC 2822 / HTTP date string into a naive UTC datetime.
+
+    Returns 1970-01-01 00:00 if *text* cannot be parsed.
+    """
+    parsed = email.utils.parsedate_tz(text)
+    if parsed:
+        if parsed[9]:
+            # parsed[9] is the zone's offset from UTC in seconds
+            # (local = UTC + offset), so subtract it to get UTC.
+            return datetime.datetime(*parsed[:6]) - datetime.timedelta(seconds=parsed[9])
+        else:
+            # TZ is zero or missing, assume UTC.
+            return datetime.datetime(*parsed[:6])
+    else:
+        return datetime.datetime(1970, 1, 1)
+
+def _fresh_cache(url, properties, now):
+    """Return True if the cached headers for *url* are still fresh at *now*.
+
+    properties[url] holds headers saved by _remember_headers. Freshness is
+    decided by Cache-Control ("immutable", or s-maxage/max-age counted
+    from Date), then Expires, else a default of 24 hours after Date.
+    """
+    pr = properties[url]
+    expires = None
+
+    logger.debug("Checking cache freshness for %s using %s", url, pr)
+
+    if "Cache-Control" in pr:
+        cache_control = pr["Cache-Control"]
+        # Directives may come in any order (e.g. "public, max-age=3600"),
+        # so search the whole header rather than matching at the start.
+        if re.search(r"\bimmutable\b", cache_control):
+            return True
+
+        g = re.search(r"\b(s-maxage|max-age)=(\d+)", cache_control)
+        if g:
+            expires = _my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))
+
+    if expires is None and "Expires" in pr:
+        expires = _my_parsedate(pr["Expires"])
+
+    if expires is None:
+        # Use a default cache time of 24 hours if upstream didn't set
+        # any cache headers, to reduce redundant downloads.
+        expires = _my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)
+
+    return now < expires
+
+def _remember_headers(url, properties, headers, now):
+    """Copy cache-relevant response headers into properties[url].
+
+    If the response carried no Date header, one is synthesised from *now*.
+    """
+    remembered = properties.setdefault(url, {})
+    for header_name in ("Cache-Control", "Etag", "Expires", "Date", "Content-Length"):
+        if header_name in headers:
+            remembered[header_name] = headers[header_name]
+    if "Date" not in headers:
+        remembered["Date"] = _my_formatdate(now)
+
+@dataclasses.dataclass
+class _Response:
+    """Outcome of a pycurl request: final status code plus collected headers."""
+
+    # Value of pycurl's RESPONSE_CODE after perform().
+    status_code: int
+    # Headers gathered by the header callback, including "x-status-line".
+    headers: typing.Mapping[str, str]
+
+
+class _Downloader(PyCurlHelper):
+    """Fetch a URL with pycurl, streaming a 200 response into a new collection.
+
+    ``head()`` performs a HEAD request. ``download()`` performs a GET whose
+    body ``body_write`` writes into ``self.collection`` under ``self.name``.
+    After ``download()`` returns, ``collection``, ``name`` and
+    ``contentlength`` are populated; ``target`` is an open file only for a
+    200 response.
+    """
+    # Wait up to 60 seconds for connection
+    # How long it can be in "low bandwidth" state before it gives up
+    # Low bandwidth threshold is 32 KiB/s
+    DOWNLOADER_TIMEOUT = (60, 300, 32768)
+
+    def __init__(self, apiclient):
+        super(_Downloader, self).__init__(title_case_headers=True)
+        self.curl = pycurl.Curl()
+        self.curl.setopt(pycurl.NOSIGNAL, 1)
+        self.curl.setopt(pycurl.OPENSOCKETFUNCTION,
+                         lambda *args, **kwargs: self._socket_open(*args, **kwargs))
+        self.target = None
+        self.collection = None
+        self.name = None
+        self.contentlength = None
+        self.apiclient = apiclient
+
+    def _perform(self):
+        """Run the configured transfer, always closing our socket afterwards."""
+        try:
+            self.curl.perform()
+        except Exception as e:
+            raise arvados.errors.HttpError(0, str(e))
+        finally:
+            if self._socket:
+                self._socket.close()
+                self._socket = None
+
+    def head(self, url):
+        """Issue a HEAD request for *url* and return a _Response."""
+        get_headers = {'Accept': 'application/octet-stream'}
+        self._headers = {}
+
+        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
+        self.curl.setopt(pycurl.HTTPHEADER, [
+            '{}: {}'.format(k, v) for k, v in get_headers.items()])
+
+        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
+        self.curl.setopt(pycurl.NOBODY, True)
+        self.curl.setopt(pycurl.FOLLOWLOCATION, True)
+
+        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, True)
+
+        self._perform()
+
+        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)
+
+    def download(self, url, headers):
+        """GET *url* with extra request *headers*; return a _Response."""
+        # Reset all per-download state so the object can be reused.
+        self.count = 0
+        self.start = time.time()
+        self.checkpoint = self.start
+        self._headers = {}
+        self._first_chunk = True
+        self.collection = None
+        self.target = None
+        self.name = None
+        self.contentlength = None
+        self.parsedurl = urllib.parse.urlparse(url)
+
+        get_headers = {'Accept': 'application/octet-stream'}
+        get_headers.update(headers)
+
+        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
+        self.curl.setopt(pycurl.HTTPHEADER, [
+            '{}: {}'.format(k, v) for k, v in get_headers.items()])
+
+        self.curl.setopt(pycurl.WRITEFUNCTION, self.body_write)
+        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+
+        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
+        self.curl.setopt(pycurl.HTTPGET, True)
+        self.curl.setopt(pycurl.FOLLOWLOCATION, True)
+
+        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, False)
+
+        self._perform()
+
+        if self._first_chunk:
+            # No body arrived (304, or a zero-length file), so body_write
+            # never ran; process headers now so collection/name/target are
+            # set (a zero-length 200 still yields an empty file).
+            self._first_chunk = False
+            self.headers_received()
+
+        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)
+
+    def headers_received(self):
+        """Inspect the final response headers before any body is stored.
+
+        Creates ``self.collection``, sets ``self.contentlength`` and
+        ``self.name`` and, for a 200 response, opens ``self.target``.
+        """
+        self.collection = arvados.collection.Collection(api_client=self.apiclient)
+
+        if "Content-Length" in self._headers:
+            self.contentlength = int(self._headers["Content-Length"])
+            logger.info("File size is %s bytes", self.contentlength)
+        else:
+            self.contentlength = None
+
+        self.name = None
+        disposition = self._headers.get("Content-Disposition")
+        if disposition:
+            grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))',
+                            disposition)
+            if grp is not None:
+                self.name = grp.group(2) or grp.group(4)
+        if not self.name:
+            # No usable filename (no header, or e.g. a bare "attachment"):
+            # fall back to the last path segment of the URL.
+            self.name = self.parsedurl.path.split("/")[-1]
+
+        # Can't call curl.getinfo(pycurl.RESPONSE_CODE) until
+        # perform() is done but we need to know the status before that
+        # so we have to parse the status line ourselves. The reason
+        # phrase is optional so terse status lines are accepted too.
+        status_line = self._headers.get("x-status-line", "")
+        mt = re.match(r'^HTTP\/(\d(\.\d)?) ([1-5]\d\d)(?: ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*))?\r\n$',
+                      status_line)
+        if mt is None:
+            raise arvados.errors.HttpError(0, "Unrecognized HTTP status line %r" % status_line)
+        code = int(mt.group(3))
+
+        if code == 200:
+            self.target = self.collection.open(self.name, "wb")
+
+    def body_write(self, chunk):
+        """WRITEFUNCTION callback: store *chunk* and periodically log progress."""
+        if self._first_chunk:
+            self.headers_received()
+            self._first_chunk = False
+
+        if self.target is None:
+            # Non-200 response (e.g. an error page): discard the body so
+            # the transfer completes and the caller can report the status.
+            return
+
+        self.count += len(chunk)
+        self.target.write(chunk)
+        loopnow = time.time()
+        if (loopnow - self.checkpoint) < 20:
+            return
+
+        bps = self.count / (loopnow - self.start)
+        if self.contentlength is not None:
+            logger.info("%2.1f%% complete, %6.2f MiB/s, %1.0f seconds left",
+                        ((self.count * 100) / self.contentlength),
+                        (bps / (1024.0*1024.0)),
+                        ((self.contentlength-self.count) // bps))
+        else:
+            logger.info("%d downloaded, %6.2f MiB/s", self.count, (bps / (1024.0*1024.0)))
+        self.checkpoint = loopnow
+
+
+def _changed(url, clean_url, properties, now, curldownloader):
+    """Return False only when a HEAD request shows the cached ETag is current.
+
+    On a 200 HEAD response the cached header entry for *url* is replaced
+    by the freshly received headers, stored under *clean_url*.
+    """
+    response = curldownloader.head(url)
+    if response.status_code != 200:
+        # Sometimes endpoints are misconfigured and will deny HEAD but
+        # allow GET so instead of failing here, we'll try GET If-None-Match
+        return True
+
+    # previous version of this code used "ETag", now we are
+    # normalizing to "Etag", check for both.
+    cached_entry = properties[url]
+    previous_etag = cached_entry.get("Etag") or cached_entry.get("ETag")
+
+    if url in properties:
+        del properties[url]
+    _remember_headers(clean_url, properties, response.headers, now)
+
+    unchanged = "Etag" in response.headers and previous_etag == response.headers["Etag"]
+    return not unchanged
+
+def _etag_quote(etag):
+    """Return *etag* wrapped in double quotes unless it is already quoted."""
+    already_quoted = etag[0] == '"' and etag[-1] == '"'
+    return etag if already_quoted else '"' + etag + '"'
+
+
+def _keep_file_ref(api, item):
+    """Return (portable_data_hash, first file name) for a cached collection item."""
+    cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
+    return (item["portable_data_hash"], next(iter(cr.keys())))
+
+
+def http_to_keep(api, project_uuid, url,
+                 utcnow=datetime.datetime.utcnow, varying_url_params="",
+                 prefer_cached_downloads=False):
+    """Download a file over HTTP and upload it to keep, with HTTP headers as metadata.
+
+    Before downloading the URL, checks to see if the URL already
+    exists in Keep and applies HTTP caching policy, the
+    varying_url_params and prefer_cached_downloads flags in order to
+    decide whether to use the version in Keep or re-download it.
+
+    Returns a ``(portable_data_hash, filename)`` tuple. Raises Exception
+    if the server answers with a status other than 200/304, or answers
+    304 with an ETag that does not identify any cached copy.
+    """
+
+    logger.info("Checking Keep for %s", url)
+
+    varying_params = [s.strip() for s in varying_url_params.split(",")]
+
+    parsed = urllib.parse.urlparse(url)
+    query = [q for q in urllib.parse.parse_qsl(parsed.query)
+             if q[0] not in varying_params]
+
+    clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
+                                         urllib.parse.urlencode(query, safe="/"), parsed.fragment))
+
+    r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()
+
+    if clean_url == url:
+        items = r1["items"]
+    else:
+        r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
+        items = r1["items"] + r2["items"]
+
+    now = utcnow()
+
+    etags = {}
+
+    curldownloader = _Downloader(api)
+
+    for item in items:
+        properties = item["properties"]
+
+        if clean_url in properties:
+            cache_url = clean_url
+        elif url in properties:
+            cache_url = url
+        else:
+            raise Exception("Shouldn't happen, got an API result for %s that doesn't have the URL in properties" % item["uuid"])
+
+        if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
+            # HTTP caching rules say we should use the cache
+            return _keep_file_ref(api, item)
+
+        # Snapshot this item's cached ETag(s) now: on a successful HEAD,
+        # _changed() replaces properties[cache_url] with the server's
+        # *current* headers (possibly under clean_url). Those no longer
+        # describe this item's content and could produce a false 304.
+        cached_headers = properties[cache_url]
+        cached_etags = [cached_headers[etagstr] for etagstr in ("Etag", "ETag")
+                        if etagstr in cached_headers and len(cached_headers[etagstr]) > 2]
+
+        if not _changed(cache_url, clean_url, properties, now, curldownloader):
+            # Etag didn't change, same content, just update headers
+            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
+            return _keep_file_ref(api, item)
+
+        for etag in cached_etags:
+            etags[etag] = item
+
+    logger.debug("Found ETag values %s", etags)
+
+    properties = {}
+    headers = {}
+    if etags:
+        headers['If-None-Match'] = ', '.join(_etag_quote(k) for k in etags)
+    logger.debug("Sending GET request with headers %s", headers)
+
+    logger.info("Beginning download of %s", url)
+
+    req = curldownloader.download(url, headers)
+
+    c = curldownloader.collection
+
+    if req.status_code not in (200, 304):
+        raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))
+
+    if curldownloader.target is not None:
+        curldownloader.target.close()
+
+    _remember_headers(clean_url, properties, req.headers, now)
+
+    if req.status_code == 304:
+        if "Etag" in req.headers and req.headers["Etag"] in etags:
+            item = etags[req.headers["Etag"]]
+            item["properties"].update(properties)
+            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
+            return _keep_file_ref(api, item)
+        # Nothing was downloaded and we cannot tell which cached copy the
+        # server means; saving a collection here would yield no file.
+        raise Exception("Got 304 Not Modified for '%s' but the response ETag does not match any cached copy" % url)
+
+    logger.info("Download complete")
+
+    collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')
+
+    # max length - space to add a timestamp used by ensure_unique_name
+    max_name_len = 254 - 28
+
+    if len(collectionname) > max_name_len:
+        over = len(collectionname) - max_name_len
+        split = int(max_name_len/2)
+        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
+
+    c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)
+
+    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
+
+    return (c.portable_data_hash(), curldownloader.name)
import arvados.retry as retry
import arvados.util
import arvados.diskcache
+from arvados._pycurlhelper import PyCurlHelper
_logger = logging.getLogger('arvados.keep')
global_client_object = None
class KeepClient(object):
+ DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
+ DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
- # Default Keep server connection timeout: 2 seconds
- # Default Keep server read timeout: 256 seconds
- # Default Keep server bandwidth minimum: 32768 bytes per second
- # Default Keep proxy connection timeout: 20 seconds
- # Default Keep proxy read timeout: 256 seconds
- # Default Keep proxy bandwidth minimum: 32768 bytes per second
- DEFAULT_TIMEOUT = (2, 256, 32768)
- DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
-
-
- class KeepService(object):
+ class KeepService(PyCurlHelper):
"""Make requests to a single Keep service, and track results.
A KeepService is intended to last long enough to perform one
download_counter=None,
headers={},
insecure=False):
+ super(KeepClient.KeepService, self).__init__()
self.root = root
self._user_agent_pool = user_agent_pool
self._result = {'error': None}
except:
ua.close()
- def _socket_open(self, *args, **kwargs):
- if len(args) + len(kwargs) == 2:
- return self._socket_open_pycurl_7_21_5(*args, **kwargs)
- else:
- return self._socket_open_pycurl_7_19_3(*args, **kwargs)
-
- def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
- return self._socket_open_pycurl_7_21_5(
- purpose=None,
- address=collections.namedtuple(
- 'Address', ['family', 'socktype', 'protocol', 'addr'],
- )(family, socktype, protocol, address))
-
- def _socket_open_pycurl_7_21_5(self, purpose, address):
- """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
- s = socket.socket(address.family, address.socktype, address.protocol)
- s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
- # Will throw invalid protocol error on mac. This test prevents that.
- if hasattr(socket, 'TCP_KEEPIDLE'):
- s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
- s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
- self._socket = s
- return s
-
def get(self, locator, method="GET", timeout=None):
# locator is a KeepLocator object.
url = self.root + str(locator)
curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
if method == "HEAD":
curl.setopt(pycurl.NOBODY, True)
+ else:
+ curl.setopt(pycurl.HTTPGET, True)
self._setcurltimeouts(curl, timeout, method=="HEAD")
try:
self.upload_counter.add(len(body))
return True
- def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
- if not timeouts:
- return
- elif isinstance(timeouts, tuple):
- if len(timeouts) == 2:
- conn_t, xfer_t = timeouts
- bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
- else:
- conn_t, xfer_t, bandwidth_bps = timeouts
- else:
- conn_t, xfer_t = (timeouts, timeouts)
- bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
- curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
- if not ignore_bandwidth:
- curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
- curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
-
- def _headerfunction(self, header_line):
- if isinstance(header_line, bytes):
- header_line = header_line.decode('iso-8859-1')
- if ':' in header_line:
- name, value = header_line.split(':', 1)
- name = name.strip().lower()
- value = value.strip()
- elif self._headers:
- name = self._lastheadername
- value = self._headers[name] + ' ' + header_line.strip()
- elif header_line.startswith('HTTP/'):
- name = 'x-status-line'
- value = header_line
- else:
- _logger.error("Unexpected header line: %s", header_line)
- return
- self._lastheadername = name
- self._headers[name] = value
- # Returning None implies all bytes were written
-
class KeepWriterQueue(queue.Queue):
def __init__(self, copies, classes=[]):
import arvados
import arvados.collection
-import arvados_cwl
-import arvados_cwl.runner
import arvados.keep
+import pycurl
-from .matcher import JsonDiffMatcher, StripYAMLComments
-from .mock_discovery import get_rootDesc
-
-import arvados_cwl.http
+from arvados.http_to_keep import http_to_keep
import ruamel.yaml as yaml
+# Note: an existing "FakeCurl" helper serves a similar purpose; this mock
+# was written independently of it.
+class CurlMock:
+    """Minimal stand-in for ``pycurl.Curl`` used by the http_to_keep tests.
+
+    Records the options the code under test sets. On ``perform()`` it
+    replays a canned status line and ``self.headers`` through the header
+    callback, then (for a 200 GET) delivers ``self.chunk`` as the body.
+    """
+
+    def __init__(self, headers=None):
+        self.perform_was_called = False
+        # Avoid a shared mutable default between instances.
+        self.headers = {} if headers is None else headers
+        self.get_response = 200
+        self.head_response = 200
+        self.req_headers = []
+        # Behave like a GET until NOBODY/HTTPGET is set.
+        self.head = False
+        # Response body for a 200 GET; tests normally override it.
+        self.chunk = b''
+
+    def setopt(self, op, *args):
+        """Record the subset of curl options the downloader relies on."""
+        if op == pycurl.URL:
+            self.url = args[0]
+        elif op == pycurl.WRITEFUNCTION:
+            self.writefn = args[0]
+        elif op == pycurl.HEADERFUNCTION:
+            self.headerfn = args[0]
+        elif op == pycurl.NOBODY:
+            self.head = True
+        elif op == pycurl.HTTPGET:
+            self.head = False
+        elif op == pycurl.HTTPHEADER:
+            self.req_headers = args[0]
+
+    def getinfo(self, op):
+        if op == pycurl.RESPONSE_CODE:
+            if self.head:
+                return self.head_response
+            else:
+                return self.get_response
+
+    def perform(self):
+        self.perform_was_called = True
+
+        status = self.head_response if self.head else self.get_response
+        self.headerfn("HTTP/1.1 {} Status\r\n".format(status))
+
+        for k, v in self.headers.items():
+            self.headerfn("%s: %s" % (k, v))
+
+        # Like libcurl, only deliver a body for a successful GET and never
+        # call the write callback with empty data.
+        if not self.head and self.get_response == 200 and self.chunk:
+            self.writefn(self.chunk)
+
class TestHttpToKeep(unittest.TestCase):
- @mock.patch("requests.get")
+ @mock.patch("pycurl.Curl")
@mock.patch("arvados.collection.Collection")
- def test_http_get(self, collectionmock, getmock):
+ def test_http_get(self, collectionmock, curlmock):
api = mock.MagicMock()
api.collections().list().execute.return_value = {
cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
collectionmock.return_value = cm
- req = mock.MagicMock()
- req.status_code = 200
- req.headers = {}
- req.iter_content.return_value = ["abc"]
- getmock.return_value = req
+ mockobj = CurlMock()
+ mockobj.chunk = b'abc'
+ def init():
+ return mockobj
+ curlmock.side_effect = init
utcnow = mock.MagicMock()
utcnow.return_value = datetime.datetime(2018, 5, 15)
- r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
- self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+ r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+ self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
- getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True, headers={})
+ assert mockobj.url == b"http://example.com/file1.txt"
+ assert mockobj.perform_was_called is True
cm.open.assert_called_with("file1.txt", "wb")
cm.save_new.assert_called_with(name="Downloaded from http%3A%2F%2Fexample.com%2Ffile1.txt",
])
- @mock.patch("requests.get")
+ @mock.patch("pycurl.Curl")
@mock.patch("arvados.collection.CollectionReader")
- def test_http_expires(self, collectionmock, getmock):
+ def test_http_expires(self, collectionmock, curlmock):
api = mock.MagicMock()
api.collections().list().execute.return_value = {
cm.keys.return_value = ["file1.txt"]
collectionmock.return_value = cm
- req = mock.MagicMock()
- req.status_code = 200
- req.headers = {}
- req.iter_content.return_value = ["abc"]
- getmock.return_value = req
+ mockobj = CurlMock()
+ mockobj.chunk = b'abc'
+ def init():
+ return mockobj
+ curlmock.side_effect = init
utcnow = mock.MagicMock()
utcnow.return_value = datetime.datetime(2018, 5, 16)
- r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
- self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+ r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+ self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
- getmock.assert_not_called()
+ assert mockobj.perform_was_called is False
- @mock.patch("requests.get")
+ @mock.patch("pycurl.Curl")
@mock.patch("arvados.collection.CollectionReader")
- def test_http_cache_control(self, collectionmock, getmock):
+ def test_http_cache_control(self, collectionmock, curlmock):
api = mock.MagicMock()
api.collections().list().execute.return_value = {
cm.keys.return_value = ["file1.txt"]
collectionmock.return_value = cm
- req = mock.MagicMock()
- req.status_code = 200
- req.headers = {}
- req.iter_content.return_value = ["abc"]
- getmock.return_value = req
+ mockobj = CurlMock()
+ mockobj.chunk = b'abc'
+ def init():
+ return mockobj
+ curlmock.side_effect = init
utcnow = mock.MagicMock()
utcnow.return_value = datetime.datetime(2018, 5, 16)
- r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
- self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+ r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+ self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
- getmock.assert_not_called()
+ assert mockobj.perform_was_called is False
- @mock.patch("requests.get")
- @mock.patch("requests.head")
+ @mock.patch("pycurl.Curl")
@mock.patch("arvados.collection.Collection")
- def test_http_expired(self, collectionmock, headmock, getmock):
+ def test_http_expired(self, collectionmock, curlmock):
api = mock.MagicMock()
api.collections().list().execute.return_value = {
"properties": {
'http://example.com/file1.txt': {
'Date': 'Tue, 15 May 2018 00:00:00 GMT',
- 'Expires': 'Tue, 16 May 2018 00:00:00 GMT'
+ 'Expires': 'Wed, 16 May 2018 00:00:00 GMT'
}
}
}]
cm.keys.return_value = ["file1.txt"]
collectionmock.return_value = cm
- req = mock.MagicMock()
- req.status_code = 200
- req.headers = {'Date': 'Tue, 17 May 2018 00:00:00 GMT'}
- req.iter_content.return_value = ["def"]
- getmock.return_value = req
- headmock.return_value = req
+ mockobj = CurlMock({'Date': 'Thu, 17 May 2018 00:00:00 GMT'})
+ mockobj.chunk = b'def'
+ def init():
+ return mockobj
+ curlmock.side_effect = init
utcnow = mock.MagicMock()
utcnow.return_value = datetime.datetime(2018, 5, 17)
- r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
- self.assertEqual(r, "keep:99999999999999999999999999999997+99/file1.txt")
+ r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+ self.assertEqual(r, ("99999999999999999999999999999997+99", "file1.txt"))
- getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True, headers={})
+ assert mockobj.url == b"http://example.com/file1.txt"
+ assert mockobj.perform_was_called is True
cm.open.assert_called_with("file1.txt", "wb")
cm.save_new.assert_called_with(name="Downloaded from http%3A%2F%2Fexample.com%2Ffile1.txt",
api.collections().update.assert_has_calls([
mock.call(uuid=cm.manifest_locator(),
- body={"collection":{"properties": {'http://example.com/file1.txt': {'Date': 'Tue, 17 May 2018 00:00:00 GMT'}}}})
+ body={"collection":{"properties": {'http://example.com/file1.txt': {'Date': 'Thu, 17 May 2018 00:00:00 GMT'}}}})
])
- @mock.patch("requests.get")
- @mock.patch("requests.head")
+ @mock.patch("pycurl.Curl")
@mock.patch("arvados.collection.CollectionReader")
- def test_http_etag(self, collectionmock, headmock, getmock):
+ def test_http_etag(self, collectionmock, curlmock):
api = mock.MagicMock()
api.collections().list().execute.return_value = {
"properties": {
'http://example.com/file1.txt': {
'Date': 'Tue, 15 May 2018 00:00:00 GMT',
- 'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
- 'ETag': '"123456"'
+ 'Expires': 'Wed, 16 May 2018 00:00:00 GMT',
+ 'Etag': '"123456"'
}
}
}]
cm.keys.return_value = ["file1.txt"]
collectionmock.return_value = cm
- req = mock.MagicMock()
- req.status_code = 200
- req.headers = {
- 'Date': 'Tue, 17 May 2018 00:00:00 GMT',
- 'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
- 'ETag': '"123456"'
- }
- headmock.return_value = req
+ mockobj = CurlMock({
+ 'Date': 'Thu, 17 May 2018 00:00:00 GMT',
+ 'Expires': 'Sat, 19 May 2018 00:00:00 GMT',
+ 'Etag': '"123456"'
+ })
+ mockobj.chunk = None
+ def init():
+ return mockobj
+ curlmock.side_effect = init
utcnow = mock.MagicMock()
utcnow.return_value = datetime.datetime(2018, 5, 17)
- r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
- self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+ r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+ self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
- getmock.assert_not_called()
cm.open.assert_not_called()
api.collections().update.assert_has_calls([
mock.call(uuid=cm.manifest_locator(),
body={"collection":{"properties": {'http://example.com/file1.txt': {
- 'Date': 'Tue, 17 May 2018 00:00:00 GMT',
- 'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
- 'ETag': '"123456"'
+ 'Date': 'Thu, 17 May 2018 00:00:00 GMT',
+ 'Expires': 'Sat, 19 May 2018 00:00:00 GMT',
+ 'Etag': '"123456"'
}}}})
])
- @mock.patch("requests.get")
+ @mock.patch("pycurl.Curl")
@mock.patch("arvados.collection.Collection")
- def test_http_content_disp(self, collectionmock, getmock):
+ def test_http_content_disp(self, collectionmock, curlmock):
api = mock.MagicMock()
api.collections().list().execute.return_value = {
cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
collectionmock.return_value = cm
- req = mock.MagicMock()
- req.status_code = 200
- req.headers = {"Content-Disposition": "attachment; filename=file1.txt"}
- req.iter_content.return_value = ["abc"]
- getmock.return_value = req
+ mockobj = CurlMock({"Content-Disposition": "attachment; filename=file1.txt"})
+ mockobj.chunk = "abc"
+ def init():
+ return mockobj
+ curlmock.side_effect = init
utcnow = mock.MagicMock()
utcnow.return_value = datetime.datetime(2018, 5, 15)
- r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/download?fn=/file1.txt", utcnow=utcnow)
- self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+ r = http_to_keep(api, None, "http://example.com/download?fn=/file1.txt", utcnow=utcnow)
+ self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
- getmock.assert_called_with("http://example.com/download?fn=/file1.txt", stream=True, allow_redirects=True, headers={})
+ assert mockobj.url == b"http://example.com/download?fn=/file1.txt"
cm.open.assert_called_with("file1.txt", "wb")
cm.save_new.assert_called_with(name="Downloaded from http%3A%2F%2Fexample.com%2Fdownload%3Ffn%3D%2Ffile1.txt",
body={"collection":{"properties": {"http://example.com/download?fn=/file1.txt": {'Date': 'Tue, 15 May 2018 00:00:00 GMT'}}}})
])
- @mock.patch("requests.get")
- @mock.patch("requests.head")
+ @mock.patch("pycurl.Curl")
@mock.patch("arvados.collection.CollectionReader")
- def test_http_etag_if_none_match(self, collectionmock, headmock, getmock):
+ def test_http_etag_if_none_match(self, collectionmock, curlmock):
api = mock.MagicMock()
api.collections().list().execute.return_value = {
'http://example.com/file1.txt': {
'Date': 'Tue, 15 May 2018 00:00:00 GMT',
'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
- 'ETag': '"123456"'
+ 'Etag': '"123456"'
}
}
}]
cm.keys.return_value = ["file1.txt"]
collectionmock.return_value = cm
- # Head request fails, will try a conditional GET instead
- req = mock.MagicMock()
- req.status_code = 403
- req.headers = {
- }
- headmock.return_value = req
+ mockobj = CurlMock({
+ 'Date': 'Tue, 17 May 2018 00:00:00 GMT',
+ 'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
+ 'Etag': '"123456"'
+ })
+ mockobj.chunk = None
+ mockobj.head_response = 403
+ mockobj.get_response = 304
+ def init():
+ return mockobj
+ curlmock.side_effect = init
utcnow = mock.MagicMock()
utcnow.return_value = datetime.datetime(2018, 5, 17)
- req = mock.MagicMock()
- req.status_code = 304
- req.headers = {
- 'Date': 'Tue, 17 May 2018 00:00:00 GMT',
- 'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
- 'ETag': '"123456"'
- }
- getmock.return_value = req
+ r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+ self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
- r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
- self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
-
- getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True, headers={"If-None-Match": '"123456"'})
+ print(mockobj.req_headers)
+ assert mockobj.req_headers == ["Accept: application/octet-stream", "If-None-Match: \"123456\""]
cm.open.assert_not_called()
api.collections().update.assert_has_calls([
body={"collection":{"properties": {'http://example.com/file1.txt': {
'Date': 'Tue, 17 May 2018 00:00:00 GMT',
'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
- 'ETag': '"123456"'
+ 'Etag': '"123456"'
}}}})
])
-
- @mock.patch("requests.get")
- @mock.patch("requests.head")
+ @mock.patch("pycurl.Curl")
@mock.patch("arvados.collection.CollectionReader")
- def test_http_prefer_cached_downloads(self, collectionmock, headmock, getmock):
+ def test_http_prefer_cached_downloads(self, collectionmock, curlmock):
api = mock.MagicMock()
api.collections().list().execute.return_value = {
'http://example.com/file1.txt': {
'Date': 'Tue, 15 May 2018 00:00:00 GMT',
'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
- 'ETag': '"123456"'
+ 'Etag': '"123456"'
}
}
}]
cm.keys.return_value = ["file1.txt"]
collectionmock.return_value = cm
+ mockobj = CurlMock()
+ def init():
+ return mockobj
+ curlmock.side_effect = init
+
utcnow = mock.MagicMock()
utcnow.return_value = datetime.datetime(2018, 5, 17)
- r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow, prefer_cached_downloads=True)
- self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+ r = http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow, prefer_cached_downloads=True)
+ self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
- headmock.assert_not_called()
- getmock.assert_not_called()
+ assert mockobj.perform_was_called is False
cm.open.assert_not_called()
api.collections().update.assert_not_called()
- @mock.patch("requests.get")
- @mock.patch("requests.head")
+ @mock.patch("pycurl.Curl")
@mock.patch("arvados.collection.CollectionReader")
- def test_http_varying_url_params(self, collectionmock, headmock, getmock):
+ def test_http_varying_url_params(self, collectionmock, curlmock):
for prurl in ("http://example.com/file1.txt", "http://example.com/file1.txt?KeyId=123&Signature=456&Expires=789"):
api = mock.MagicMock()
prurl: {
'Date': 'Tue, 15 May 2018 00:00:00 GMT',
'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
- 'ETag': '"123456"'
+ 'Etag': '"123456"'
}
}
}]
cm.keys.return_value = ["file1.txt"]
collectionmock.return_value = cm
- req = mock.MagicMock()
- req.status_code = 200
- req.headers = {
+ mockobj = CurlMock({
'Date': 'Tue, 17 May 2018 00:00:00 GMT',
'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
- 'ETag': '"123456"'
- }
- headmock.return_value = req
+ 'Etag': '"123456"'
+ })
+ mockobj.chunk = None
+ def init():
+ return mockobj
+ curlmock.side_effect = init
utcnow = mock.MagicMock()
utcnow.return_value = datetime.datetime(2018, 5, 17)
- r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt?KeyId=123&Signature=456&Expires=789",
+ r = http_to_keep(api, None, "http://example.com/file1.txt?KeyId=123&Signature=456&Expires=789",
utcnow=utcnow, varying_url_params="KeyId,Signature,Expires")
- self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+ self.assertEqual(r, ("99999999999999999999999999999998+99", "file1.txt"))
- getmock.assert_not_called()
+ assert mockobj.perform_was_called is True
cm.open.assert_not_called()
api.collections().update.assert_has_calls([
body={"collection":{"properties": {'http://example.com/file1.txt': {
'Date': 'Tue, 17 May 2018 00:00:00 GMT',
'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
- 'ETag': '"123456"'
+ 'Etag': '"123456"'
}}}})
])
gem 'rails-perftest'
gem 'rails-controller-testing'
+gem 'mini_portile2', '~> 2.8', '>= 2.8.1'
+
# arvados-google-api-client and googleauth depend on signet, but
# signet 0.12 is incompatible with ruby 2.3.
gem 'signet', '< 0.12'
listen
lograge
logstash-event
+ mini_portile2 (~> 2.8, >= 2.8.1)
minitest (= 5.10.3)
mocha
multi_json
themes_for_rails!
BUNDLED WITH
- 2.2.19
+ 2.3.26