-import cStringIO
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import absolute_import
+from __future__ import division
+from future import standard_library
+from future.utils import native_str
+standard_library.install_aliases()
+from builtins import next
+from builtins import str
+from builtins import range
+from builtins import object
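+# The future/builtins shims above keep this module importable under
+# both Python 2 and Python 3 during the transition.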
+import collections
import datetime
import hashlib
+import io
import logging
import math
import os
import pycurl
-import Queue
+import queue
import re
import socket
import ssl
import sys
import threading
-import timer
-import urlparse
+from . import timer
+import urllib.parse
+
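+# cStringIO is kept on Python 2 for performance; io.BytesIO is the
+# Python 3 equivalent.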
+if sys.version_info >= (3, 0):
+ from io import BytesIO
+else:
+ from cStringIO import StringIO as BytesIO
import arvados
import arvados.config as config
def __str__(self):
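+        # native_str keeps each piece a platform-native str on Python 2
+        # instead of future's newstr.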
return '+'.join(
- str(s) for s in [self.md5sum, self.size,
- self.permission_hint()] + self.hints
+ native_str(s)
+ for s in [self.md5sum, self.size,
+ self.permission_hint()] + self.hints
if s is not None)
def stripped(self):
return getattr(self, data_name)
def setter(self, hex_str):
if not arvados.util.is_hex(hex_str, length):
- raise ValueError("{} is not a {}-digit hex string: {}".
+ raise ValueError("{} is not a {}-digit hex string: {!r}".
format(name, length, hex_str))
setattr(self, data_name, hex_str)
return property(getter, setter)
self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
sm = sum([slot.size() for slot in self._cache])
while len(self._cache) > 0 and sm > self.cache_max:
- for i in xrange(len(self._cache)-1, -1, -1):
+ for i in range(len(self._cache)-1, -1, -1):
if self._cache[i].ready.is_set():
del self._cache[i]
break
def _get(self, locator):
# Test if the locator is already in the cache
- for i in xrange(0, len(self._cache)):
+ for i in range(0, len(self._cache)):
if self._cache[i].locator == locator:
n = self._cache[i]
if i != 0:
arvados.errors.HttpError,
)
- def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
+ def __init__(self, root, user_agent_pool=queue.LifoQueue(),
upload_counter=None,
- download_counter=None, **headers):
+ download_counter=None,
+ headers={},
+ insecure=False):
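+        # The default pool and headers objects are shared across
+        # instances; KeepClient normally passes its own (see
+        # KeepClient.__init__).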
self.root = root
self._user_agent_pool = user_agent_pool
self._result = {'error': None}
self._usable = True
self._session = None
+ self._socket = None
self.get_headers = {'Accept': 'application/octet-stream'}
self.get_headers.update(headers)
self.put_headers = headers
self.upload_counter = upload_counter
self.download_counter = download_counter
+ self.insecure = insecure
def usable(self):
"""Is it worth attempting a request?"""
def _get_user_agent(self):
try:
return self._user_agent_pool.get(block=False)
- except Queue.Empty:
+ except queue.Empty:
return pycurl.Curl()
def _put_user_agent(self, ua):
        except Exception:
ua.close()
- @staticmethod
- def _socket_open(family, socktype, protocol, address=None):
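+    # The OPENSOCKETFUNCTION callback signature changed between pycurl
+    # 7.19.3 (family, socktype, protocol, address) and 7.21.5 (purpose,
+    # address namedtuple); dispatch on argument count so both work.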
+ def _socket_open(self, *args, **kwargs):
+ if len(args) + len(kwargs) == 2:
+ return self._socket_open_pycurl_7_21_5(*args, **kwargs)
+ else:
+ return self._socket_open_pycurl_7_19_3(*args, **kwargs)
+
+ def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
+ return self._socket_open_pycurl_7_21_5(
+ purpose=None,
+ address=collections.namedtuple(
+ 'Address', ['family', 'socktype', 'protocol', 'addr'],
+ )(family, socktype, protocol, address))
+
+ def _socket_open_pycurl_7_21_5(self, purpose, address):
"""Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
- s = socket.socket(family, socktype, protocol)
+ s = socket.socket(address.family, address.socktype, address.protocol)
s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# Will throw invalid protocol error on mac. This test prevents that.
if hasattr(socket, 'TCP_KEEPIDLE'):
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
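+        # Stash the socket so get()/put() can close it explicitly once
+        # curl.perform() finishes.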
+ self._socket = s
return s
def get(self, locator, method="GET", timeout=None):
try:
with timer.Timer() as t:
self._headers = {}
- response_body = cStringIO.StringIO()
+ response_body = BytesIO()
curl.setopt(pycurl.NOSIGNAL, 1)
- curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+ curl.setopt(pycurl.OPENSOCKETFUNCTION,
+ lambda *args, **kwargs: self._socket_open(*args, **kwargs))
curl.setopt(pycurl.URL, url.encode('utf-8'))
curl.setopt(pycurl.HTTPHEADER, [
- '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
+ '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
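+            # self.insecure mirrors ARVADOS_API_HOST_INSECURE; skip TLS
+            # peer verification when it is set.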
+ if self.insecure:
+ curl.setopt(pycurl.SSL_VERIFYPEER, 0)
if method == "HEAD":
curl.setopt(pycurl.NOBODY, True)
- self._setcurltimeouts(curl, timeout)
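+            # HEAD responses carry no body, so exempt them from the
+            # low-speed (minimum bandwidth) abort check.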
+            self._setcurltimeouts(curl, timeout, method == "HEAD")
try:
curl.perform()
except Exception as e:
raise arvados.errors.HttpError(0, str(e))
+ finally:
+ if self._socket:
+ self._socket.close()
+ self._socket = None
self._result = {
'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
'body': response_body.getvalue(),
_logger.info("HEAD %s: %s bytes",
self._result['status_code'],
self._result.get('content-length'))
+ if self._result['headers'].get('x-keep-locator'):
+ # This is a response to a remote block copy request, return
+ # the local copy block locator.
+ return self._result['headers'].get('x-keep-locator')
return True
_logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
self._result['status_code'],
len(self._result['body']),
t.msecs,
- (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+ 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
if self.download_counter:
self.download_counter.add(len(self._result['body']))
try:
with timer.Timer() as t:
self._headers = {}
- body_reader = cStringIO.StringIO(body)
- response_body = cStringIO.StringIO()
+ body_reader = BytesIO(body)
+ response_body = BytesIO()
curl.setopt(pycurl.NOSIGNAL, 1)
- curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+ curl.setopt(pycurl.OPENSOCKETFUNCTION,
+ lambda *args, **kwargs: self._socket_open(*args, **kwargs))
curl.setopt(pycurl.URL, url.encode('utf-8'))
# Using UPLOAD tells cURL to wait for a "go ahead" from the
# Keep server (in the form of a HTTP/1.1 "100 Continue"
curl.setopt(pycurl.INFILESIZE, len(body))
curl.setopt(pycurl.READFUNCTION, body_reader.read)
curl.setopt(pycurl.HTTPHEADER, [
- '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+ '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+ if self.insecure:
+ curl.setopt(pycurl.SSL_VERIFYPEER, 0)
self._setcurltimeouts(curl, timeout)
try:
curl.perform()
except Exception as e:
raise arvados.errors.HttpError(0, str(e))
+ finally:
+ if self._socket:
+ self._socket.close()
+ self._socket = None
self._result = {
'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
- 'body': response_body.getvalue(),
+ 'body': response_body.getvalue().decode('utf-8'),
'headers': self._headers,
'error': False,
}
self._result['status_code'],
len(body),
t.msecs,
- (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+ 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
if self.upload_counter:
self.upload_counter.add(len(body))
return True
- def _setcurltimeouts(self, curl, timeouts):
+ def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
if not timeouts:
return
        elif isinstance(timeouts, tuple):
            if len(timeouts) == 2:
                conn_t, xfer_t = timeouts
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            else:
                conn_t, xfer_t, bandwidth_bps = timeouts
        else:
            conn_t, xfer_t = (timeouts, timeouts)
bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
- curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
- curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
+ if not ignore_bandwidth:
+ curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+ curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
def _headerfunction(self, header_line):
- header_line = header_line.decode('iso-8859-1')
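+        # pycurl hands us bytes on Python 3; decode as ISO-8859-1, the
+        # encoding HTTP headers historically assume.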
+ if isinstance(header_line, bytes):
+ header_line = header_line.decode('iso-8859-1')
if ':' in header_line:
name, value = header_line.split(':', 1)
name = name.strip().lower()
self._lastheadername = name
self._headers[name] = value
# Returning None implies all bytes were written
-
- class KeepWriterQueue(Queue.Queue):
+
+ class KeepWriterQueue(queue.Queue):
def __init__(self, copies):
- Queue.Queue.__init__(self) # Old-style superclass
+            queue.Queue.__init__(self)  # Queue.Queue is old-style in Python 2, so super() is unavailable
self.wanted_copies = copies
self.successful_copies = 0
self.response = None
self.successful_copies_lock = threading.Lock()
self.pending_tries = copies
self.pending_tries_notification = threading.Condition()
-
+
def write_success(self, response, replicas_nr):
with self.successful_copies_lock:
self.successful_copies += replicas_nr
self.response = response
with self.pending_tries_notification:
self.pending_tries_notification.notify_all()
-
+
def write_fail(self, ks):
with self.pending_tries_notification:
self.pending_tries += 1
self.pending_tries_notification.notify()
-
+
def pending_copies(self):
with self.successful_copies_lock:
return self.wanted_copies - self.successful_copies
return service, service_root
elif self.empty():
self.pending_tries_notification.notify_all()
- raise Queue.Empty
+ raise queue.Empty
else:
self.pending_tries_notification.wait()
if (not max_service_replicas) or (max_service_replicas >= copies):
num_threads = 1
else:
- num_threads = int(math.ceil(float(copies) / max_service_replicas))
+ num_threads = int(math.ceil(1.0*copies/max_service_replicas))
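+            # A single service can satisfy at most max_service_replicas
+            # copies per request, so round up to cover all copies.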
_logger.debug("Pool max threads is %d", num_threads)
self.workers = []
self.queue = KeepClient.KeepWriterQueue(copies)
for _ in range(num_threads):
w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
self.workers.append(w)
-
+
def add_task(self, ks, service_root):
self.queue.put((ks, service_root))
self.total_task_nr += 1
-
+
def done(self):
return self.queue.successful_copies
-
+
def join(self):
# Start workers
for worker in self.workers:
worker.start()
# Wait for finished work
self.queue.join()
-
+
def response(self):
return self.queue.response
-
-
+
+
class KeepWriterThread(threading.Thread):
TaskFailed = RuntimeError()
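+        # Sentinel exception instance: do_task() raises it to flag an
+        # expected per-service failure rather than an unexpected crash.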
while True:
try:
service, service_root = self.queue.get_next_task()
- except Queue.Empty:
+ except queue.Empty:
return
try:
locator, copies = self.do_task(service, service_root)
if local_store is None:
local_store = os.environ.get('KEEP_LOCAL_STORE')
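+        # Adopt the API client's TLS-verification setting so Keep
+        # requests behave like API requests.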
+ if api_client is None:
+ self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
+ else:
+ self.insecure = api_client.insecure
+
self.block_cache = block_cache if block_cache else KeepBlockCache()
self.timeout = timeout
self.proxy_timeout = proxy_timeout
- self._user_agent_pool = Queue.LifoQueue()
+ self._user_agent_pool = queue.LifoQueue()
self.upload_counter = Counter()
self.download_counter = Counter()
self.put_counter = Counter()
if local_store:
self.local_store = local_store
+ self.head = self.local_store_head
self.get = self.local_store_get
self.put = self.local_store_put
else:
if not proxy_uris[i].endswith('/'):
proxy_uris[i] += '/'
# URL validation
- url = urlparse.urlparse(proxy_uris[i])
+ url = urllib.parse.urlparse(proxy_uris[i])
if not (url.scheme and url.netloc):
raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
self.api_token = api_token
raise arvados.errors.NoKeepServersError()
# Precompute the base URI for each service.
- for r in self._gateway_services.itervalues():
+ for r in self._gateway_services.values():
host = r['service_host']
if not host.startswith('[') and host.find(':') >= 0:
# IPv6 URIs must be formatted like http://[::1]:80/...
_logger.debug(str(self._gateway_services))
self._keep_services = [
- ks for ks in self._gateway_services.itervalues()
+ ks for ks in self._gateway_services.values()
if not ks.get('service_type', '').startswith('gateway:')]
self._writable_services = [ks for ks in self._keep_services
if not ks.get('read_only')]
The weight is md5(h + u) where u is the last 15 characters of
the service endpoint's UUID.
"""
- return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
+ return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
"""Return an array of Keep service endpoints, in the order in
_logger.debug("{}: {}".format(locator, sorted_roots))
return sorted_roots
- def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
+ def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
# roots_map is a dictionary, mapping Keep service root strings
# to KeepService objects. Poll for Keep services, and add any
# new ones to roots_map. Return the current list of local
root, self._user_agent_pool,
upload_counter=self.upload_counter,
download_counter=self.download_counter,
- **headers)
+ headers=headers,
+ insecure=self.insecure)
return local_roots
@staticmethod
else:
return None
+ def refresh_signature(self, loc):
+ """Ask Keep to get the remote block and return its local signature"""
+ now = datetime.datetime.utcnow().isoformat("T") + 'Z'
+ return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
+
@retry.retry_method
- def head(self, loc_s, num_retries=None):
- return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
+ def head(self, loc_s, **kwargs):
+ return self._get_or_head(loc_s, method="HEAD", **kwargs)
@retry.retry_method
- def get(self, loc_s, num_retries=None):
- return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
+ def get(self, loc_s, **kwargs):
+ return self._get_or_head(loc_s, method="GET", **kwargs)
- def _get_or_head(self, loc_s, method="GET", num_retries=None):
+ def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
self.get_counter.add(1)
- locator = KeepLocator(loc_s)
- if method == "GET":
- slot, first = self.block_cache.reserve_cache(locator.md5sum)
- if not first:
- self.hits_counter.add(1)
- v = slot.get()
- return v
-
- self.misses_counter.add(1)
-
- # If the locator has hints specifying a prefix (indicating a
- # remote keepproxy) or the UUID of a local gateway service,
- # read data from the indicated service(s) instead of the usual
- # list of local disk services.
- hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
- for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
- hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
- for hint in locator.hints if (
- hint.startswith('K@') and
- len(hint) == 29 and
- self._gateway_services.get(hint[2:])
- )])
- # Map root URLs to their KeepService objects.
- roots_map = {
- root: self.KeepService(root, self._user_agent_pool,
- upload_counter=self.upload_counter,
- download_counter=self.download_counter)
- for root in hint_roots
- }
-
- # See #3147 for a discussion of the loop implementation. Highlights:
- # * Refresh the list of Keep services after each failure, in case
- # it's being updated.
- # * Retry until we succeed, we're out of retries, or every available
- # service has returned permanent failure.
- sorted_roots = []
- roots_map = {}
+ slot = None
blob = None
- loop = retry.RetryLoop(num_retries, self._check_loop_result,
- backoff_start=2)
- for tries_left in loop:
- try:
- sorted_roots = self.map_new_services(
- roots_map, locator,
- force_rebuild=(tries_left < num_retries),
- need_writable=False)
- except Exception as error:
- loop.save_result(error)
- continue
+ try:
+ locator = KeepLocator(loc_s)
+ if method == "GET":
+ slot, first = self.block_cache.reserve_cache(locator.md5sum)
+ if not first:
+ self.hits_counter.add(1)
+ blob = slot.get()
+ if blob is None:
+ raise arvados.errors.KeepReadError(
+ "failed to read {}".format(loc_s))
+ return blob
+
+ self.misses_counter.add(1)
+
+ if headers is None:
+ headers = {}
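+            # Prefer an explicit request_id, then the API client's, and
+            # finally mint a fresh one.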
+ headers['X-Request-Id'] = (request_id or
+ (hasattr(self, 'api_client') and self.api_client.request_id) or
+ arvados.util.new_request_id())
+
+ # If the locator has hints specifying a prefix (indicating a
+ # remote keepproxy) or the UUID of a local gateway service,
+ # read data from the indicated service(s) instead of the usual
+ # list of local disk services.
+ hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+ for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+ hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+ for hint in locator.hints if (
+ hint.startswith('K@') and
+ len(hint) == 29 and
+ self._gateway_services.get(hint[2:])
+ )])
+ # Map root URLs to their KeepService objects.
+ roots_map = {
+ root: self.KeepService(root, self._user_agent_pool,
+ upload_counter=self.upload_counter,
+ download_counter=self.download_counter,
+ headers=headers,
+ insecure=self.insecure)
+ for root in hint_roots
+ }
+
+ # See #3147 for a discussion of the loop implementation. Highlights:
+ # * Refresh the list of Keep services after each failure, in case
+ # it's being updated.
+ # * Retry until we succeed, we're out of retries, or every available
+ # service has returned permanent failure.
+ sorted_roots = []
+ roots_map = {}
+ loop = retry.RetryLoop(num_retries, self._check_loop_result,
+ backoff_start=2)
+ for tries_left in loop:
+ try:
+ sorted_roots = self.map_new_services(
+ roots_map, locator,
+ force_rebuild=(tries_left < num_retries),
+ need_writable=False,
+ headers=headers)
+ except Exception as error:
+ loop.save_result(error)
+ continue
- # Query KeepService objects that haven't returned
- # permanent failure, in our specified shuffle order.
- services_to_try = [roots_map[root]
- for root in sorted_roots
- if roots_map[root].usable()]
- for keep_service in services_to_try:
- blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
- if blob is not None:
- break
- loop.save_result((blob, len(services_to_try)))
-
- # Always cache the result, then return it if we succeeded.
- if method == "GET":
- slot.set(blob)
- self.block_cache.cap_cache()
- if loop.success():
- if method == "HEAD":
- return True
- else:
+ # Query KeepService objects that haven't returned
+ # permanent failure, in our specified shuffle order.
+ services_to_try = [roots_map[root]
+ for root in sorted_roots
+ if roots_map[root].usable()]
+ for keep_service in services_to_try:
+ blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
+ if blob is not None:
+ break
+ loop.save_result((blob, len(services_to_try)))
+
+            # The finally clause below caches the result; here, return it if we succeeded.
+ if loop.success():
return blob
+ finally:
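+            # Fill the cache slot even on failure so other threads
+            # waiting on slot.get() for this locator are unblocked.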
+ if slot is not None:
+ slot.set(blob)
+ self.block_cache.cap_cache()
# Q: Including 403 is necessary for the Keep tests to continue
# passing, but maybe they should expect KeepReadError instead?
"failed to read {}".format(loc_s), service_errors, label="service")
@retry.retry_method
- def put(self, data, copies=2, num_retries=None):
+ def put(self, data, copies=2, num_retries=None, request_id=None):
"""Save data in Keep.
This method will get a list of Keep services from the API server, and
KeepClient is initialized.
"""
- if isinstance(data, unicode):
- data = data.encode("ascii")
- elif not isinstance(data, str):
- raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
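+        # Accept text for backward compatibility; Keep blocks are bytes
+        # (encode() defaults to UTF-8).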
+ if not isinstance(data, bytes):
+ data = data.encode()
self.put_counter.add(1)
return loc_s
locator = KeepLocator(loc_s)
- headers = {}
- # Tell the proxy how many copies we want it to store
- headers['X-Keep-Desired-Replicas'] = str(copies)
+ headers = {
+ 'X-Request-Id': (request_id or
+ (hasattr(self, 'api_client') and self.api_client.request_id) or
+ arvados.util.new_request_id()),
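+            # Tell the proxy how many copies we want it to store.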
+ 'X-Keep-Desired-Replicas': str(copies),
+ }
roots_map = {}
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
try:
sorted_roots = self.map_new_services(
roots_map, locator,
- force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
+ force_rebuild=(tries_left < num_retries),
+ need_writable=True,
+ headers=headers)
except Exception as error:
loop.save_result(error)
continue
- writer_pool = KeepClient.KeepWriterThreadPool(data=data,
+ writer_pool = KeepClient.KeepWriterThreadPool(data=data,
data_hash=data_hash,
copies=copies - done,
max_service_replicas=self.max_replicas_per_service,
"""
md5 = hashlib.md5(data).hexdigest()
locator = '%s+%d' % (md5, len(data))
- with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
+ with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
f.write(data)
os.rename(os.path.join(self.local_store, md5 + '.tmp'),
os.path.join(self.local_store, md5))
raise arvados.errors.NotFoundError(
"Invalid data locator: '%s'" % loc_s)
if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
- return ''
- with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
+ return b''
+ with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
return f.read()
+ def local_store_head(self, loc_s, num_retries=None):
+ """Companion to local_store_put()."""
+ try:
+ locator = KeepLocator(loc_s)
+ except ValueError:
+ raise arvados.errors.NotFoundError(
+ "Invalid data locator: '%s'" % loc_s)
+ if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
+ return True
+ if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
+ return True
+
def is_cached(self, locator):
        return self.block_cache.reserve_cache(locator)