# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
from future import standard_library
from future.utils import native_str
standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import range
from builtins import object
import collections
import datetime
import hashlib
import logging
import math
import os
import pycurl
import queue
import re
import socket
import ssl
import sys
import threading
import urllib.parse

from . import timer

if sys.version_info >= (3, 0):
    from io import BytesIO
else:
    from cStringIO import StringIO as BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util

_logger = logging.getLogger('arvados.keep')
global_client_object = None

# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except IndexError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
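
# Usage sketch for KeepLocator (illustrative values only -- the
# signature below is a made-up 40-digit hex string, not a real token):
#
#   loc = KeepLocator(
#       'acbd18db4cc2f85cedef654fccc4a4d8+3'
#       '+A0123456789abcdef0123456789abcdef01234567@565086ae')
#   loc.md5sum                # 'acbd18db4cc2f85cedef654fccc4a4d8'
#   loc.size                  # 3
#   loc.permission_expired()  # compares the A...@ expiry to now
#   loc.stripped()            # 'acbd18db4cc2f85cedef654fccc4a4d8+3'
#   str(loc)                  # reassembles the full signed locator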
142 """Simple interface to a global KeepClient object.
144 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
145 own API client. The global KeepClient will build an API client from the
146 current Arvados configuration, which may not match the one you built.
151 def global_client_object(cls):
152 global global_client_object
153 # Previously, KeepClient would change its behavior at runtime based
154 # on these configuration settings. We simulate that behavior here
155 # by checking the values and returning a new KeepClient if any of
157 key = (config.get('ARVADOS_API_HOST'),
158 config.get('ARVADOS_API_TOKEN'),
159 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
160 config.get('ARVADOS_KEEP_PROXY'),
161 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
162 os.environ.get('KEEP_LOCAL_STORE'))
163 if (global_client_object is None) or (cls._last_key != key):
164 global_client_object = KeepClient()
166 return global_client_object
169 def get(locator, **kwargs):
170 return Keep.global_client_object().get(locator, **kwargs)
173 def put(data, **kwargs):
174 return Keep.global_client_object().put(data, **kwargs)
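
# Deprecated-style usage sketch (illustrative): the module-level
# wrappers delegate to a process-wide client built from configuration.
#
#   locator = Keep.put(b'foo')   # store a block via the global client
#   data = Keep.get(locator)     # read it back
#
# New code should build its own client instead, e.g.
# KeepClient(api_client=arvados.api('v1')).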


class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in range(len(self._cache)-1, -1, -1):
                    # Evict the oldest slot whose content is ready.
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
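
# Cache-protocol sketch (illustrative; `fetched_bytes` is a hypothetical
# value): the first caller reserves the slot and fills it, while any
# concurrent readers block in slot.get() until set() is called.
#
#   cache = KeepBlockCache()
#   slot, first = cache.reserve_cache('acbd18db4cc2f85cedef654fccc4a4d8')
#   if first:
#       slot.set(fetched_bytes)   # wakes any waiting readers
#       cache.cap_cache()         # then enforce the cache_max bound
#   else:
#       data = slot.get()         # blocks until the fetcher calls set()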

class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)

    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None, **headers):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._lastheadername = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()

        def _socket_open(self, *args, **kwargs):
            if len(args) + len(kwargs) == 2:
                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
            else:
                return self._socket_open_pycurl_7_19_3(*args, **kwargs)

        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
            return self._socket_open_pycurl_7_21_5(
                purpose=None,
                address=collections.namedtuple(
                    'Address', ['family', 'socktype', 'protocol', 'addr'],
                )(family, socktype, protocol, address))

        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            self._socket = s
            return s

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout)

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result.get('content-length'))
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']
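
        # Internal usage sketch (illustrative; `pool` stands in for the
        # shared pycurl handle queue): KeepClient drives this method
        # with a KeepLocator and a (connect, read, bandwidth) timeout
        # tuple.
        #
        #   ks = KeepClient.KeepService('https://keep0.example/', pool)
        #   blob = ks.get(KeepLocator('acbd18db4cc2f85cedef654fccc4a4d8+3'),
        #                 timeout=(2, 256, 32768))
        #
        # A None return means failure; ks.last_result()['error'] holds
        # the exception or HttpError describing why.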

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
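
        # For example, the default tuple (2, 256, 32768) maps to:
        #   CONNECTTIMEOUT_MS = 2000   -- the TCP connection must open
        #                                 within 2 seconds
        #   LOW_SPEED_TIME    = 256    -- abort if the transfer stays
        #                                 below the floor this long
        #   LOW_SPEED_LIMIT   = 32768  -- the bandwidth floor, in
        #                                 bytes per second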

        def _headerfunction(self, header_line):
            if isinstance(header_line, bytes):
                header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._lastheadername and header_line.startswith((' ', '\t')):
                # Continuation line: fold it into the previous header.
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
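
        # For example, the response lines
        #     HTTP/1.1 200 OK
        #     X-Keep-Replicas-Stored: 2
        # accumulate in self._headers as
        #     {'x-status-line': 'HTTP/1.1 200 OK',
        #      'x-keep-replicas-stored': '2'}
        # (header names lowercased, the status line kept under a
        # synthetic key).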

    class KeepWriterQueue(queue.Queue):
        def __init__(self, copies):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.successful_copies = 0
            self.response = None
            self.successful_copies_lock = threading.Lock()
            self.pending_tries = copies
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr):
            with self.successful_copies_lock:
                self.successful_copies += replicas_nr
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.successful_copies_lock:
                return self.wanted_copies - self.successful_copies

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()

    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
            self.total_task_nr = 0
            self.wanted_copies = copies
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response
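
    # Pool usage sketch mirroring KeepClient.put() (illustrative;
    # `services` stands in for (service_root, KeepService) pairs):
    #
    #   pool = KeepClient.KeepWriterThreadPool(
    #       data=data, data_hash=data_hash, copies=2,
    #       max_service_replicas=None, timeout=(2, 256, 32768))
    #   for root, ks in services:
    #       pool.add_task(ks, root)
    #   pool.join()                 # starts workers, waits for the queue
    #   if pool.done() >= 2:        # replicas actually stored
    #       locator = pool.response()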

    class KeepWriterThread(threading.Thread):
        TaskFailed = RuntimeError()

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies = self.do_task(service, service_root)
                except Exception as e:
                    if e is not self.TaskFailed:
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout))
            result = service.last_result()

            if not success:
                if result.get('status_code', None):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result['status_code'],
                                  result['body'])
                raise self.TaskFailed

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            return result['body'].strip(), replicas_stored

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth).  A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds.  Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth).  The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        if local_store:
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
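
    # Construction sketch (illustrative; `token` is a placeholder):
    #
    #   kc = KeepClient()                            # use configuration
    #   kc = KeepClient(api_client=arvados.api('v1'),
    #                   timeout=(5, 300, 65536))     # custom timeout tuple
    #   kc = KeepClient(api_token=token,
    #                   proxy='https://keep.example.com/')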

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
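
    # With DEFAULT_TIMEOUT = (2, 256, 32768), successive attempts get
    # connection timeouts of 2s, 4s, 8s, ... (2 * 2**attempt_number);
    # the read timeout and bandwidth floor stay fixed.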

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
                (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
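
    # For example (illustrative UUID):
    #
    #   _service_weight('acbd18db4cc2f85cedef654fccc4a4d8',
    #                   'zzzzz-bi6l4-000000000000000')
    #
    # hashes 'acbd18db4cc2f85cedef654fccc4a4d8' + '000000000000000'.
    # Sorting services by this weight yields a rendezvous (highest-
    # random-weight) order: stable for a given block, but spreading
    # different blocks across services.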

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # as a list.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    **headers)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc):
        """Fetch a block only if it is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    @retry.retry_method
    def head(self, loc_s, num_retries=None):
        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)

    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)

    def _get_or_head(self, loc_s, method="GET", num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        locator = KeepLocator(loc_s)
        if method == "GET":
            slot, first = self.block_cache.reserve_cache(locator.md5sum)
            if not first:
                self.hits_counter.add(1)
                v = slot.get()
                return v

        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               len(hint) == 29 and
                               self._gateway_services.get(hint[2:])
                               )])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter)
            for root in hint_roots
        }

        # See #3147 for a discussion of the loop implementation.  Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        sorted_roots = []
        blob = None
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False)
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        if method == "GET":
            slot.set(blob)
            self.block_cache.cap_cache()
        if loop.success():
            if method == "HEAD":
                return True
            else:
                return blob

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        """

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replicas'] = str(copies)
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done = 0
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left))
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            done += writer_pool.done()
            loop.save_result((done >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, writer_pool.done()), service_errors, label="service")
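
    # Write sketch (illustrative):
    #
    #   kc = KeepClient()
    #   locator = kc.put(b'foo', copies=2)
    #   # e.g. 'acbd18db4cc2f85cedef654fccc4a4d8+3+A<signature>@<expiry>'
    #   data = kc.get(locator)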

    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return b''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()
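
    # Local-store sketch (illustrative): with local_store set (or the
    # KEEP_LOCAL_STORE environment variable), get() and put() are
    # rebound to these stubs and blocks live as flat files:
    #
    #   kc = KeepClient(local_store='/tmp/keep')
    #   loc = kc.put(b'foo')   # writes /tmp/keep/acbd18db...
    #   kc.get(loc)            # returns b'foo'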

    def is_cached(self, locator):
        # True if a cache slot already exists for this locator.
        return self.block_cache.get(locator) is not None