1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 from future.utils import native_str
9 standard_library.install_aliases()
10 from builtins import next
11 from builtins import str
12 from builtins import range
13 from builtins import object
31 if sys.version_info >= (3, 0):
32 from io import BytesIO
34 from cStringIO import StringIO as BytesIO
37 import arvados.config as config
39 import arvados.retry as retry
# Module-level logger shared by all Keep client classes in this file.
_logger = logging.getLogger('arvados.keep')

# Lazily-created KeepClient used by the deprecated Keep.global_client_object()
# interface; replaced whenever the relevant configuration values change.
global_client_object = None
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    for _tcp_name, _tcp_value in (('TCP_KEEPALIVE', 0x010),
                                  ('TCP_KEEPINTVL', 0x101),
                                  ('TCP_KEEPCNT', 0x102)):
        # Only fill in constants the local socket module doesn't define.
        if not hasattr(socket, _tcp_name):
            setattr(socket, _tcp_name, _tcp_value)
57 class KeepLocator(object):
58 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
59 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
61 def __init__(self, locator_str):
64 self._perm_expiry = None
65 pieces = iter(locator_str.split('+'))
66 self.md5sum = next(pieces)
68 self.size = int(next(pieces))
72 if self.HINT_RE.match(hint) is None:
73 raise ValueError("invalid hint format: {}".format(hint))
74 elif hint.startswith('A'):
75 self.parse_permission_hint(hint)
77 self.hints.append(hint)
82 for s in [self.md5sum, self.size,
83 self.permission_hint()] + self.hints
87 if self.size is not None:
88 return "%s+%i" % (self.md5sum, self.size)
92 def _make_hex_prop(name, length):
93 # Build and return a new property with the given name that
94 # must be a hex string of the given length.
95 data_name = '_{}'.format(name)
97 return getattr(self, data_name)
98 def setter(self, hex_str):
99 if not arvados.util.is_hex(hex_str, length):
100 raise ValueError("{} is not a {}-digit hex string: {!r}".
101 format(name, length, hex_str))
102 setattr(self, data_name, hex_str)
103 return property(getter, setter)
105 md5sum = _make_hex_prop('md5sum', 32)
106 perm_sig = _make_hex_prop('perm_sig', 40)
109 def perm_expiry(self):
110 return self._perm_expiry
113 def perm_expiry(self, value):
114 if not arvados.util.is_hex(value, 1, 8):
116 "permission timestamp must be a hex Unix timestamp: {}".
118 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
120 def permission_hint(self):
121 data = [self.perm_sig, self.perm_expiry]
124 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
125 return "A{}@{:08x}".format(*data)
127 def parse_permission_hint(self, s):
129 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
131 raise ValueError("bad permission hint {}".format(s))
133 def permission_expired(self, as_of_dt=None):
134 if self.perm_expiry is None:
136 elif as_of_dt is None:
137 as_of_dt = datetime.datetime.now()
138 return self.perm_expiry <= as_of_dt
142 """Simple interface to a global KeepClient object.
144 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
145 own API client. The global KeepClient will build an API client from the
146 current Arvados configuration, which may not match the one you built.
151 def global_client_object(cls):
152 global global_client_object
153 # Previously, KeepClient would change its behavior at runtime based
154 # on these configuration settings. We simulate that behavior here
155 # by checking the values and returning a new KeepClient if any of
157 key = (config.get('ARVADOS_API_HOST'),
158 config.get('ARVADOS_API_TOKEN'),
159 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
160 config.get('ARVADOS_KEEP_PROXY'),
161 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
162 os.environ.get('KEEP_LOCAL_STORE'))
163 if (global_client_object is None) or (cls._last_key != key):
164 global_client_object = KeepClient()
166 return global_client_object
169 def get(locator, **kwargs):
170 return Keep.global_client_object().get(locator, **kwargs)
173 def put(data, **kwargs):
174 return Keep.global_client_object().put(data, **kwargs)
176 class KeepBlockCache(object):
177 # Default RAM cache is 256MiB
178 def __init__(self, cache_max=(256 * 1024 * 1024)):
179 self.cache_max = cache_max
181 self._cache_lock = threading.Lock()
183 class CacheSlot(object):
184 __slots__ = ("locator", "ready", "content")
186 def __init__(self, locator):
187 self.locator = locator
188 self.ready = threading.Event()
195 def set(self, value):
200 if self.content is None:
203 return len(self.content)
206 '''Cap the cache size to self.cache_max'''
207 with self._cache_lock:
208 # Select all slots except those where ready.is_set() and content is
209 # None (that means there was an error reading the block).
210 self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
211 sm = sum([slot.size() for slot in self._cache])
212 while len(self._cache) > 0 and sm > self.cache_max:
213 for i in range(len(self._cache)-1, -1, -1):
214 if self._cache[i].ready.is_set():
217 sm = sum([slot.size() for slot in self._cache])
219 def _get(self, locator):
220 # Test if the locator is already in the cache
221 for i in range(0, len(self._cache)):
222 if self._cache[i].locator == locator:
225 # move it to the front
227 self._cache.insert(0, n)
231 def get(self, locator):
232 with self._cache_lock:
233 return self._get(locator)
235 def reserve_cache(self, locator):
236 '''Reserve a cache slot for the specified locator,
237 or return the existing slot.'''
238 with self._cache_lock:
239 n = self._get(locator)
243 # Add a new cache slot for the locator
244 n = KeepBlockCache.CacheSlot(locator)
245 self._cache.insert(0, n)
248 class Counter(object):
249 def __init__(self, v=0):
250 self._lk = threading.Lock()
262 class KeepClient(object):
264 # Default Keep server connection timeout: 2 seconds
265 # Default Keep server read timeout: 256 seconds
266 # Default Keep server bandwidth minimum: 32768 bytes per second
267 # Default Keep proxy connection timeout: 20 seconds
268 # Default Keep proxy read timeout: 256 seconds
269 # Default Keep proxy bandwidth minimum: 32768 bytes per second
270 DEFAULT_TIMEOUT = (2, 256, 32768)
271 DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
274 class KeepService(object):
275 """Make requests to a single Keep service, and track results.
277 A KeepService is intended to last long enough to perform one
278 transaction (GET or PUT) against one Keep service. This can
279 involve calling either get() or put() multiple times in order
280 to retry after transient failures. However, calling both get()
281 and put() on a single instance -- or using the same instance
282 to access two different Keep services -- will not produce
289 arvados.errors.HttpError,
292 def __init__(self, root, user_agent_pool=queue.LifoQueue(),
294 download_counter=None,
297 self._user_agent_pool = user_agent_pool
298 self._result = {'error': None}
302 self.get_headers = {'Accept': 'application/octet-stream'}
303 self.get_headers.update(headers)
304 self.put_headers = headers
305 self.upload_counter = upload_counter
306 self.download_counter = download_counter
309 """Is it worth attempting a request?"""
313 """Did the request succeed or encounter permanent failure?"""
314 return self._result['error'] == False or not self._usable
316 def last_result(self):
319 def _get_user_agent(self):
321 return self._user_agent_pool.get(block=False)
325 def _put_user_agent(self, ua):
328 self._user_agent_pool.put(ua, block=False)
332 def _socket_open(self, *args, **kwargs):
333 if len(args) + len(kwargs) == 2:
334 return self._socket_open_pycurl_7_21_5(*args, **kwargs)
336 return self._socket_open_pycurl_7_19_3(*args, **kwargs)
338 def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
339 return self._socket_open_pycurl_7_21_5(
341 address=collections.namedtuple(
342 'Address', ['family', 'socktype', 'protocol', 'addr'],
343 )(family, socktype, protocol, address))
345 def _socket_open_pycurl_7_21_5(self, purpose, address):
346 """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
347 s = socket.socket(address.family, address.socktype, address.protocol)
348 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
349 # Will throw invalid protocol error on mac. This test prevents that.
350 if hasattr(socket, 'TCP_KEEPIDLE'):
351 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
352 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
356 def get(self, locator, method="GET", timeout=None):
357 # locator is a KeepLocator object.
358 url = self.root + str(locator)
359 _logger.debug("Request: %s %s", method, url)
360 curl = self._get_user_agent()
363 with timer.Timer() as t:
365 response_body = BytesIO()
366 curl.setopt(pycurl.NOSIGNAL, 1)
367 curl.setopt(pycurl.OPENSOCKETFUNCTION,
368 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
369 curl.setopt(pycurl.URL, url.encode('utf-8'))
370 curl.setopt(pycurl.HTTPHEADER, [
371 '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
372 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
373 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
375 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
377 curl.setopt(pycurl.NOBODY, True)
378 self._setcurltimeouts(curl, timeout)
382 except Exception as e:
383 raise arvados.errors.HttpError(0, str(e))
389 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
390 'body': response_body.getvalue(),
391 'headers': self._headers,
395 ok = retry.check_http_response_success(self._result['status_code'])
397 self._result['error'] = arvados.errors.HttpError(
398 self._result['status_code'],
399 self._headers.get('x-status-line', 'Error'))
400 except self.HTTP_ERRORS as e:
404 self._usable = ok != False
405 if self._result.get('status_code', None):
406 # The client worked well enough to get an HTTP status
407 # code, so presumably any problems are just on the
408 # server side and it's OK to reuse the client.
409 self._put_user_agent(curl)
411 # Don't return this client to the pool, in case it's
415 _logger.debug("Request fail: GET %s => %s: %s",
416 url, type(self._result['error']), str(self._result['error']))
419 _logger.info("HEAD %s: %s bytes",
420 self._result['status_code'],
421 self._result.get('content-length'))
424 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
425 self._result['status_code'],
426 len(self._result['body']),
428 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
430 if self.download_counter:
431 self.download_counter.add(len(self._result['body']))
432 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
433 if resp_md5 != locator.md5sum:
434 _logger.warning("Checksum fail: md5(%s) = %s",
436 self._result['error'] = arvados.errors.HttpError(
439 return self._result['body']
441 def put(self, hash_s, body, timeout=None):
442 url = self.root + hash_s
443 _logger.debug("Request: PUT %s", url)
444 curl = self._get_user_agent()
447 with timer.Timer() as t:
449 body_reader = BytesIO(body)
450 response_body = BytesIO()
451 curl.setopt(pycurl.NOSIGNAL, 1)
452 curl.setopt(pycurl.OPENSOCKETFUNCTION,
453 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
454 curl.setopt(pycurl.URL, url.encode('utf-8'))
455 # Using UPLOAD tells cURL to wait for a "go ahead" from the
456 # Keep server (in the form of a HTTP/1.1 "100 Continue"
457 # response) instead of sending the request body immediately.
458 # This allows the server to reject the request if the request
459 # is invalid or the server is read-only, without waiting for
460 # the client to send the entire block.
461 curl.setopt(pycurl.UPLOAD, True)
462 curl.setopt(pycurl.INFILESIZE, len(body))
463 curl.setopt(pycurl.READFUNCTION, body_reader.read)
464 curl.setopt(pycurl.HTTPHEADER, [
465 '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
466 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
467 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
469 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
470 self._setcurltimeouts(curl, timeout)
473 except Exception as e:
474 raise arvados.errors.HttpError(0, str(e))
480 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
481 'body': response_body.getvalue().decode('utf-8'),
482 'headers': self._headers,
485 ok = retry.check_http_response_success(self._result['status_code'])
487 self._result['error'] = arvados.errors.HttpError(
488 self._result['status_code'],
489 self._headers.get('x-status-line', 'Error'))
490 except self.HTTP_ERRORS as e:
494 self._usable = ok != False # still usable if ok is True or None
495 if self._result.get('status_code', None):
496 # Client is functional. See comment in get().
497 self._put_user_agent(curl)
501 _logger.debug("Request fail: PUT %s => %s: %s",
502 url, type(self._result['error']), str(self._result['error']))
504 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
505 self._result['status_code'],
508 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
509 if self.upload_counter:
510 self.upload_counter.add(len(body))
513 def _setcurltimeouts(self, curl, timeouts):
516 elif isinstance(timeouts, tuple):
517 if len(timeouts) == 2:
518 conn_t, xfer_t = timeouts
519 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
521 conn_t, xfer_t, bandwidth_bps = timeouts
523 conn_t, xfer_t = (timeouts, timeouts)
524 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
525 curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
526 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
527 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
529 def _headerfunction(self, header_line):
530 if isinstance(header_line, bytes):
531 header_line = header_line.decode('iso-8859-1')
532 if ':' in header_line:
533 name, value = header_line.split(':', 1)
534 name = name.strip().lower()
535 value = value.strip()
537 name = self._lastheadername
538 value = self._headers[name] + ' ' + header_line.strip()
539 elif header_line.startswith('HTTP/'):
540 name = 'x-status-line'
543 _logger.error("Unexpected header line: %s", header_line)
545 self._lastheadername = name
546 self._headers[name] = value
547 # Returning None implies all bytes were written
550 class KeepWriterQueue(queue.Queue):
551 def __init__(self, copies):
552 queue.Queue.__init__(self) # Old-style superclass
553 self.wanted_copies = copies
554 self.successful_copies = 0
556 self.successful_copies_lock = threading.Lock()
557 self.pending_tries = copies
558 self.pending_tries_notification = threading.Condition()
560 def write_success(self, response, replicas_nr):
561 with self.successful_copies_lock:
562 self.successful_copies += replicas_nr
563 self.response = response
564 with self.pending_tries_notification:
565 self.pending_tries_notification.notify_all()
567 def write_fail(self, ks):
568 with self.pending_tries_notification:
569 self.pending_tries += 1
570 self.pending_tries_notification.notify()
572 def pending_copies(self):
573 with self.successful_copies_lock:
574 return self.wanted_copies - self.successful_copies
576 def get_next_task(self):
577 with self.pending_tries_notification:
579 if self.pending_copies() < 1:
580 # This notify_all() is unnecessary --
581 # write_success() already called notify_all()
582 # when pending<1 became true, so it's not
583 # possible for any other thread to be in
584 # wait() now -- but it's cheap insurance
585 # against deadlock so we do it anyway:
586 self.pending_tries_notification.notify_all()
587 # Drain the queue and then raise Queue.Empty
591 elif self.pending_tries > 0:
592 service, service_root = self.get_nowait()
593 if service.finished():
596 self.pending_tries -= 1
597 return service, service_root
599 self.pending_tries_notification.notify_all()
602 self.pending_tries_notification.wait()
605 class KeepWriterThreadPool(object):
606 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
607 self.total_task_nr = 0
608 self.wanted_copies = copies
609 if (not max_service_replicas) or (max_service_replicas >= copies):
612 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
613 _logger.debug("Pool max threads is %d", num_threads)
615 self.queue = KeepClient.KeepWriterQueue(copies)
617 for _ in range(num_threads):
618 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
619 self.workers.append(w)
621 def add_task(self, ks, service_root):
622 self.queue.put((ks, service_root))
623 self.total_task_nr += 1
626 return self.queue.successful_copies
630 for worker in self.workers:
632 # Wait for finished work
636 return self.queue.response
639 class KeepWriterThread(threading.Thread):
640 TaskFailed = RuntimeError()
642 def __init__(self, queue, data, data_hash, timeout=None):
643 super(KeepClient.KeepWriterThread, self).__init__()
644 self.timeout = timeout
647 self.data_hash = data_hash
653 service, service_root = self.queue.get_next_task()
657 locator, copies = self.do_task(service, service_root)
658 except Exception as e:
659 if e is not self.TaskFailed:
660 _logger.exception("Exception in KeepWriterThread")
661 self.queue.write_fail(service)
663 self.queue.write_success(locator, copies)
665 self.queue.task_done()
667 def do_task(self, service, service_root):
668 success = bool(service.put(self.data_hash,
670 timeout=self.timeout))
671 result = service.last_result()
674 if result.get('status_code', None):
675 _logger.debug("Request fail: PUT %s => %s %s",
677 result['status_code'],
679 raise self.TaskFailed
681 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
682 str(threading.current_thread()),
687 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
688 except (KeyError, ValueError):
691 return result['body'].strip(), replicas_stored
694 def __init__(self, api_client=None, proxy=None,
695 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
696 api_token=None, local_store=None, block_cache=None,
697 num_retries=0, session=None):
698 """Initialize a new KeepClient.
702 The API client to use to find Keep services. If not
703 provided, KeepClient will build one from available Arvados
707 If specified, this KeepClient will send requests to this Keep
708 proxy. Otherwise, KeepClient will fall back to the setting of the
709 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
710 If you want to KeepClient does not use a proxy, pass in an empty
714 The initial timeout (in seconds) for HTTP requests to Keep
715 non-proxy servers. A tuple of three floats is interpreted as
716 (connection_timeout, read_timeout, minimum_bandwidth). A connection
717 will be aborted if the average traffic rate falls below
718 minimum_bandwidth bytes per second over an interval of read_timeout
719 seconds. Because timeouts are often a result of transient server
720 load, the actual connection timeout will be increased by a factor
721 of two on each retry.
722 Default: (2, 256, 32768).
725 The initial timeout (in seconds) for HTTP requests to
726 Keep proxies. A tuple of three floats is interpreted as
727 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
728 described above for adjusting connection timeouts on retry also
730 Default: (20, 256, 32768).
733 If you're not using an API client, but only talking
734 directly to a Keep proxy, this parameter specifies an API token
735 to authenticate Keep requests. It is an error to specify both
736 api_client and api_token. If you specify neither, KeepClient
737 will use one available from the Arvados configuration.
740 If specified, this KeepClient will bypass Keep
741 services, and save data to the named directory. If unspecified,
742 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
743 environment variable. If you want to ensure KeepClient does not
744 use local storage, pass in an empty string. This is primarily
745 intended to mock a server for testing.
748 The default number of times to retry failed requests.
749 This will be used as the default num_retries value when get() and
750 put() are called. Default 0.
752 self.lock = threading.Lock()
754 if config.get('ARVADOS_KEEP_SERVICES'):
755 proxy = config.get('ARVADOS_KEEP_SERVICES')
757 proxy = config.get('ARVADOS_KEEP_PROXY')
758 if api_token is None:
759 if api_client is None:
760 api_token = config.get('ARVADOS_API_TOKEN')
762 api_token = api_client.api_token
763 elif api_client is not None:
765 "can't build KeepClient with both API client and token")
766 if local_store is None:
767 local_store = os.environ.get('KEEP_LOCAL_STORE')
769 if config.flag_is_true('ARVADOS_API_HOST_INSECURE'):
772 self.insecure = False
774 self.block_cache = block_cache if block_cache else KeepBlockCache()
775 self.timeout = timeout
776 self.proxy_timeout = proxy_timeout
777 self._user_agent_pool = queue.LifoQueue()
778 self.upload_counter = Counter()
779 self.download_counter = Counter()
780 self.put_counter = Counter()
781 self.get_counter = Counter()
782 self.hits_counter = Counter()
783 self.misses_counter = Counter()
786 self.local_store = local_store
787 self.get = self.local_store_get
788 self.put = self.local_store_put
790 self.num_retries = num_retries
791 self.max_replicas_per_service = None
793 proxy_uris = proxy.split()
794 for i in range(len(proxy_uris)):
795 if not proxy_uris[i].endswith('/'):
798 url = urllib.parse.urlparse(proxy_uris[i])
799 if not (url.scheme and url.netloc):
800 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
801 self.api_token = api_token
802 self._gateway_services = {}
803 self._keep_services = [{
804 'uuid': "00000-bi6l4-%015d" % idx,
805 'service_type': 'proxy',
806 '_service_root': uri,
807 } for idx, uri in enumerate(proxy_uris)]
808 self._writable_services = self._keep_services
809 self.using_proxy = True
810 self._static_services_list = True
812 # It's important to avoid instantiating an API client
813 # unless we actually need one, for testing's sake.
814 if api_client is None:
815 api_client = arvados.api('v1')
816 self.api_client = api_client
817 self.api_token = api_client.api_token
818 self._gateway_services = {}
819 self._keep_services = None
820 self._writable_services = None
821 self.using_proxy = None
822 self._static_services_list = False
824 def current_timeout(self, attempt_number):
825 """Return the appropriate timeout to use for this client.
827 The proxy timeout setting if the backend service is currently a proxy,
828 the regular timeout setting otherwise. The `attempt_number` indicates
829 how many times the operation has been tried already (starting from 0
830 for the first try), and scales the connection timeout portion of the
831 return value accordingly.
834 # TODO(twp): the timeout should be a property of a
835 # KeepService, not a KeepClient. See #4488.
836 t = self.proxy_timeout if self.using_proxy else self.timeout
838 return (t[0] * (1 << attempt_number), t[1])
840 return (t[0] * (1 << attempt_number), t[1], t[2])
841 def _any_nondisk_services(self, service_list):
842 return any(ks.get('service_type', 'disk') != 'disk'
843 for ks in service_list)
845 def build_services_list(self, force_rebuild=False):
846 if (self._static_services_list or
847 (self._keep_services and not force_rebuild)):
851 keep_services = self.api_client.keep_services().accessible()
852 except Exception: # API server predates Keep services.
853 keep_services = self.api_client.keep_disks().list()
855 # Gateway services are only used when specified by UUID,
856 # so there's nothing to gain by filtering them by
858 self._gateway_services = {ks['uuid']: ks for ks in
859 keep_services.execute()['items']}
860 if not self._gateway_services:
861 raise arvados.errors.NoKeepServersError()
863 # Precompute the base URI for each service.
864 for r in self._gateway_services.values():
865 host = r['service_host']
866 if not host.startswith('[') and host.find(':') >= 0:
867 # IPv6 URIs must be formatted like http://[::1]:80/...
868 host = '[' + host + ']'
869 r['_service_root'] = "{}://{}:{:d}/".format(
870 'https' if r['service_ssl_flag'] else 'http',
874 _logger.debug(str(self._gateway_services))
875 self._keep_services = [
876 ks for ks in self._gateway_services.values()
877 if not ks.get('service_type', '').startswith('gateway:')]
878 self._writable_services = [ks for ks in self._keep_services
879 if not ks.get('read_only')]
881 # For disk type services, max_replicas_per_service is 1
882 # It is unknown (unlimited) for other service types.
883 if self._any_nondisk_services(self._writable_services):
884 self.max_replicas_per_service = None
886 self.max_replicas_per_service = 1
888 def _service_weight(self, data_hash, service_uuid):
889 """Compute the weight of a Keep service endpoint for a data
890 block with a known hash.
892 The weight is md5(h + u) where u is the last 15 characters of
893 the service endpoint's UUID.
895 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
897 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
898 """Return an array of Keep service endpoints, in the order in
899 which they should be probed when reading or writing data with
900 the given hash+hints.
902 self.build_services_list(force_rebuild)
905 # Use the services indicated by the given +K@... remote
906 # service hints, if any are present and can be resolved to a
908 for hint in locator.hints:
909 if hint.startswith('K@'):
912 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
913 elif len(hint) == 29:
914 svc = self._gateway_services.get(hint[2:])
916 sorted_roots.append(svc['_service_root'])
918 # Sort the available local services by weight (heaviest first)
919 # for this locator, and return their service_roots (base URIs)
921 use_services = self._keep_services
923 use_services = self._writable_services
924 self.using_proxy = self._any_nondisk_services(use_services)
925 sorted_roots.extend([
926 svc['_service_root'] for svc in sorted(
929 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
930 _logger.debug("{}: {}".format(locator, sorted_roots))
933 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
934 # roots_map is a dictionary, mapping Keep service root strings
935 # to KeepService objects. Poll for Keep services, and add any
936 # new ones to roots_map. Return the current list of local
938 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
939 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
940 for root in local_roots:
941 if root not in roots_map:
942 roots_map[root] = self.KeepService(
943 root, self._user_agent_pool,
944 upload_counter=self.upload_counter,
945 download_counter=self.download_counter,
950 def _check_loop_result(result):
951 # KeepClient RetryLoops should save results as a 2-tuple: the
952 # actual result of the request, and the number of servers available
953 # to receive the request this round.
954 # This method returns True if there's a real result, False if
955 # there are no more servers available, otherwise None.
956 if isinstance(result, Exception):
958 result, tried_server_count = result
959 if (result is not None) and (result is not False):
961 elif tried_server_count < 1:
962 _logger.info("No more Keep services to try; giving up")
967 def get_from_cache(self, loc):
968 """Fetch a block only if is in the cache, otherwise return None."""
969 slot = self.block_cache.get(loc)
970 if slot is not None and slot.ready.is_set():
976 def head(self, loc_s, **kwargs):
977 return self._get_or_head(loc_s, method="HEAD", **kwargs)
980 def get(self, loc_s, **kwargs):
981 return self._get_or_head(loc_s, method="GET", **kwargs)
983 def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None):
984 """Get data from Keep.
986 This method fetches one or more blocks of data from Keep. It
987 sends a request each Keep service registered with the API
988 server (or the proxy provided when this client was
989 instantiated), then each service named in location hints, in
990 sequence. As soon as one service provides the data, it's
994 * loc_s: A string of one or more comma-separated locators to fetch.
995 This method returns the concatenation of these blocks.
996 * num_retries: The number of times to retry GET requests to
997 *each* Keep server if it returns temporary failures, with
998 exponential backoff. Note that, in each loop, the method may try
999 to fetch data from every available Keep service, along with any
1000 that are named in location hints in the locator. The default value
1001 is set when the KeepClient is initialized.
1004 return ''.join(self.get(x) for x in loc_s.split(','))
1006 self.get_counter.add(1)
1011 locator = KeepLocator(loc_s)
1013 slot, first = self.block_cache.reserve_cache(locator.md5sum)
1015 self.hits_counter.add(1)
1018 raise arvados.errors.KeepReadError(
1019 "failed to read {}".format(loc_s))
1022 self.misses_counter.add(1)
1025 'X-Request-Id': (request_id or
1026 (hasattr(self, 'api_client') and self.api_client.request_id) or
1027 arvados.util.new_request_id()),
1030 # If the locator has hints specifying a prefix (indicating a
1031 # remote keepproxy) or the UUID of a local gateway service,
1032 # read data from the indicated service(s) instead of the usual
1033 # list of local disk services.
1034 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1035 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1036 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1037 for hint in locator.hints if (
1038 hint.startswith('K@') and
1040 self._gateway_services.get(hint[2:])
1042 # Map root URLs to their KeepService objects.
1044 root: self.KeepService(root, self._user_agent_pool,
1045 upload_counter=self.upload_counter,
1046 download_counter=self.download_counter,
1048 for root in hint_roots
1051 # See #3147 for a discussion of the loop implementation. Highlights:
1052 # * Refresh the list of Keep services after each failure, in case
1053 # it's being updated.
1054 # * Retry until we succeed, we're out of retries, or every available
1055 # service has returned permanent failure.
1058 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1060 for tries_left in loop:
1062 sorted_roots = self.map_new_services(
1064 force_rebuild=(tries_left < num_retries),
1065 need_writable=False,
1067 except Exception as error:
1068 loop.save_result(error)
1071 # Query KeepService objects that haven't returned
1072 # permanent failure, in our specified shuffle order.
1073 services_to_try = [roots_map[root]
1074 for root in sorted_roots
1075 if roots_map[root].usable()]
1076 for keep_service in services_to_try:
1077 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1078 if blob is not None:
1080 loop.save_result((blob, len(services_to_try)))
1082 # Always cache the result, then return it if we succeeded.
1084 if method == "HEAD":
1089 if slot is not None:
1091 self.block_cache.cap_cache()
1093 # Q: Including 403 is necessary for the Keep tests to continue
1094 # passing, but maybe they should expect KeepReadError instead?
1095 not_founds = sum(1 for key in sorted_roots
1096 if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1097 service_errors = ((key, roots_map[key].last_result()['error'])
1098 for key in sorted_roots)
1100 raise arvados.errors.KeepReadError(
1101 "failed to read {}: no Keep services available ({})".format(
1102 loc_s, loop.last_result()))
1103 elif not_founds == len(sorted_roots):
1104 raise arvados.errors.NotFoundError(
1105 "{} not found".format(loc_s), service_errors)
1107 raise arvados.errors.KeepReadError(
1108 "failed to read {}".format(loc_s), service_errors, label="service")
1111 def put(self, data, copies=2, num_retries=None, request_id=None):
1112 """Save data in Keep.
1114 This method will get a list of Keep services from the API server, and
1115 send the data to each one simultaneously in a new thread. Once the
1116 uploads are finished, if enough copies are saved, this method returns
1117 the most recent HTTP response body. If requests fail to upload
1118 enough copies, this method raises KeepWriteError.
1121 * data: The string of data to upload.
1122 * copies: The number of copies that the user requires be saved.
1124 * num_retries: The number of times to retry PUT requests to
1125 *each* Keep server if it returns temporary failures, with
1126 exponential backoff. The default value is set when the
1127 KeepClient is initialized.
1130 if not isinstance(data, bytes):
1131 data = data.encode()
1133 self.put_counter.add(1)
1135 data_hash = hashlib.md5(data).hexdigest()
1136 loc_s = data_hash + '+' + str(len(data))
1139 locator = KeepLocator(loc_s)
1142 'X-Request-Id': (request_id or
1143 (hasattr(self, 'api_client') and self.api_client.request_id) or
1144 arvados.util.new_request_id()),
1145 'X-Keep-Desired-Replicas': str(copies),
1148 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1151 for tries_left in loop:
1153 sorted_roots = self.map_new_services(
1155 force_rebuild=(tries_left < num_retries),
1158 except Exception as error:
1159 loop.save_result(error)
1162 writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1163 data_hash=data_hash,
1164 copies=copies - done,
1165 max_service_replicas=self.max_replicas_per_service,
1166 timeout=self.current_timeout(num_retries - tries_left))
1167 for service_root, ks in [(root, roots_map[root])
1168 for root in sorted_roots]:
1171 writer_pool.add_task(ks, service_root)
1173 done += writer_pool.done()
1174 loop.save_result((done >= copies, writer_pool.total_task_nr))
1177 return writer_pool.response()
1179 raise arvados.errors.KeepWriteError(
1180 "failed to write {}: no Keep services available ({})".format(
1181 data_hash, loop.last_result()))
1183 service_errors = ((key, roots_map[key].last_result()['error'])
1184 for key in sorted_roots
1185 if roots_map[key].last_result()['error'])
1186 raise arvados.errors.KeepWriteError(
1187 "failed to write {} (wanted {} copies but wrote {})".format(
1188 data_hash, copies, writer_pool.done()), service_errors, label="service")
1190 def local_store_put(self, data, copies=1, num_retries=None):
1191 """A stub for put().
1193 This method is used in place of the real put() method when
1194 using local storage (see constructor's local_store argument).
1196 copies and num_retries arguments are ignored: they are here
1197 only for the sake of offering the same call signature as
1200 Data stored this way can be retrieved via local_store_get().
1202 md5 = hashlib.md5(data).hexdigest()
1203 locator = '%s+%d' % (md5, len(data))
1204 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1206 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1207 os.path.join(self.local_store, md5))
1210 def local_store_get(self, loc_s, num_retries=None):
1211 """Companion to local_store_put()."""
1213 locator = KeepLocator(loc_s)
1215 raise arvados.errors.NotFoundError(
1216 "Invalid data locator: '%s'" % loc_s)
1217 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1219 with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1222 def is_cached(self, locator):
1223 return self.block_cache.reserve_cache(expect_hash)