-import cStringIO
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import absolute_import
+from __future__ import division
+from future import standard_library
+from future.utils import native_str
+standard_library.install_aliases()
+from builtins import next
+from builtins import str
+from builtins import range
+from builtins import object
+import collections
import datetime
import hashlib
+import io
import logging
import math
import os
import pycurl
-import Queue
+import queue
import re
import socket
import ssl
+import sys
import threading
-import timer
+from . import timer
+import urllib.parse
+
+if sys.version_info >= (3, 0):
+ from io import BytesIO
+else:
+ from cStringIO import StringIO as BytesIO
import arvados
import arvados.config as config
global_client_object = None
# Python's socket module may not define these TCP keepalive constants on
# Apple platforms, so fill in whichever ones are missing. Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    # getattr() keeps a constant the socket module already provides and only
    # falls back to the header value when the name is absent.
    socket.TCP_KEEPALIVE = getattr(socket, 'TCP_KEEPALIVE', 0x010)
    socket.TCP_KEEPINTVL = getattr(socket, 'TCP_KEEPINTVL', 0x101)
    socket.TCP_KEEPCNT = getattr(socket, 'TCP_KEEPCNT', 0x102)
+
+
class KeepLocator(object):
EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
def __str__(self):
return '+'.join(
- str(s) for s in [self.md5sum, self.size,
- self.permission_hint()] + self.hints
+ native_str(s)
+ for s in [self.md5sum, self.size,
+ self.permission_hint()] + self.hints
if s is not None)
def stripped(self):
return getattr(self, data_name)
def setter(self, hex_str):
if not arvados.util.is_hex(hex_str, length):
- raise ValueError("{} is not a {}-digit hex string: {}".
+ raise ValueError("{} is not a {}-digit hex string: {!r}".
format(name, length, hex_str))
setattr(self, data_name, hex_str)
return property(getter, setter)
self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
sm = sum([slot.size() for slot in self._cache])
while len(self._cache) > 0 and sm > self.cache_max:
- for i in xrange(len(self._cache)-1, -1, -1):
+ for i in range(len(self._cache)-1, -1, -1):
if self._cache[i].ready.is_set():
del self._cache[i]
break
def _get(self, locator):
# Test if the locator is already in the cache
- for i in xrange(0, len(self._cache)):
+ for i in range(0, len(self._cache)):
if self._cache[i].locator == locator:
n = self._cache[i]
if i != 0:
arvados.errors.HttpError,
)
- def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
+ def __init__(self, root, user_agent_pool=queue.LifoQueue(),
upload_counter=None,
download_counter=None, **headers):
self.root = root
self._result = {'error': None}
self._usable = True
self._session = None
+ self._socket = None
self.get_headers = {'Accept': 'application/octet-stream'}
self.get_headers.update(headers)
self.put_headers = headers
def _get_user_agent(self):
try:
- return self._user_agent_pool.get(False)
- except Queue.Empty:
+ return self._user_agent_pool.get(block=False)
+ except queue.Empty:
return pycurl.Curl()
def _put_user_agent(self, ua):
try:
ua.reset()
- self._user_agent_pool.put(ua, False)
+ self._user_agent_pool.put(ua, block=False)
except:
ua.close()
- @staticmethod
- def _socket_open(family, socktype, protocol, address=None):
+ def _socket_open(self, *args, **kwargs):
+ if len(args) + len(kwargs) == 2:
+ return self._socket_open_pycurl_7_21_5(*args, **kwargs)
+ else:
+ return self._socket_open_pycurl_7_19_3(*args, **kwargs)
+
+ def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
+ return self._socket_open_pycurl_7_21_5(
+ purpose=None,
+ address=collections.namedtuple(
+ 'Address', ['family', 'socktype', 'protocol', 'addr'],
+ )(family, socktype, protocol, address))
+
+ def _socket_open_pycurl_7_21_5(self, purpose, address):
"""Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
- s = socket.socket(family, socktype, protocol)
+ s = socket.socket(address.family, address.socktype, address.protocol)
s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
- s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
+ # Will throw invalid protocol error on mac. This test prevents that.
+ if hasattr(socket, 'TCP_KEEPIDLE'):
+ s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
+ self._socket = s
return s
def get(self, locator, method="GET", timeout=None):
try:
with timer.Timer() as t:
self._headers = {}
- response_body = cStringIO.StringIO()
+ response_body = BytesIO()
curl.setopt(pycurl.NOSIGNAL, 1)
- curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+ curl.setopt(pycurl.OPENSOCKETFUNCTION,
+ lambda *args, **kwargs: self._socket_open(*args, **kwargs))
curl.setopt(pycurl.URL, url.encode('utf-8'))
curl.setopt(pycurl.HTTPHEADER, [
- '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
+ '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
if method == "HEAD":
curl.perform()
except Exception as e:
raise arvados.errors.HttpError(0, str(e))
+ finally:
+ if self._socket:
+ self._socket.close()
+ self._socket = None
self._result = {
'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
'body': response_body.getvalue(),
self._result['status_code'],
len(self._result['body']),
t.msecs,
- (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+ 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
if self.download_counter:
self.download_counter.add(len(self._result['body']))
try:
with timer.Timer() as t:
self._headers = {}
- body_reader = cStringIO.StringIO(body)
- response_body = cStringIO.StringIO()
+ body_reader = BytesIO(body)
+ response_body = BytesIO()
curl.setopt(pycurl.NOSIGNAL, 1)
- curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+ curl.setopt(pycurl.OPENSOCKETFUNCTION,
+ lambda *args, **kwargs: self._socket_open(*args, **kwargs))
curl.setopt(pycurl.URL, url.encode('utf-8'))
# Using UPLOAD tells cURL to wait for a "go ahead" from the
# Keep server (in the form of a HTTP/1.1 "100 Continue"
curl.setopt(pycurl.INFILESIZE, len(body))
curl.setopt(pycurl.READFUNCTION, body_reader.read)
curl.setopt(pycurl.HTTPHEADER, [
- '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+ '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
self._setcurltimeouts(curl, timeout)
curl.perform()
except Exception as e:
raise arvados.errors.HttpError(0, str(e))
+ finally:
+ if self._socket:
+ self._socket.close()
+ self._socket = None
self._result = {
'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
- 'body': response_body.getvalue(),
+ 'body': response_body.getvalue().decode('utf-8'),
'headers': self._headers,
'error': False,
}
self._result['status_code'],
len(body),
t.msecs,
- (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+ 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
if self.upload_counter:
self.upload_counter.add(len(body))
return True
curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
def _headerfunction(self, header_line):
- header_line = header_line.decode('iso-8859-1')
+ if isinstance(header_line, bytes):
+ header_line = header_line.decode('iso-8859-1')
if ':' in header_line:
name, value = header_line.split(':', 1)
name = name.strip().lower()
# Returning None implies all bytes were written
class KeepWriterQueue(queue.Queue):
    """Queue of (service, service_root) write tasks for one block.

    Besides the queued tasks it tracks how many copies are still wanted
    and how many write attempts may be started right now (pending_tries),
    so that workers calling get_next_task() do not start more writes than
    are needed.
    """

    def __init__(self, copies):
        queue.Queue.__init__(self) # Old-style superclass
        self.wanted_copies = copies
        self.successful_copies = 0
        self.response = None
        self.successful_copies_lock = threading.Lock()
        # One attempt is allowed per wanted copy; write_fail() adds one back.
        self.pending_tries = copies
        self.pending_tries_notification = threading.Condition()

    def write_success(self, response, replicas_nr):
        """Record replicas_nr stored copies and wake all waiting workers."""
        with self.successful_copies_lock:
            self.successful_copies += replicas_nr
            self.response = response
        notification = self.pending_tries_notification
        with notification:
            notification.notify_all()

    def write_fail(self, ks):
        """Allow one more write attempt and wake one waiting worker."""
        notification = self.pending_tries_notification
        with notification:
            self.pending_tries += 1
            notification.notify()

    def pending_copies(self):
        """Return how many wanted copies have not been stored yet."""
        with self.successful_copies_lock:
            remaining = self.wanted_copies - self.successful_copies
        return remaining

    def get_next_task(self):
        """Return the next (service, service_root) pair to try.

        Waits while every allowed attempt is in progress. Raises
        queue.Empty when there is nothing left to do: either enough copies
        have been stored (the remaining tasks are drained first) or the
        queue has run out of tasks.
        """
        notification = self.pending_tries_notification
        with notification:
            while True:
                if self.pending_copies() < 1:
                    # This notify_all() is unnecessary --
                    # write_success() already called notify_all()
                    # when pending<1 became true, so it's not
                    # possible for any other thread to be in
                    # wait() now -- but it's cheap insurance
                    # against deadlock so we do it anyway:
                    notification.notify_all()
                    # Drain the queue; get_nowait() raises queue.Empty
                    # once nothing is left, which ends this call.
                    while True:
                        self.get_nowait()
                        self.task_done()
                if self.pending_tries > 0:
                    service, service_root = self.get_nowait()
                    if service.finished():
                        # Nothing to do for this service; discard the task.
                        self.task_done()
                        continue
                    self.pending_tries -= 1
                    return service, service_root
                if self.empty():
                    notification.notify_all()
                    raise queue.Empty
                notification.wait()
+
+
class KeepWriterThreadPool(object):
def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
self.total_task_nr = 0
if (not max_service_replicas) or (max_service_replicas >= copies):
num_threads = 1
else:
- num_threads = int(math.ceil(float(copies) / max_service_replicas))
+ num_threads = int(math.ceil(1.0*copies/max_service_replicas))
_logger.debug("Pool max threads is %d", num_threads)
self.workers = []
self.queue = KeepClient.KeepWriterQueue(copies)
worker.start()
# Wait for finished work
self.queue.join()
- with self.queue.retries_notification:
- self.queue.retries_notification.notify_all()
- for worker in self.workers:
- worker.join()
def response(self):
return self.queue.response
class KeepWriterThread(threading.Thread):
+ TaskFailed = RuntimeError()
+
def __init__(self, queue, data, data_hash, timeout=None):
super(KeepClient.KeepWriterThread, self).__init__()
self.timeout = timeout
self.queue = queue
self.data = data
self.data_hash = data_hash
-
+ self.daemon = True
+
def run(self):
- while not self.queue.empty():
- if self.queue.pending_copies() > 0:
- # Avoid overreplication, wait for some needed retry
- with self.queue.retries_notification:
- if not self.queue.retries > 0:
- self.queue.retries_notification.wait()
- continue # try again when awake
- self.queue.retries -= 1
-
- # Get to work
- try:
- service, service_root = self.queue.get_nowait()
- except Queue.Empty:
- continue
- if service.finished():
- self.queue.task_done()
- continue
- success = bool(service.put(self.data_hash,
- self.data,
- timeout=self.timeout))
- result = service.last_result()
- if success:
- _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
- str(threading.current_thread()),
- self.data_hash,
- len(self.data),
- service_root)
- try:
- replicas_stored = int(result['headers']['x-keep-replicas-stored'])
- except (KeyError, ValueError):
- replicas_stored = 1
-
- self.queue.write_success(result['body'].strip(), replicas_stored)
- else:
- if result.get('status_code', None):
- _logger.debug("Request fail: PUT %s => %s %s",
- self.data_hash,
- result['status_code'],
- result['body'])
- self.queue.write_fail(service, result.get('status_code', None)) # Schedule a retry
+ while True:
+ try:
+ service, service_root = self.queue.get_next_task()
+ except queue.Empty:
+ return
+ try:
+ locator, copies = self.do_task(service, service_root)
+ except Exception as e:
+ if e is not self.TaskFailed:
+ _logger.exception("Exception in KeepWriterThread")
+ self.queue.write_fail(service)
else:
- # Remove the task from the queue anyways
- try:
- self.queue.get_nowait()
- except Queue.Empty:
- continue
- # Mark as done so the queue can be join()ed
- self.queue.task_done()
+ self.queue.write_success(locator, copies)
+ finally:
+ self.queue.task_done()
+
+ def do_task(self, service, service_root):
+ success = bool(service.put(self.data_hash,
+ self.data,
+ timeout=self.timeout))
+ result = service.last_result()
+
+ if not success:
+ if result.get('status_code', None):
+ _logger.debug("Request fail: PUT %s => %s %s",
+ self.data_hash,
+ result['status_code'],
+ result['body'])
+ raise self.TaskFailed
+
+ _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
+ str(threading.current_thread()),
+ self.data_hash,
+ len(self.data),
+ service_root)
+ try:
+ replicas_stored = int(result['headers']['x-keep-replicas-stored'])
+ except (KeyError, ValueError):
+ replicas_stored = 1
+
+ return result['body'].strip(), replicas_stored
def __init__(self, api_client=None, proxy=None,
:proxy:
If specified, this KeepClient will send requests to this Keep
proxy. Otherwise, KeepClient will fall back to the setting of the
- ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
- KeepClient does not use a proxy, pass in an empty string.
+ ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
+ If you want to ensure KeepClient does not use a proxy, pass in an
+ empty string.
:timeout:
The initial timeout (in seconds) for HTTP requests to Keep
"""
self.lock = threading.Lock()
if proxy is None:
- proxy = config.get('ARVADOS_KEEP_PROXY')
+ if config.get('ARVADOS_KEEP_SERVICES'):
+ proxy = config.get('ARVADOS_KEEP_SERVICES')
+ else:
+ proxy = config.get('ARVADOS_KEEP_PROXY')
if api_token is None:
if api_client is None:
api_token = config.get('ARVADOS_API_TOKEN')
self.block_cache = block_cache if block_cache else KeepBlockCache()
self.timeout = timeout
self.proxy_timeout = proxy_timeout
- self._user_agent_pool = Queue.LifoQueue()
+ self._user_agent_pool = queue.LifoQueue()
self.upload_counter = Counter()
self.download_counter = Counter()
self.put_counter = Counter()
self.num_retries = num_retries
self.max_replicas_per_service = None
if proxy:
- if not proxy.endswith('/'):
- proxy += '/'
+ proxy_uris = proxy.split()
+ for i in range(len(proxy_uris)):
+ if not proxy_uris[i].endswith('/'):
+ proxy_uris[i] += '/'
+ # URL validation
+ url = urllib.parse.urlparse(proxy_uris[i])
+ if not (url.scheme and url.netloc):
+ raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
self.api_token = api_token
self._gateway_services = {}
self._keep_services = [{
- 'uuid': 'proxy',
+ 'uuid': "00000-bi6l4-%015d" % idx,
'service_type': 'proxy',
- '_service_root': proxy,
- }]
+ '_service_root': uri,
+ } for idx, uri in enumerate(proxy_uris)]
self._writable_services = self._keep_services
self.using_proxy = True
self._static_services_list = True
raise arvados.errors.NoKeepServersError()
# Precompute the base URI for each service.
- for r in self._gateway_services.itervalues():
+ for r in self._gateway_services.values():
host = r['service_host']
if not host.startswith('[') and host.find(':') >= 0:
# IPv6 URIs must be formatted like http://[::1]:80/...
_logger.debug(str(self._gateway_services))
self._keep_services = [
- ks for ks in self._gateway_services.itervalues()
+ ks for ks in self._gateway_services.values()
if not ks.get('service_type', '').startswith('gateway:')]
self._writable_services = [ks for ks in self._keep_services
if not ks.get('read_only')]
The weight is md5(h + u) where u is the last 15 characters of
the service endpoint's UUID.
"""
- return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
+ return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
"""Return an array of Keep service endpoints, in the order in
KeepClient is initialized.
"""
- if isinstance(data, unicode):
- data = data.encode("ascii")
- elif not isinstance(data, str):
- raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
+ if not isinstance(data, bytes):
+ data = data.encode()
self.put_counter.add(1)
"""
md5 = hashlib.md5(data).hexdigest()
locator = '%s+%d' % (md5, len(data))
- with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
+ with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
f.write(data)
os.rename(os.path.join(self.local_store, md5 + '.tmp'),
os.path.join(self.local_store, md5))
raise arvados.errors.NotFoundError(
"Invalid data locator: '%s'" % loc_s)
if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
- return ''
- with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
+ return b''
+ with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
return f.read()
def is_cached(self, locator):