1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 from future.utils import native_str
9 standard_library.install_aliases()
10 from builtins import next
11 from builtins import str
12 from builtins import range
13 from builtins import object
31 if sys.version_info >= (3, 0):
32 from io import BytesIO
34 from cStringIO import StringIO as BytesIO
37 import arvados.config as config
39 import arvados.retry as retry
42 _logger = logging.getLogger('arvados.keep')
43 global_client_object = None
46 # Monkey patch TCP constants when not available (apple). Values sourced from:
47 # http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    # Older macOS Pythons don't expose these TCP keepalive constants, so
    # patch them onto the socket module with the values from Apple's tcp.h.
    for _tcp_const, _tcp_value in (('TCP_KEEPALIVE', 0x010),
                                   ('TCP_KEEPINTVL', 0x101),
                                   ('TCP_KEEPCNT', 0x102)):
        if not hasattr(socket, _tcp_const):
            setattr(socket, _tcp_const, _tcp_value)
57 class KeepLocator(object):
58 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
59 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
61 def __init__(self, locator_str):
64 self._perm_expiry = None
65 pieces = iter(locator_str.split('+'))
66 self.md5sum = next(pieces)
68 self.size = int(next(pieces))
72 if self.HINT_RE.match(hint) is None:
73 raise ValueError("invalid hint format: {}".format(hint))
74 elif hint.startswith('A'):
75 self.parse_permission_hint(hint)
77 self.hints.append(hint)
82 for s in [self.md5sum, self.size,
83 self.permission_hint()] + self.hints
87 if self.size is not None:
88 return "%s+%i" % (self.md5sum, self.size)
92 def _make_hex_prop(name, length):
93 # Build and return a new property with the given name that
94 # must be a hex string of the given length.
95 data_name = '_{}'.format(name)
97 return getattr(self, data_name)
98 def setter(self, hex_str):
99 if not arvados.util.is_hex(hex_str, length):
100 raise ValueError("{} is not a {}-digit hex string: {!r}".
101 format(name, length, hex_str))
102 setattr(self, data_name, hex_str)
103 return property(getter, setter)
105 md5sum = _make_hex_prop('md5sum', 32)
106 perm_sig = _make_hex_prop('perm_sig', 40)
    def perm_expiry(self):
        # Read accessor for the permission-hint expiry.  Presumably the
        # @property decorator sits on the elided line above (the matching
        # setter follows) -- TODO confirm.  Returns a datetime.datetime,
        # or None when no A... permission hint has been parsed yet.
        return self._perm_expiry
