+import bz2
+import datetime
+import fcntl
+import functools
import gflags
+import hashlib
+import json
import logging
import os
import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
+import pycurl
+import Queue
import re
-import hashlib
+import socket
+import ssl
import string
-import bz2
-import zlib
-import fcntl
-import time
+import cStringIO
+import subprocess
+import sys
import threading
+import time
import timer
-import datetime
-import ssl
-import socket
-import requests
+import types
+import UserDict
+import zlib
import arvados
import arvados.config as config
_logger = logging.getLogger('arvados.keep')
global_client_object = None
+
class KeepLocator(object):
EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
self.size = None
for hint in pieces:
if self.HINT_RE.match(hint) is None:
- raise ValueError("unrecognized hint data {}".format(hint))
+ raise ValueError("invalid hint format: {}".format(hint))
elif hint.startswith('A'):
self.parse_permission_hint(hint)
else:
if s is not None)
def stripped(self):
- return "%s+%i" % (self.md5sum, self.size)
+ if self.size is not None:
+ return "%s+%i" % (self.md5sum, self.size)
+ else:
+ return self.md5sum
def _make_hex_prop(name, length):
# Build and return a new property with the given name that
return getattr(self, data_name)
def setter(self, hex_str):
if not arvados.util.is_hex(hex_str, length):
- raise ValueError("{} must be a {}-digit hex string: {}".
+ raise ValueError("{} is not a {}-digit hex string: {}".
format(name, length, hex_str))
setattr(self, data_name, hex_str)
return property(getter, setter)
class KeepService(object):
- # Make requests to a single Keep service, and track results.
- HTTP_ERRORS = (requests.exceptions.RequestException,
- socket.error, ssl.SSLError)
+ """Make requests to a single Keep service, and track results.
+
+ A KeepService is intended to last long enough to perform one
+ transaction (GET or PUT) against one Keep service. This can
+ involve calling either get() or put() multiple times in order
+ to retry after transient failures. However, calling both get()
+ and put() on a single instance -- or using the same instance
+ to access two different Keep services -- will not produce
+ sensible behavior.
+ """
+
+ HTTP_ERRORS = (
+ socket.error,
+ ssl.SSLError,
+ arvados.errors.HttpError,
+ )
- def __init__(self, root, session, **headers):
+ def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
self.root = root
- self.last_result = None
- self.success_flag = None
- self.session = session
+ self._user_agent_pool = user_agent_pool
+ self._result = {'error': None}
+ self._usable = True
+ self._session = None
self.get_headers = {'Accept': 'application/octet-stream'}
self.get_headers.update(headers)
self.put_headers = headers
def usable(self):
- return self.success_flag is not False
+ """Is it worth attempting a request?"""
+ return self._usable
def finished(self):
- return self.success_flag is not None
+ """Did the request succeed or encounter permanent failure?"""
+ return self._result['error'] == False or not self._usable
- def last_status(self):
+ def last_result(self):
+ return self._result
+
+ def _get_user_agent(self):
try:
- return self.last_result.status_code
- except AttributeError:
- return None
+ return self._user_agent_pool.get(False)
+ except Queue.Empty:
+ return pycurl.Curl()
+
+ def _put_user_agent(self, ua):
+ try:
+ ua.reset()
+ self._user_agent_pool.put(ua, False)
+ except:
+ ua.close()
+
+ @staticmethod
+ def _socket_open(family, socktype, protocol, address=None):
+ """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
+ s = socket.socket(family, socktype, protocol)
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+ s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
+ s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
+ return s
def get(self, locator, timeout=None):
# locator is a KeepLocator object.
url = self.root + str(locator)
_logger.debug("Request: GET %s", url)
+ curl = self._get_user_agent()
try:
with timer.Timer() as t:
- result = self.session.get(url.encode('utf-8'),
- headers=self.get_headers,
- timeout=timeout)
+ self._headers = {}
+ response_body = cStringIO.StringIO()
+ curl.setopt(pycurl.NOSIGNAL, 1)
+ curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+ curl.setopt(pycurl.URL, url.encode('utf-8'))
+ curl.setopt(pycurl.HTTPHEADER, [
+ '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
+ curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
+ curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+ self._setcurltimeouts(curl, timeout)
+ try:
+ curl.perform()
+ except Exception as e:
+ raise arvados.errors.HttpError(0, str(e))
+ self._result = {
+ 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+ 'body': response_body.getvalue(),
+ 'headers': self._headers,
+ 'error': False,
+ }
+ ok = retry.check_http_response_success(self._result['status_code'])
+ if not ok:
+ self._result['error'] = arvados.errors.HttpError(
+ self._result['status_code'],
+ self._headers.get('x-status-line', 'Error'))
except self.HTTP_ERRORS as e:
- _logger.debug("Request fail: GET %s => %s: %s",
- url, type(e), str(e))
- self.last_result = e
+ self._result = {
+ 'error': e,
+ }
+ ok = False
+ self._usable = ok != False
+ if self._result.get('status_code', None):
+ # The client worked well enough to get an HTTP status
+ # code, so presumably any problems are just on the
+ # server side and it's OK to reuse the client.
+ self._put_user_agent(curl)
else:
- self.last_result = result
- self.success_flag = retry.check_http_response_success(result)
- content = result.content
- _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
- self.last_status(), len(content), t.msecs,
- (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
- if self.success_flag:
- resp_md5 = hashlib.md5(content).hexdigest()
- if resp_md5 == locator.md5sum:
- return content
- _logger.warning("Checksum fail: md5(%s) = %s",
- url, resp_md5)
- return None
+ # Don't return this client to the pool, in case it's
+ # broken.
+ curl.close()
+ if not ok:
+ _logger.debug("Request fail: GET %s => %s: %s",
+ url, type(self._result['error']), str(self._result['error']))
+ return None
+ _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
+ self._result['status_code'],
+ len(self._result['body']),
+ t.msecs,
+ (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+ resp_md5 = hashlib.md5(self._result['body']).hexdigest()
+ if resp_md5 != locator.md5sum:
+ _logger.warning("Checksum fail: md5(%s) = %s",
+ url, resp_md5)
+ self._result['error'] = arvados.errors.HttpError(
+ 0, 'Checksum fail')
+ return None
+ return self._result['body']
def put(self, hash_s, body, timeout=None):
url = self.root + hash_s
_logger.debug("Request: PUT %s", url)
+ curl = self._get_user_agent()
try:
- result = self.session.put(url.encode('utf-8'),
- data=body,
- headers=self.put_headers,
- timeout=timeout)
+ self._headers = {}
+ body_reader = cStringIO.StringIO(body)
+ response_body = cStringIO.StringIO()
+ curl.setopt(pycurl.NOSIGNAL, 1)
+ curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+ curl.setopt(pycurl.URL, url.encode('utf-8'))
+ # Using UPLOAD tells cURL to wait for a "go ahead" from the
+ # Keep server (in the form of a HTTP/1.1 "100 Continue"
+ # response) instead of sending the request body immediately.
+ # This allows the server to reject the request if the request
+ # is invalid or the server is read-only, without waiting for
+ # the client to send the entire block.
+ curl.setopt(pycurl.UPLOAD, True)
+ curl.setopt(pycurl.INFILESIZE, len(body))
+ curl.setopt(pycurl.READFUNCTION, body_reader.read)
+ curl.setopt(pycurl.HTTPHEADER, [
+ '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+ curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
+ curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+ self._setcurltimeouts(curl, timeout)
+ try:
+ curl.perform()
+ except Exception as e:
+ raise arvados.errors.HttpError(0, str(e))
+ self._result = {
+ 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+ 'body': response_body.getvalue(),
+ 'headers': self._headers,
+ 'error': False,
+ }
+ ok = retry.check_http_response_success(self._result['status_code'])
+ if not ok:
+ self._result['error'] = arvados.errors.HttpError(
+ self._result['status_code'],
+ self._headers.get('x-status-line', 'Error'))
except self.HTTP_ERRORS as e:
+ self._result = {
+ 'error': e,
+ }
+ ok = False
+ self._usable = ok != False # still usable if ok is True or None
+ if self._result.get('status_code', None):
+ # Client is functional. See comment in get().
+ self._put_user_agent(curl)
+ else:
+ curl.close()
+ if not ok:
_logger.debug("Request fail: PUT %s => %s: %s",
- url, type(e), str(e))
- self.last_result = e
+ url, type(self._result['error']), str(self._result['error']))
+ return False
+ return True
+
+ def _setcurltimeouts(self, curl, timeouts):
+ if not timeouts:
+ return
+ elif isinstance(timeouts, tuple):
+ conn_t, xfer_t = timeouts
else:
- self.last_result = result
- self.success_flag = retry.check_http_response_success(result)
- return self.success_flag
+ conn_t, xfer_t = (timeouts, timeouts)
+ curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
+ curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
+
+ def _headerfunction(self, header_line):
+ header_line = header_line.decode('iso-8859-1')
+ if ':' in header_line:
+ name, value = header_line.split(':', 1)
+ name = name.strip().lower()
+ value = value.strip()
+ elif self._headers:
+ name = self._lastheadername
+ value = self._headers[name] + ' ' + header_line.strip()
+ elif header_line.startswith('HTTP/'):
+ name = 'x-status-line'
+ value = header_line
+ else:
+ _logger.error("Unexpected header line: %s", header_line)
+ return
+ self._lastheadername = name
+ self._headers[name] = value
+ # Returning None implies all bytes were written
class KeepWriterThread(threading.Thread):
self.args['data_hash'],
self.args['data'],
timeout=self.args.get('timeout', None)))
- status = self.service.last_status()
+ result = self.service.last_result()
if self._success:
- result = self.service.last_result
_logger.debug("KeepWriterThread %s succeeded %s+%i %s",
str(threading.current_thread()),
self.args['data_hash'],
# we're talking to a proxy or other backend that
# stores to multiple copies for us.
try:
- replicas_stored = int(result.headers['x-keep-replicas-stored'])
+ replicas_stored = int(result['headers']['x-keep-replicas-stored'])
except (KeyError, ValueError):
replicas_stored = 1
- limiter.save_response(result.text.strip(), replicas_stored)
- elif status is not None:
+ limiter.save_response(result['body'].strip(), replicas_stored)
+ elif result.get('status_code', None):
_logger.debug("Request fail: PUT %s => %s %s",
- self.args['data_hash'], status,
- self.service.last_result.text)
+ self.args['data_hash'],
+ result['status_code'],
+ result['body'])
def __init__(self, api_client=None, proxy=None,
"""Initialize a new KeepClient.
Arguments:
- * api_client: The API client to use to find Keep services. If not
+ :api_client:
+ The API client to use to find Keep services. If not
provided, KeepClient will build one from available Arvados
configuration.
- * proxy: If specified, this KeepClient will send requests to this
- Keep proxy. Otherwise, KeepClient will fall back to the setting
- of the ARVADOS_KEEP_PROXY configuration setting. If you want to
- ensure KeepClient does not use a proxy, pass in an empty string.
- * timeout: The timeout (in seconds) for HTTP requests to Keep
+
+ :proxy:
+ If specified, this KeepClient will send requests to this Keep
+ proxy. Otherwise, KeepClient will fall back to the setting of the
+ ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
+ KeepClient does not use a proxy, pass in an empty string.
+
+ :timeout:
+ The initial timeout (in seconds) for HTTP requests to Keep
non-proxy servers. A tuple of two floats is interpreted as
(connection_timeout, read_timeout): see
http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
+ Because timeouts are often a result of transient server load, the
+ actual connection timeout will be increased by a factor of two on
+ each retry.
Default: (2, 300).
- * proxy_timeout: The timeout (in seconds) for HTTP requests to
+
+ :proxy_timeout:
+ The initial timeout (in seconds) for HTTP requests to
Keep proxies. A tuple of two floats is interpreted as
- (connection_timeout, read_timeout). Default: (20, 300).
- * api_token: If you're not using an API client, but only talking
+ (connection_timeout, read_timeout). The behavior described
+ above for adjusting connection timeouts on retry also applies.
+ Default: (20, 300).
+
+ :api_token:
+ If you're not using an API client, but only talking
directly to a Keep proxy, this parameter specifies an API token
to authenticate Keep requests. It is an error to specify both
api_client and api_token. If you specify neither, KeepClient
will use one available from the Arvados configuration.
- * local_store: If specified, this KeepClient will bypass Keep
+
+ :local_store:
+ If specified, this KeepClient will bypass Keep
services, and save data to the named directory. If unspecified,
KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
environment variable. If you want to ensure KeepClient does not
use local storage, pass in an empty string. This is primarily
intended to mock a server for testing.
- * num_retries: The default number of times to retry failed requests.
+
+ :num_retries:
+ The default number of times to retry failed requests.
This will be used as the default num_retries value when get() and
put() are called. Default 0.
"""
self.block_cache = block_cache if block_cache else KeepBlockCache()
self.timeout = timeout
self.proxy_timeout = proxy_timeout
+ self._user_agent_pool = Queue.LifoQueue()
if local_store:
self.local_store = local_store
self.put = self.local_store_put
else:
self.num_retries = num_retries
- self.session = session if session is not None else requests.Session()
if proxy:
if not proxy.endswith('/'):
proxy += '/'
self.api_token = api_token
+ self._gateway_services = {}
self._keep_services = [{
'uuid': 'proxy',
'_service_root': proxy,
}]
+ self._writable_services = self._keep_services
self.using_proxy = True
self._static_services_list = True
else:
api_client = arvados.api('v1')
self.api_client = api_client
self.api_token = api_client.api_token
+ self._gateway_services = {}
self._keep_services = None
+ self._writable_services = None
self.using_proxy = None
self._static_services_list = False
- def current_timeout(self):
- """Return the appropriate timeout to use for this client: the proxy
- timeout setting if the backend service is currently a proxy,
- the regular timeout setting otherwise.
+ def current_timeout(self, attempt_number):
+ """Return the appropriate timeout to use for this client.
+
+ The proxy timeout setting if the backend service is currently a proxy,
+ the regular timeout setting otherwise. The `attempt_number` indicates
+ how many times the operation has been tried already (starting from 0
+ for the first try), and scales the connection timeout portion of the
+ return value accordingly.
+
"""
# TODO(twp): the timeout should be a property of a
# KeepService, not a KeepClient. See #4488.
- return self.proxy_timeout if self.using_proxy else self.timeout
+ t = self.proxy_timeout if self.using_proxy else self.timeout
+ return (t[0] * (1 << attempt_number), t[1])
def build_services_list(self, force_rebuild=False):
if (self._static_services_list or
except Exception: # API server predates Keep services.
keep_services = self.api_client.keep_disks().list()
- self._keep_services = keep_services.execute().get('items')
- if not self._keep_services:
+ accessible = keep_services.execute().get('items')
+ if not accessible:
raise arvados.errors.NoKeepServersError()
- self.using_proxy = any(ks.get('service_type') == 'proxy'
- for ks in self._keep_services)
-
# Precompute the base URI for each service.
- for r in self._keep_services:
- r['_service_root'] = "{}://[{}]:{:d}/".format(
+ for r in accessible:
+ host = r['service_host']
+ if not host.startswith('[') and host.find(':') >= 0:
+ # IPv6 URIs must be formatted like http://[::1]:80/...
+ host = '[' + host + ']'
+ r['_service_root'] = "{}://{}:{:d}/".format(
'https' if r['service_ssl_flag'] else 'http',
- r['service_host'],
+ host,
r['service_port'])
+
+ # Gateway services are only used when specified by UUID,
+ # so there's nothing to gain by filtering them by
+ # service_type.
+ self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
+ _logger.debug(str(self._gateway_services))
+
+ self._keep_services = [
+ ks for ks in accessible
+ if ks.get('service_type') in ['disk', 'proxy']]
+ self._writable_services = [
+ ks for ks in accessible
+ if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
_logger.debug(str(self._keep_services))
+ self.using_proxy = any(ks.get('service_type') == 'proxy'
+ for ks in self._keep_services)
+
def _service_weight(self, data_hash, service_uuid):
"""Compute the weight of a Keep service endpoint for a data
block with a known hash.
"""
return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
- def weighted_service_roots(self, data_hash, force_rebuild=False):
+ def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
"""Return an array of Keep service endpoints, in the order in
which they should be probed when reading or writing data with
- the given hash.
+ the given hash+hints.
"""
self.build_services_list(force_rebuild)
- # Sort the available services by weight (heaviest first) for
- # this data_hash, and return their service_roots (base URIs)
+ sorted_roots = []
+
+ # Use the services indicated by the given +K@... remote
+ # service hints, if any are present and can be resolved to a
+ # URI.
+ for hint in locator.hints:
+ if hint.startswith('K@'):
+ if len(hint) == 7:
+ sorted_roots.append(
+ "https://keep.{}.arvadosapi.com/".format(hint[2:]))
+ elif len(hint) == 29:
+ svc = self._gateway_services.get(hint[2:])
+ if svc:
+ sorted_roots.append(svc['_service_root'])
+
+ # Sort the available local services by weight (heaviest first)
+ # for this locator, and return their service_roots (base URIs)
# in that order.
- sorted_roots = [
+ use_services = self._keep_services
+ if need_writable:
+ use_services = self._writable_services
+ sorted_roots.extend([
svc['_service_root'] for svc in sorted(
- self._keep_services,
+ use_services,
reverse=True,
- key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
- _logger.debug(data_hash + ': ' + str(sorted_roots))
+ key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
+ _logger.debug("{}: {}".format(locator, sorted_roots))
return sorted_roots
- def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
+ def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
# roots_map is a dictionary, mapping Keep service root strings
# to KeepService objects. Poll for Keep services, and add any
# new ones to roots_map. Return the current list of local
# root strings.
headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
- local_roots = self.weighted_service_roots(md5_s, force_rebuild)
+ local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
for root in local_roots:
if root not in roots_map:
- roots_map[root] = self.KeepService(root, self.session, **headers)
+ roots_map[root] = self.KeepService(
+ root, self._user_agent_pool, **headers)
return local_roots
@staticmethod
else:
return None
+ def get_from_cache(self, loc):
+ """Fetch a block only if is in the cache, otherwise return None."""
+ slot = self.block_cache.get(loc)
+ if slot is not None and slot.ready.is_set():
+ return slot.get()
+ else:
+ return None
+
@retry.retry_method
- def get(self, loc_s, num_retries=None, cache_only=False):
+ def get(self, loc_s, num_retries=None):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
to fetch data from every available Keep service, along with any
that are named in location hints in the locator. The default value
is set when the KeepClient is initialized.
- * cache_only: If true, return the block data only if already present in
- cache, otherwise return None.
"""
if ',' in loc_s:
return ''.join(self.get(x) for x in loc_s.split(','))
locator = KeepLocator(loc_s)
- expect_hash = locator.md5sum
-
- if cache_only:
- slot = self.block_cache.get(expect_hash)
- if slot.ready.is_set():
- return slot.get()
- else:
- return None
-
- slot, first = self.block_cache.reserve_cache(expect_hash)
+ slot, first = self.block_cache.reserve_cache(locator.md5sum)
if not first:
v = slot.get()
return v
+ # If the locator has hints specifying a prefix (indicating a
+ # remote keepproxy) or the UUID of a local gateway service,
+ # read data from the indicated service(s) instead of the usual
+ # list of local disk services.
+ hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+ for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+ hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+ for hint in locator.hints if (
+ hint.startswith('K@') and
+ len(hint) == 29 and
+ self._gateway_services.get(hint[2:])
+ )])
+ # Map root URLs to their KeepService objects.
+ roots_map = {
+ root: self.KeepService(root, self._user_agent_pool)
+ for root in hint_roots
+ }
+
# See #3147 for a discussion of the loop implementation. Highlights:
# * Refresh the list of Keep services after each failure, in case
# it's being updated.
# * Retry until we succeed, we're out of retries, or every available
# service has returned permanent failure.
- hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
- for hint in locator.hints if hint.startswith('K@')]
- # Map root URLs their KeepService objects.
- roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
+ sorted_roots = []
+ roots_map = {}
blob = None
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
for tries_left in loop:
try:
- local_roots = self.map_new_services(
- roots_map, expect_hash,
- force_rebuild=(tries_left < num_retries))
+ sorted_roots = self.map_new_services(
+ roots_map, locator,
+ force_rebuild=(tries_left < num_retries),
+ need_writable=False)
except Exception as error:
loop.save_result(error)
continue
# Query KeepService objects that haven't returned
# permanent failure, in our specified shuffle order.
services_to_try = [roots_map[root]
- for root in (local_roots + hint_roots)
+ for root in sorted_roots
if roots_map[root].usable()]
for keep_service in services_to_try:
- blob = keep_service.get(locator, timeout=self.current_timeout())
+ blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
if blob is not None:
break
loop.save_result((blob, len(services_to_try)))
if loop.success():
return blob
- try:
- all_roots = local_roots + hint_roots
- except NameError:
- # We never successfully fetched local_roots.
- all_roots = hint_roots
# Q: Including 403 is necessary for the Keep tests to continue
# passing, but maybe they should expect KeepReadError instead?
- not_founds = sum(1 for key in all_roots
- if roots_map[key].last_status() in {403, 404, 410})
- service_errors = ((key, roots_map[key].last_result)
- for key in all_roots)
+ not_founds = sum(1 for key in sorted_roots
+ if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
+ service_errors = ((key, roots_map[key].last_result()['error'])
+ for key in sorted_roots)
if not roots_map:
raise arvados.errors.KeepReadError(
"failed to read {}: no Keep services available ({})".format(
loc_s, loop.last_result()))
- elif not_founds == len(all_roots):
+ elif not_founds == len(sorted_roots):
raise arvados.errors.NotFoundError(
"{} not found".format(loc_s), service_errors)
else:
raise arvados.errors.KeepReadError(
- "failed to read {}".format(loc_s), service_errors)
+ "failed to read {}".format(loc_s), service_errors, label="service")
@retry.retry_method
def put(self, data, copies=2, num_retries=None):
exponential backoff. The default value is set when the
KeepClient is initialized.
"""
+
+ if isinstance(data, unicode):
+ data = data.encode("ascii")
+ elif not isinstance(data, str):
+ raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
+
data_hash = hashlib.md5(data).hexdigest()
if copies < 1:
return data_hash
+ locator = KeepLocator(data_hash + '+' + str(len(data)))
headers = {}
if self.using_proxy:
for tries_left in loop:
try:
local_roots = self.map_new_services(
- roots_map, data_hash,
- force_rebuild=(tries_left < num_retries), **headers)
+ roots_map, locator,
+ force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
except Exception as error:
loop.save_result(error)
continue
data_hash=data_hash,
service_root=service_root,
thread_limiter=thread_limiter,
- timeout=self.current_timeout())
+ timeout=self.current_timeout(num_retries-tries_left))
t.start()
threads.append(t)
for t in threads:
"failed to write {}: no Keep services available ({})".format(
data_hash, loop.last_result()))
else:
- service_errors = ((key, roots_map[key].last_result)
+ service_errors = ((key, roots_map[key].last_result()['error'])
for key in local_roots
- if not roots_map[key].success_flag)
+ if roots_map[key].last_result()['error'])
raise arvados.errors.KeepWriteError(
"failed to write {} (wanted {} copies but wrote {})".format(
- data_hash, copies, thread_limiter.done()), service_errors)
+ data_hash, copies, thread_limiter.done()), service_errors, label="service")
def local_store_put(self, data, copies=1, num_retries=None):
"""A stub for put().