from __future__ import absolute_import
from __future__ import division
from future import standard_library
from future.utils import native_str
standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import range
from builtins import object
import collections
import datetime
import hashlib
import logging
import math
import os
import pycurl
import queue
import re
import socket
import ssl
import sys
import threading
import urllib.parse

from . import timer

if sys.version_info >= (3, 0):
    from io import BytesIO
else:
    from cStringIO import StringIO as BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util

_logger = logging.getLogger('arvados.keep')
global_client_object = None

# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except IndexError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
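
# A minimal KeepLocator usage sketch (the hash below is the well-known MD5 of
# the empty string, used purely as an illustration):
#
#     loc = KeepLocator('d41d8cd98f00b204e9800998ecf8427e+0')
#     loc.md5sum                # -> 'd41d8cd98f00b204e9800998ecf8427e'
#     loc.size                  # -> 0
#     str(loc)                  # -> 'd41d8cd98f00b204e9800998ecf8427e+0'
#     loc.permission_expired()  # -> False (no +A permission hint present)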
138 """Simple interface to a global KeepClient object.
140 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
141 own API client. The global KeepClient will build an API client from the
142 current Arvados configuration, which may not match the one you built.
147 def global_client_object(cls):
148 global global_client_object
149 # Previously, KeepClient would change its behavior at runtime based
150 # on these configuration settings. We simulate that behavior here
151 # by checking the values and returning a new KeepClient if any of
153 key = (config.get('ARVADOS_API_HOST'),
154 config.get('ARVADOS_API_TOKEN'),
155 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
156 config.get('ARVADOS_KEEP_PROXY'),
157 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
158 os.environ.get('KEEP_LOCAL_STORE'))
159 if (global_client_object is None) or (cls._last_key != key):
160 global_client_object = KeepClient()
162 return global_client_object
165 def get(locator, **kwargs):
166 return Keep.global_client_object().get(locator, **kwargs)
169 def put(data, **kwargs):
170 return Keep.global_client_object().put(data, **kwargs)
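
# Since the Keep wrapper above is deprecated, a minimal sketch of the
# recommended pattern (assumes the standard ARVADOS_API_* configuration is
# present):
#
#     api = arvados.api('v1')
#     kc = KeepClient(api_client=api)
#     loc = kc.put(b'hello')   # -> locator string for the stored block
#     kc.get(loc)              # -> b'hello'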


class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in range(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
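
# Sketch of the reserve/fill protocol KeepClient uses with this cache
# (`fetch_block` is a hypothetical stand-in for a Keep read):
#
#     slot, first = cache.reserve_cache(md5)
#     if first:                 # we own the slot: fetch, publish, trim
#         slot.set(fetch_block(md5))
#         cache.cap_cache()
#     else:                     # another thread is fetching: wait for set()
#         content = slot.get()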


class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)

    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None, **headers):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                pass

        def _socket_open(self, *args, **kwargs):
            if len(args) + len(kwargs) == 2:
                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
            else:
                return self._socket_open_pycurl_7_19_3(*args, **kwargs)

        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
            return self._socket_open_pycurl_7_21_5(
                purpose=None,
                address=collections.namedtuple(
                    'Address', ['family', 'socktype', 'protocol', 'addr'],
                )(family, socktype, protocol, address))

        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            self._socket = s
            return s

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k, v) for k, v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout)

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result.get('content-length'))
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k, v) for k, v in self.put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
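
        # Accepted `timeouts` forms, mirroring the branches above (values are
        # illustrative):
        #     None            -> leave the curl handle's defaults in place
        #     5               -> 5s connect timeout, 5s low-speed window
        #     (2, 256)        -> connect/read timeouts, default bandwidth floor
        #     (2, 256, 32768) -> connect, read, and minimum bytes/sec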

        def _headerfunction(self, header_line):
            if isinstance(header_line, bytes):
                header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
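
        # Folding sketch for the callback above, fed one raw line at a time
        # (lines are illustrative):
        #     'HTTP/1.1 200 OK'           -> stored as 'x-status-line'
        #     'X-Keep-Replicas-Stored: 2' -> {'x-keep-replicas-stored': '2'}
        #     ' ...continuation'          -> appended to the previous value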

    class KeepWriterQueue(queue.Queue):
        def __init__(self, copies):
            queue.Queue.__init__(self)  # Old-style superclass
            self.wanted_copies = copies
            self.successful_copies = 0
            self.response = None
            self.successful_copies_lock = threading.Lock()
            self.pending_tries = copies
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr):
            with self.successful_copies_lock:
                self.successful_copies += replicas_nr
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.successful_copies_lock:
                return self.wanted_copies - self.successful_copies

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()

    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
            self.total_task_nr = 0
            self.wanted_copies = copies
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response
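
    # Pool usage sketch, mirroring put() below (roots_map/sorted_roots as
    # built by map_new_services()):
    #
    #     pool = KeepClient.KeepWriterThreadPool(data, data_hash, copies=2,
    #                                            max_service_replicas=1)
    #     for root in sorted_roots:
    #         pool.add_task(roots_map[root], root)
    #     pool.join()
    #     pool.done()      # -> number of replicas confirmed written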

    class KeepWriterThread(threading.Thread):
        TaskFailed = RuntimeError()

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies = self.do_task(service, service_root)
                except Exception as e:
                    if e is not self.TaskFailed:
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            success = bool(service.put(self.data_hash,
                                       self.data,
                                       timeout=self.timeout))
            result = service.last_result()

            if not success:
                if result.get('status_code', None):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result['status_code'],
                                  result['body'])
                raise self.TaskFailed

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            return result['body'].strip(), replicas_stored

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth).  A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds.  Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth).  The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        if local_store:
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
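
    # Construction sketch for a proxy-only client (URL and token are
    # illustrative; no API server lookup happens in this mode):
    #
    #     kc = KeepClient(proxy='https://keep.example.com/',
    #                     api_token='xyzzy')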

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
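
    # With the default (2, 256, 32768) tuple, only the connection timeout
    # scales with retries: attempt 0 -> (2, 256, 32768),
    # attempt 1 -> (4, 256, 32768), attempt 3 -> (16, 256, 32768).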

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
                (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]
            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
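
    # Probe-order sketch built on the weight above (UUIDs are made up):
    #
    #     h = 'd41d8cd98f00b204e9800998ecf8427e'
    #     uuids = ['zzzzz-bi6l4-000000000000001', 'zzzzz-bi6l4-000000000000002']
    #     sorted(uuids, reverse=True,
    #            key=lambda u: hashlib.md5((h + u[-15:]).encode()).hexdigest())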

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # as a list.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    **headers)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc):
        """Fetch a block only if it is in the cache; otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    @retry.retry_method
    def head(self, loc_s, num_retries=None):
        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)

    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)

    def _get_or_head(self, loc_s, method="GET", num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        locator = KeepLocator(loc_s)
        if method == "GET":
            slot, first = self.block_cache.reserve_cache(locator.md5sum)
            if not first:
                self.hits_counter.add(1)
                v = slot.get()
                return v

        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               len(hint) == 29 and
                               self._gateway_services.get(hint[2:]))])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter)
            for root in hint_roots
        }

        # See #3147 for a discussion of the loop implementation.  Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        sorted_roots = []
        blob = None
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False)
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method,
                                        timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        if method == "GET":
            slot.set(blob)
            self.block_cache.cap_cache()
        if loop.success():
            if method == "HEAD":
                return True
            else:
                return blob

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        """

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replicas'] = str(copies)
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done = 0
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            writer_pool = KeepClient.KeepWriterThreadPool(
                data=data,
                data_hash=data_hash,
                copies=copies - done,
                max_service_replicas=self.max_replicas_per_service,
                timeout=self.current_timeout(num_retries - tries_left))
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            done += writer_pool.done()
            loop.save_result((done >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, writer_pool.done()), service_errors, label="service")
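
    # put() round-trip sketch (locator shown is illustrative; a cluster with
    # signing enabled would append +A... permission hints):
    #
    #     loc = kc.put(b'foo', copies=2)
    #     # loc -> 'acbd18db4cc2f85cedef654fccc4a4d8+3'
    #     kc.get(loc)  # -> b'foo'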

    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return b''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()
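
    # Local-store round-trip sketch (assumes /tmp/keep-mock exists and is
    # writable):
    #
    #     kc = KeepClient(local_store='/tmp/keep-mock')
    #     loc = kc.put(b'foo')   # dispatched to local_store_put()
    #     kc.get(loc)            # -> b'foo'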

    def is_cached(self, locator):
        # Fixed: this previously referenced an undefined name (expect_hash);
        # the method's own parameter is the locator to check.
        return self.block_cache.reserve_cache(locator)