113 def perm_expiry(self, value):
114 if not arvados.util.is_hex(value, 1, 8):
116 "permission timestamp must be a hex Unix timestamp: {}".
118 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
120 def permission_hint(self):
121 data = [self.perm_sig, self.perm_expiry]
124 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
125 return "A{}@{:08x}".format(*data)
127 def parse_permission_hint(self, s):
129 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
131 raise ValueError("bad permission hint {}".format(s))
133 def permission_expired(self, as_of_dt=None):
134 if self.perm_expiry is None:
136 elif as_of_dt is None:
137 as_of_dt = datetime.datetime.now()
138 return self.perm_expiry <= as_of_dt
142 """Simple interface to a global KeepClient object.
144 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
145 own API client. The global KeepClient will build an API client from the
146 current Arvados configuration, which may not match the one you built.
151 def global_client_object(cls):
152 global global_client_object
153 # Previously, KeepClient would change its behavior at runtime based
154 # on these configuration settings. We simulate that behavior here
155 # by checking the values and returning a new KeepClient if any of
157 key = (config.get('ARVADOS_API_HOST'),
158 config.get('ARVADOS_API_TOKEN'),
159 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
160 config.get('ARVADOS_KEEP_PROXY'),
161 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
162 os.environ.get('KEEP_LOCAL_STORE'))
163 if (global_client_object is None) or (cls._last_key != key):
164 global_client_object = KeepClient()
166 return global_client_object
169 def get(locator, **kwargs):
170 return Keep.global_client_object().get(locator, **kwargs)
173 def put(data, **kwargs):
174 return Keep.global_client_object().put(data, **kwargs)
176 class KeepBlockCache(object):
177 # Default RAM cache is 256MiB
178 def __init__(self, cache_max=(256 * 1024 * 1024)):
179 self.cache_max = cache_max
181 self._cache_lock = threading.Lock()
183 class CacheSlot(object):
184 __slots__ = ("locator", "ready", "content")
186 def __init__(self, locator):
187 self.locator = locator
188 self.ready = threading.Event()
195 def set(self, value):
200 if self.content is None:
203 return len(self.content)
206 '''Cap the cache size to self.cache_max'''
207 with self._cache_lock:
208 # Select all slots except those where ready.is_set() and content is
209 # None (that means there was an error reading the block).
210 self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
211 sm = sum([slot.size() for slot in self._cache])
212 while len(self._cache) > 0 and sm > self.cache_max:
213 for i in range(len(self._cache)-1, -1, -1):
214 if self._cache[i].ready.is_set():
217 sm = sum([slot.size() for slot in self._cache])
219 def _get(self, locator):
220 # Test if the locator is already in the cache
221 for i in range(0, len(self._cache)):
222 if self._cache[i].locator == locator:
225 # move it to the front
227 self._cache.insert(0, n)
231 def get(self, locator):
232 with self._cache_lock:
233 return self._get(locator)
235 def reserve_cache(self, locator):
236 '''Reserve a cache slot for the specified locator,
237 or return the existing slot.'''
238 with self._cache_lock:
239 n = self._get(locator)
243 # Add a new cache slot for the locator
244 n = KeepBlockCache.CacheSlot(locator)
245 self._cache.insert(0, n)
248 class Counter(object):
249 def __init__(self, v=0):
250 self._lk = threading.Lock()
262 class KeepClient(object):
264 # Default Keep server connection timeout: 2 seconds
265 # Default Keep server read timeout: 256 seconds
266 # Default Keep server bandwidth minimum: 32768 bytes per second
267 # Default Keep proxy connection timeout: 20 seconds
268 # Default Keep proxy read timeout: 256 seconds
269 # Default Keep proxy bandwidth minimum: 32768 bytes per second
270 DEFAULT_TIMEOUT = (2, 256, 32768)
271 DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
274 class KeepService(object):
275 """Make requests to a single Keep service, and track results.
277 A KeepService is intended to last long enough to perform one
278 transaction (GET or PUT) against one Keep service. This can
279 involve calling either get() or put() multiple times in order
280 to retry after transient failures. However, calling both get()
281 and put() on a single instance -- or using the same instance
282 to access two different Keep services -- will not produce
289 arvados.errors.HttpError,
292 def __init__(self, root, user_agent_pool=queue.LifoQueue(),
294 download_counter=None,
298 self._user_agent_pool = user_agent_pool
299 self._result = {'error': None}
303 self.get_headers = {'Accept': 'application/octet-stream'}
304 self.get_headers.update(headers)
305 self.put_headers = headers
306 self.upload_counter = upload_counter
307 self.download_counter = download_counter
308 self.insecure = insecure
311 """Is it worth attempting a request?"""
315 """Did the request succeed or encounter permanent failure?"""
316 return self._result['error'] == False or not self._usable
318 def last_result(self):
321 def _get_user_agent(self):
323 return self._user_agent_pool.get(block=False)
327 def _put_user_agent(self, ua):
330 self._user_agent_pool.put(ua, block=False)
334 def _socket_open(self, *args, **kwargs):
335 if len(args) + len(kwargs) == 2:
336 return self._socket_open_pycurl_7_21_5(*args, **kwargs)
338 return self._socket_open_pycurl_7_19_3(*args, **kwargs)
340 def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
341 return self._socket_open_pycurl_7_21_5(
343 address=collections.namedtuple(
344 'Address', ['family', 'socktype', 'protocol', 'addr'],
345 )(family, socktype, protocol, address))
347 def _socket_open_pycurl_7_21_5(self, purpose, address):
348 """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
349 s = socket.socket(address.family, address.socktype, address.protocol)
350 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
351 # Will throw invalid protocol error on mac. This test prevents that.
352 if hasattr(socket, 'TCP_KEEPIDLE'):
353 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
354 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
358 def get(self, locator, method="GET", timeout=None):
359 # locator is a KeepLocator object.
360 url = self.root + str(locator)
361 _logger.debug("Request: %s %s", method, url)
362 curl = self._get_user_agent()
365 with timer.Timer() as t:
367 response_body = BytesIO()
368 curl.setopt(pycurl.NOSIGNAL, 1)
369 curl.setopt(pycurl.OPENSOCKETFUNCTION,
370 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
371 curl.setopt(pycurl.URL, url.encode('utf-8'))
372 curl.setopt(pycurl.HTTPHEADER, [
373 '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
374 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
375 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
377 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
379 curl.setopt(pycurl.NOBODY, True)
380 self._setcurltimeouts(curl, timeout)
384 except Exception as e:
385 raise arvados.errors.HttpError(0, str(e))
391 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
392 'body': response_body.getvalue(),
393 'headers': self._headers,
397 ok = retry.check_http_response_success(self._result['status_code'])
399 self._result['error'] = arvados.errors.HttpError(
400 self._result['status_code'],
401 self._headers.get('x-status-line', 'Error'))
402 except self.HTTP_ERRORS as e:
406 self._usable = ok != False
407 if self._result.get('status_code', None):
408 # The client worked well enough to get an HTTP status
409 # code, so presumably any problems are just on the
410 # server side and it's OK to reuse the client.
411 self._put_user_agent(curl)
413 # Don't return this client to the pool, in case it's
417 _logger.debug("Request fail: GET %s => %s: %s",
418 url, type(self._result['error']), str(self._result['error']))
421 _logger.info("HEAD %s: %s bytes",
422 self._result['status_code'],
423 self._result.get('content-length'))
426 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
427 self._result['status_code'],
428 len(self._result['body']),
430 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
432 if self.download_counter:
433 self.download_counter.add(len(self._result['body']))
434 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
435 if resp_md5 != locator.md5sum:
436 _logger.warning("Checksum fail: md5(%s) = %s",
438 self._result['error'] = arvados.errors.HttpError(
441 return self._result['body']
443 def put(self, hash_s, body, timeout=None):
444 url = self.root + hash_s
445 _logger.debug("Request: PUT %s", url)
446 curl = self._get_user_agent()
449 with timer.Timer() as t:
451 body_reader = BytesIO(body)
452 response_body = BytesIO()
453 curl.setopt(pycurl.NOSIGNAL, 1)
454 curl.setopt(pycurl.OPENSOCKETFUNCTION,
455 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
456 curl.setopt(pycurl.URL, url.encode('utf-8'))
457 # Using UPLOAD tells cURL to wait for a "go ahead" from the
458 # Keep server (in the form of a HTTP/1.1 "100 Continue"
459 # response) instead of sending the request body immediately.
460 # This allows the server to reject the request if the request
461 # is invalid or the server is read-only, without waiting for
462 # the client to send the entire block.
463 curl.setopt(pycurl.UPLOAD, True)
464 curl.setopt(pycurl.INFILESIZE, len(body))
465 curl.setopt(pycurl.READFUNCTION, body_reader.read)
466 curl.setopt(pycurl.HTTPHEADER, [
467 '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
468 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
469 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
471 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
472 self._setcurltimeouts(curl, timeout)
475 except Exception as e:
476 raise arvados.errors.HttpError(0, str(e))
482 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
483 'body': response_body.getvalue().decode('utf-8'),
484 'headers': self._headers,
487 ok = retry.check_http_response_success(self._result['status_code'])
489 self._result['error'] = arvados.errors.HttpError(
490 self._result['status_code'],
491 self._headers.get('x-status-line', 'Error'))
492 except self.HTTP_ERRORS as e:
496 self._usable = ok != False # still usable if ok is True or None
497 if self._result.get('status_code', None):
498 # Client is functional. See comment in get().
499 self._put_user_agent(curl)
503 _logger.debug("Request fail: PUT %s => %s: %s",
504 url, type(self._result['error']), str(self._result['error']))
506 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
507 self._result['status_code'],
510 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
511 if self.upload_counter:
512 self.upload_counter.add(len(body))
515 def _setcurltimeouts(self, curl, timeouts):
518 elif isinstance(timeouts, tuple):
519 if len(timeouts) == 2:
520 conn_t, xfer_t = timeouts
521 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
523 conn_t, xfer_t, bandwidth_bps = timeouts
525 conn_t, xfer_t = (timeouts, timeouts)
526 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
527 curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
528 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
529 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
531 def _headerfunction(self, header_line):
532 if isinstance(header_line, bytes):
533 header_line = header_line.decode('iso-8859-1')
534 if ':' in header_line:
535 name, value = header_line.split(':', 1)
536 name = name.strip().lower()
537 value = value.strip()
539 name = self._lastheadername
540 value = self._headers[name] + ' ' + header_line.strip()
541 elif header_line.startswith('HTTP/'):
542 name = 'x-status-line'
545 _logger.error("Unexpected header line: %s", header_line)
547 self._lastheadername = name
548 self._headers[name] = value
549 # Returning None implies all bytes were written
552 class KeepWriterQueue(queue.Queue):
553 def __init__(self, copies):
554 queue.Queue.__init__(self) # Old-style superclass
555 self.wanted_copies = copies
556 self.successful_copies = 0
558 self.successful_copies_lock = threading.Lock()
559 self.pending_tries = copies
560 self.pending_tries_notification = threading.Condition()
562 def write_success(self, response, replicas_nr):
563 with self.successful_copies_lock:
564 self.successful_copies += replicas_nr
565 self.response = response
566 with self.pending_tries_notification:
567 self.pending_tries_notification.notify_all()
569 def write_fail(self, ks):
570 with self.pending_tries_notification:
571 self.pending_tries += 1
572 self.pending_tries_notification.notify()
574 def pending_copies(self):
575 with self.successful_copies_lock:
576 return self.wanted_copies - self.successful_copies
578 def get_next_task(self):
579 with self.pending_tries_notification:
581 if self.pending_copies() < 1:
582 # This notify_all() is unnecessary --
583 # write_success() already called notify_all()
584 # when pending<1 became true, so it's not
585 # possible for any other thread to be in
586 # wait() now -- but it's cheap insurance
587 # against deadlock so we do it anyway:
588 self.pending_tries_notification.notify_all()
589 # Drain the queue and then raise Queue.Empty
593 elif self.pending_tries > 0:
594 service, service_root = self.get_nowait()
595 if service.finished():
598 self.pending_tries -= 1
599 return service, service_root
601 self.pending_tries_notification.notify_all()
604 self.pending_tries_notification.wait()
607 class KeepWriterThreadPool(object):
608 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
609 self.total_task_nr = 0
610 self.wanted_copies = copies
611 if (not max_service_replicas) or (max_service_replicas >= copies):
614 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
615 _logger.debug("Pool max threads is %d", num_threads)
617 self.queue = KeepClient.KeepWriterQueue(copies)
619 for _ in range(num_threads):
620 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
621 self.workers.append(w)
623 def add_task(self, ks, service_root):
624 self.queue.put((ks, service_root))
625 self.total_task_nr += 1
628 return self.queue.successful_copies
632 for worker in self.workers:
634 # Wait for finished work
638 return self.queue.response
641 class KeepWriterThread(threading.Thread):
642 TaskFailed = RuntimeError()
644 def __init__(self, queue, data, data_hash, timeout=None):
645 super(KeepClient.KeepWriterThread, self).__init__()
646 self.timeout = timeout
649 self.data_hash = data_hash
655 service, service_root = self.queue.get_next_task()
659 locator, copies = self.do_task(service, service_root)
660 except Exception as e:
661 if e is not self.TaskFailed:
662 _logger.exception("Exception in KeepWriterThread")
663 self.queue.write_fail(service)
665 self.queue.write_success(locator, copies)
667 self.queue.task_done()
669 def do_task(self, service, service_root):
670 success = bool(service.put(self.data_hash,
672 timeout=self.timeout))
673 result = service.last_result()
676 if result.get('status_code', None):
677 _logger.debug("Request fail: PUT %s => %s %s",
679 result['status_code'],
681 raise self.TaskFailed
683 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
684 str(threading.current_thread()),
689 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
690 except (KeyError, ValueError):
693 return result['body'].strip(), replicas_stored
696 def __init__(self, api_client=None, proxy=None,
697 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
698 api_token=None, local_store=None, block_cache=None,
699 num_retries=0, session=None):
700 """Initialize a new KeepClient.
704 The API client to use to find Keep services. If not
705 provided, KeepClient will build one from available Arvados
709 If specified, this KeepClient will send requests to this Keep
710 proxy. Otherwise, KeepClient will fall back to the setting of the
711 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
712 If you want to KeepClient does not use a proxy, pass in an empty
716 The initial timeout (in seconds) for HTTP requests to Keep
717 non-proxy servers. A tuple of three floats is interpreted as
718 (connection_timeout, read_timeout, minimum_bandwidth). A connection
719 will be aborted if the average traffic rate falls below
720 minimum_bandwidth bytes per second over an interval of read_timeout
721 seconds. Because timeouts are often a result of transient server
722 load, the actual connection timeout will be increased by a factor
723 of two on each retry.
724 Default: (2, 256, 32768).
727 The initial timeout (in seconds) for HTTP requests to
728 Keep proxies. A tuple of three floats is interpreted as
729 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
730 described above for adjusting connection timeouts on retry also
732 Default: (20, 256, 32768).
735 If you're not using an API client, but only talking
736 directly to a Keep proxy, this parameter specifies an API token
737 to authenticate Keep requests. It is an error to specify both
738 api_client and api_token. If you specify neither, KeepClient
739 will use one available from the Arvados configuration.
742 If specified, this KeepClient will bypass Keep
743 services, and save data to the named directory. If unspecified,
744 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
745 environment variable. If you want to ensure KeepClient does not
746 use local storage, pass in an empty string. This is primarily
747 intended to mock a server for testing.
750 The default number of times to retry failed requests.
751 This will be used as the default num_retries value when get() and
752 put() are called. Default 0.
754 self.lock = threading.Lock()
756 if config.get('ARVADOS_KEEP_SERVICES'):
757 proxy = config.get('ARVADOS_KEEP_SERVICES')
759 proxy = config.get('ARVADOS_KEEP_PROXY')
760 if api_token is None:
761 if api_client is None:
762 api_token = config.get('ARVADOS_API_TOKEN')
764 api_token = api_client.api_token
765 elif api_client is not None:
767 "can't build KeepClient with both API client and token")
768 if local_store is None:
769 local_store = os.environ.get('KEEP_LOCAL_STORE')
771 if api_client is None:
772 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
774 self.insecure = api_client.insecure
776 self.block_cache = block_cache if block_cache else KeepBlockCache()
777 self.timeout = timeout
778 self.proxy_timeout = proxy_timeout
779 self._user_agent_pool = queue.LifoQueue()
780 self.upload_counter = Counter()
781 self.download_counter = Counter()
782 self.put_counter = Counter()
783 self.get_counter = Counter()
784 self.hits_counter = Counter()
785 self.misses_counter = Counter()
788 self.local_store = local_store
789 self.get = self.local_store_get
790 self.put = self.local_store_put
792 self.num_retries = num_retries
793 self.max_replicas_per_service = None
795 proxy_uris = proxy.split()
796 for i in range(len(proxy_uris)):
797 if not proxy_uris[i].endswith('/'):
800 url = urllib.parse.urlparse(proxy_uris[i])
801 if not (url.scheme and url.netloc):
802 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
803 self.api_token = api_token
804 self._gateway_services = {}
805 self._keep_services = [{
806 'uuid': "00000-bi6l4-%015d" % idx,
807 'service_type': 'proxy',
808 '_service_root': uri,
809 } for idx, uri in enumerate(proxy_uris)]
810 self._writable_services = self._keep_services
811 self.using_proxy = True
812 self._static_services_list = True
814 # It's important to avoid instantiating an API client
815 # unless we actually need one, for testing's sake.
816 if api_client is None:
817 api_client = arvados.api('v1')
818 self.api_client = api_client
819 self.api_token = api_client.api_token
820 self._gateway_services = {}
821 self._keep_services = None
822 self._writable_services = None
823 self.using_proxy = None
824 self._static_services_list = False
826 def current_timeout(self, attempt_number):
827 """Return the appropriate timeout to use for this client.
829 The proxy timeout setting if the backend service is currently a proxy,
830 the regular timeout setting otherwise. The `attempt_number` indicates
831 how many times the operation has been tried already (starting from 0
832 for the first try), and scales the connection timeout portion of the
833 return value accordingly.
836 # TODO(twp): the timeout should be a property of a
837 # KeepService, not a KeepClient. See #4488.
838 t = self.proxy_timeout if self.using_proxy else self.timeout
840 return (t[0] * (1 << attempt_number), t[1])
842 return (t[0] * (1 << attempt_number), t[1], t[2])
843 def _any_nondisk_services(self, service_list):
844 return any(ks.get('service_type', 'disk') != 'disk'
845 for ks in service_list)
847 def build_services_list(self, force_rebuild=False):
848 if (self._static_services_list or
849 (self._keep_services and not force_rebuild)):
853 keep_services = self.api_client.keep_services().accessible()
854 except Exception: # API server predates Keep services.
855 keep_services = self.api_client.keep_disks().list()
857 # Gateway services are only used when specified by UUID,
858 # so there's nothing to gain by filtering them by
860 self._gateway_services = {ks['uuid']: ks for ks in
861 keep_services.execute()['items']}
862 if not self._gateway_services:
863 raise arvados.errors.NoKeepServersError()
865 # Precompute the base URI for each service.
866 for r in self._gateway_services.values():
867 host = r['service_host']
868 if not host.startswith('[') and host.find(':') >= 0:
869 # IPv6 URIs must be formatted like http://[::1]:80/...
870 host = '[' + host + ']'
871 r['_service_root'] = "{}://{}:{:d}/".format(
872 'https' if r['service_ssl_flag'] else 'http',
876 _logger.debug(str(self._gateway_services))
877 self._keep_services = [
878 ks for ks in self._gateway_services.values()
879 if not ks.get('service_type', '').startswith('gateway:')]
880 self._writable_services = [ks for ks in self._keep_services
881 if not ks.get('read_only')]
883 # For disk type services, max_replicas_per_service is 1
884 # It is unknown (unlimited) for other service types.
885 if self._any_nondisk_services(self._writable_services):
886 self.max_replicas_per_service = None
888 self.max_replicas_per_service = 1
890 def _service_weight(self, data_hash, service_uuid):
891 """Compute the weight of a Keep service endpoint for a data
892 block with a known hash.
894 The weight is md5(h + u) where u is the last 15 characters of
895 the service endpoint's UUID.
897 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
899 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
900 """Return an array of Keep service endpoints, in the order in
901 which they should be probed when reading or writing data with
902 the given hash+hints.
904 self.build_services_list(force_rebuild)
907 # Use the services indicated by the given +K@... remote
908 # service hints, if any are present and can be resolved to a
910 for hint in locator.hints:
911 if hint.startswith('K@'):
914 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
915 elif len(hint) == 29:
916 svc = self._gateway_services.get(hint[2:])
918 sorted_roots.append(svc['_service_root'])
920 # Sort the available local services by weight (heaviest first)
921 # for this locator, and return their service_roots (base URIs)
923 use_services = self._keep_services
925 use_services = self._writable_services
926 self.using_proxy = self._any_nondisk_services(use_services)
927 sorted_roots.extend([
928 svc['_service_root'] for svc in sorted(
931 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
932 _logger.debug("{}: {}".format(locator, sorted_roots))
935 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
936 # roots_map is a dictionary, mapping Keep service root strings
937 # to KeepService objects. Poll for Keep services, and add any
938 # new ones to roots_map. Return the current list of local
940 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
941 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
942 for root in local_roots:
943 if root not in roots_map:
944 roots_map[root] = self.KeepService(
945 root, self._user_agent_pool,
946 upload_counter=self.upload_counter,
947 download_counter=self.download_counter,
949 insecure=self.insecure)
953 def _check_loop_result(result):
954 # KeepClient RetryLoops should save results as a 2-tuple: the
955 # actual result of the request, and the number of servers available
956 # to receive the request this round.
957 # This method returns True if there's a real result, False if
958 # there are no more servers available, otherwise None.
959 if isinstance(result, Exception):
961 result, tried_server_count = result
962 if (result is not None) and (result is not False):
964 elif tried_server_count < 1:
965 _logger.info("No more Keep services to try; giving up")
970 def get_from_cache(self, loc):
971 """Fetch a block only if is in the cache, otherwise return None."""
972 slot = self.block_cache.get(loc)
973 if slot is not None and slot.ready.is_set():
979 def head(self, loc_s, **kwargs):
980 return self._get_or_head(loc_s, method="HEAD", **kwargs)
983 def get(self, loc_s, **kwargs):
984 return self._get_or_head(loc_s, method="GET", **kwargs)
986 def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None):
987 """Get data from Keep.
989 This method fetches one or more blocks of data from Keep. It
990 sends a request each Keep service registered with the API
991 server (or the proxy provided when this client was
992 instantiated), then each service named in location hints, in
993 sequence. As soon as one service provides the data, it's
997 * loc_s: A string of one or more comma-separated locators to fetch.
998 This method returns the concatenation of these blocks.
999 * num_retries: The number of times to retry GET requests to
1000 *each* Keep server if it returns temporary failures, with
1001 exponential backoff. Note that, in each loop, the method may try
1002 to fetch data from every available Keep service, along with any
1003 that are named in location hints in the locator. The default value
1004 is set when the KeepClient is initialized.
1007 return ''.join(self.get(x) for x in loc_s.split(','))
1009 self.get_counter.add(1)
1014 locator = KeepLocator(loc_s)
1016 slot, first = self.block_cache.reserve_cache(locator.md5sum)
1018 self.hits_counter.add(1)
1021 raise arvados.errors.KeepReadError(
1022 "failed to read {}".format(loc_s))
1025 self.misses_counter.add(1)
1028 'X-Request-Id': (request_id or
1029 (hasattr(self, 'api_client') and self.api_client.request_id) or
1030 arvados.util.new_request_id()),
1033 # If the locator has hints specifying a prefix (indicating a
1034 # remote keepproxy) or the UUID of a local gateway service,
1035 # read data from the indicated service(s) instead of the usual
1036 # list of local disk services.
1037 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1038 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1039 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1040 for hint in locator.hints if (
1041 hint.startswith('K@') and
1043 self._gateway_services.get(hint[2:])
1045 # Map root URLs to their KeepService objects.
1047 root: self.KeepService(root, self._user_agent_pool,
1048 upload_counter=self.upload_counter,
1049 download_counter=self.download_counter,
1051 insecure=self.insecure)
1052 for root in hint_roots
1055 # See #3147 for a discussion of the loop implementation. Highlights:
1056 # * Refresh the list of Keep services after each failure, in case
1057 # it's being updated.
1058 # * Retry until we succeed, we're out of retries, or every available
1059 # service has returned permanent failure.
1062 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1064 for tries_left in loop:
1066 sorted_roots = self.map_new_services(
1068 force_rebuild=(tries_left < num_retries),
1069 need_writable=False,
1071 except Exception as error:
1072 loop.save_result(error)
1075 # Query KeepService objects that haven't returned
1076 # permanent failure, in our specified shuffle order.
1077 services_to_try = [roots_map[root]
1078 for root in sorted_roots
1079 if roots_map[root].usable()]
1080 for keep_service in services_to_try:
1081 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1082 if blob is not None:
1084 loop.save_result((blob, len(services_to_try)))
1086 # Always cache the result, then return it if we succeeded.
1088 if method == "HEAD":
1093 if slot is not None:
1095 self.block_cache.cap_cache()
1097 # Q: Including 403 is necessary for the Keep tests to continue
1098 # passing, but maybe they should expect KeepReadError instead?
1099 not_founds = sum(1 for key in sorted_roots
1100 if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1101 service_errors = ((key, roots_map[key].last_result()['error'])
1102 for key in sorted_roots)
1104 raise arvados.errors.KeepReadError(
1105 "failed to read {}: no Keep services available ({})".format(
1106 loc_s, loop.last_result()))
1107 elif not_founds == len(sorted_roots):
1108 raise arvados.errors.NotFoundError(
1109 "{} not found".format(loc_s), service_errors)
1111 raise arvados.errors.KeepReadError(
1112 "failed to read {}".format(loc_s), service_errors, label="service")
1115 def put(self, data, copies=2, num_retries=None, request_id=None):
1116 """Save data in Keep.
1118 This method will get a list of Keep services from the API server, and
1119 send the data to each one simultaneously in a new thread. Once the
1120 uploads are finished, if enough copies are saved, this method returns
1121 the most recent HTTP response body. If requests fail to upload
1122 enough copies, this method raises KeepWriteError.
1125 * data: The string of data to upload.
1126 * copies: The number of copies that the user requires be saved.
1128 * num_retries: The number of times to retry PUT requests to
1129 *each* Keep server if it returns temporary failures, with
1130 exponential backoff. The default value is set when the
1131 KeepClient is initialized.
1134 if not isinstance(data, bytes):
1135 data = data.encode()
1137 self.put_counter.add(1)
1139 data_hash = hashlib.md5(data).hexdigest()
1140 loc_s = data_hash + '+' + str(len(data))
1143 locator = KeepLocator(loc_s)
1146 'X-Request-Id': (request_id or
1147 (hasattr(self, 'api_client') and self.api_client.request_id) or
1148 arvados.util.new_request_id()),
1149 'X-Keep-Desired-Replicas': str(copies),
1152 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1155 for tries_left in loop:
1157 sorted_roots = self.map_new_services(
1159 force_rebuild=(tries_left < num_retries),
1162 except Exception as error:
1163 loop.save_result(error)
1166 writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1167 data_hash=data_hash,
1168 copies=copies - done,
1169 max_service_replicas=self.max_replicas_per_service,
1170 timeout=self.current_timeout(num_retries - tries_left))
1171 for service_root, ks in [(root, roots_map[root])
1172 for root in sorted_roots]:
1175 writer_pool.add_task(ks, service_root)
1177 done += writer_pool.done()
1178 loop.save_result((done >= copies, writer_pool.total_task_nr))
1181 return writer_pool.response()
1183 raise arvados.errors.KeepWriteError(
1184 "failed to write {}: no Keep services available ({})".format(
1185 data_hash, loop.last_result()))
1187 service_errors = ((key, roots_map[key].last_result()['error'])
1188 for key in sorted_roots
1189 if roots_map[key].last_result()['error'])
1190 raise arvados.errors.KeepWriteError(
1191 "failed to write {} (wanted {} copies but wrote {})".format(
1192 data_hash, copies, writer_pool.done()), service_errors, label="service")
1194 def local_store_put(self, data, copies=1, num_retries=None):
1195 """A stub for put().
1197 This method is used in place of the real put() method when
1198 using local storage (see constructor's local_store argument).
1200 copies and num_retries arguments are ignored: they are here
1201 only for the sake of offering the same call signature as
1204 Data stored this way can be retrieved via local_store_get().
1206 md5 = hashlib.md5(data).hexdigest()
1207 locator = '%s+%d' % (md5, len(data))
1208 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1210 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1211 os.path.join(self.local_store, md5))
1214 def local_store_get(self, loc_s, num_retries=None):
1215 """Companion to local_store_put()."""
1217 locator = KeepLocator(loc_s)
1219 raise arvados.errors.NotFoundError(
1220 "Invalid data locator: '%s'" % loc_s)
1221 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1223 with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1226 def is_cached(self, locator):
1227 return self.block_cache.reserve_cache(expect_hash)