from __future__ import absolute_import
from __future__ import division
from future import standard_library
+from future.utils import native_str
standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import range
from builtins import object
-import io
+import collections
import datetime
import hashlib
+import io
import logging
import math
import os
from . import timer
import urllib.parse
+if sys.version_info >= (3, 0):
+ from io import BytesIO
+else:
+ from cStringIO import StringIO as BytesIO
+
import arvados
import arvados.config as config
import arvados.errors
def __str__(self):
+ # Render the locator as its components joined by '+': md5sum, size,
+ # the value of permission_hint(), then every entry of self.hints.
+ # Components that are None are left out of the result.
return '+'.join(
- str(s) for s in [self.md5sum, self.size,
- self.permission_hint()] + self.hints
+ native_str(s)
+ for s in [self.md5sum, self.size,
+ self.permission_hint()] + self.hints
if s is not None)
def stripped(self):
return getattr(self, data_name)
def setter(self, hex_str):
if not arvados.util.is_hex(hex_str, length):
- raise ValueError("{} is not a {}-digit hex string: {}".
+ raise ValueError("{} is not a {}-digit hex string: {!r}".
format(name, length, hex_str))
setattr(self, data_name, hex_str)
return property(getter, setter)
self._result = {'error': None}
self._usable = True
self._session = None
+ self._socket = None
self.get_headers = {'Accept': 'application/octet-stream'}
self.get_headers.update(headers)
self.put_headers = headers
except:
ua.close()
- @staticmethod
- def _socket_open(family, socktype, protocol, address=None):
+ def _socket_open(self, *args, **kwargs):
+ # Entry point registered as the pycurl OPENSOCKETFUNCTION callback.
+ # Dispatch on how many arguments were supplied (positional plus
+ # keyword): exactly two are forwarded to the (purpose, address)
+ # handler; any other count is forwarded to the
+ # (family, socktype, protocol[, address]) handler.
+ # NOTE(review): the handler names suggest these are the callback
+ # signatures of pycurl 7.21.5 and 7.19.3 respectively -- confirm
+ # against the pycurl documentation.
+ if len(args) + len(kwargs) == 2:
+ return self._socket_open_pycurl_7_21_5(*args, **kwargs)
+ else:
+ return self._socket_open_pycurl_7_19_3(*args, **kwargs)
+
+ def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
+ # Adapter for the separate-argument callback form: pack the values
+ # into an 'Address' namedtuple (fields family, socktype, protocol,
+ # addr) and delegate to the two-argument implementation with
+ # purpose=None. The namedtuple type is created anew on every call.
+ return self._socket_open_pycurl_7_21_5(
+ purpose=None,
+ address=collections.namedtuple(
+ 'Address', ['family', 'socktype', 'protocol', 'addr'],
+ )(family, socktype, protocol, address))
+
+ def _socket_open_pycurl_7_21_5(self, purpose, address):
"""Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
- s = socket.socket(family, socktype, protocol)
+ # Create the socket ourselves so keepalive can be switched on.
+ # `purpose` is accepted but not used; `address` only needs to expose
+ # .family, .socktype and .protocol.
+ s = socket.socket(address.family, address.socktype, address.protocol)
s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# Will throw invalid protocol error on mac. This test prevents that.
if hasattr(socket, 'TCP_KEEPIDLE'):
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
+ # Keep a reference on the instance: the request code closes
+ # self._socket and resets it to None in its `finally` clause.
+ self._socket = s
return s
def get(self, locator, method="GET", timeout=None):
try:
with timer.Timer() as t:
self._headers = {}
- response_body = io.StringIO()
+ response_body = BytesIO()
curl.setopt(pycurl.NOSIGNAL, 1)
- curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+ curl.setopt(pycurl.OPENSOCKETFUNCTION,
+ lambda *args, **kwargs: self._socket_open(*args, **kwargs))
curl.setopt(pycurl.URL, url.encode('utf-8'))
curl.setopt(pycurl.HTTPHEADER, [
'{}: {}'.format(k,v) for k,v in self.get_headers.items()])
curl.perform()
except Exception as e:
raise arvados.errors.HttpError(0, str(e))
+ finally:
+ if self._socket:
+ self._socket.close()
+ self._socket = None
self._result = {
'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
'body': response_body.getvalue(),
try:
with timer.Timer() as t:
self._headers = {}
- body_reader = io.StringIO(body)
- response_body = io.StringIO()
+ body_reader = BytesIO(body)
+ response_body = BytesIO()
curl.setopt(pycurl.NOSIGNAL, 1)
- curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+ curl.setopt(pycurl.OPENSOCKETFUNCTION,
+ lambda *args, **kwargs: self._socket_open(*args, **kwargs))
curl.setopt(pycurl.URL, url.encode('utf-8'))
# Using UPLOAD tells cURL to wait for a "go ahead" from the
# Keep server (in the form of a HTTP/1.1 "100 Continue"
curl.perform()
except Exception as e:
raise arvados.errors.HttpError(0, str(e))
+ finally:
+ if self._socket:
+ self._socket.close()
+ self._socket = None
self._result = {
'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
- 'body': response_body.getvalue(),
+ 'body': response_body.getvalue().decode('utf-8'),
'headers': self._headers,
'error': False,
}
curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
def _headerfunction(self, header_line):
- header_line = header_line.decode('iso-8859-1')
+ if isinstance(header_line, bytes):
+ header_line = header_line.decode('iso-8859-1')
if ':' in header_line:
name, value = header_line.split(':', 1)
name = name.strip().lower()
The weight is md5(h + u) where u is the last 15 characters of
the service endpoint's UUID.
"""
- return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
+ return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
"""Return an array of Keep service endpoints, in the order in
KeepClient is initialized.
"""
- if isinstance(data, str):
- data = data.encode("ascii")
- elif not isinstance(data, str):
- raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
+ if not isinstance(data, bytes):
+ data = data.encode()
self.put_counter.add(1)
"""
md5 = hashlib.md5(data).hexdigest()
locator = '%s+%d' % (md5, len(data))
- with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
+ with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
f.write(data)
os.rename(os.path.join(self.local_store, md5 + '.tmp'),
os.path.join(self.local_store, md5))
raise arvados.errors.NotFoundError(
"Invalid data locator: '%s'" % loc_s)
if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
- return ''
- with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
+ return b''
+ with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
return f.read()
def is_cached(self, locator):