1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 from future.utils import native_str
9 standard_library.install_aliases()
10 from builtins import next
11 from builtins import str
12 from builtins import range
13 from builtins import object
31 if sys.version_info >= (3, 0):
32 from io import BytesIO
34 from cStringIO import StringIO as BytesIO
37 import arvados.config as config
39 import arvados.retry as retry
# Module-level logger for all Keep traffic from this module.
42 _logger = logging.getLogger('arvados.keep')
# Lazily-constructed singleton KeepClient used by the deprecated module-level
# Keep wrapper class; stays None until first requested.
43 global_client_object = None
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    # getattr-with-default keeps any constant the interpreter already
    # defines and fills in the XNU value only when it is missing.
    socket.TCP_KEEPALIVE = getattr(socket, 'TCP_KEEPALIVE', 0x010)
    socket.TCP_KEEPINTVL = getattr(socket, 'TCP_KEEPINTVL', 0x101)
    socket.TCP_KEEPCNT = getattr(socket, 'TCP_KEEPCNT', 0x102)
# Parses and re-serializes Keep block locators of the form
# "<md5sum>[+<size>][+<hint>...]", including "A<sig>@<hextimestamp>"
# permission hints.
# NOTE(review): this listing is elided (original line numbers jump); comments
# below describe only the visible code.
57 class KeepLocator(object):
# Reference point for converting permission-hint expiry datetimes to/from
# Unix timestamps (see permission_hint / perm_expiry below).
58 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
# Locator hints start with a capital letter followed by hint payload chars.
59 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
# Split "<md5>+<size>+hint+hint..." on '+' and consume the pieces in order.
61 def __init__(self, locator_str):
64 self._perm_expiry = None
65 pieces = iter(locator_str.split('+'))
66 self.md5sum = next(pieces)
68 self.size = int(next(pieces))
# Any remaining piece must look like a hint; 'A...' pieces are parsed as
# permission hints, others are kept verbatim in self.hints.
72 if self.HINT_RE.match(hint) is None:
73 raise ValueError("invalid hint format: {}".format(hint))
74 elif hint.startswith('A'):
75 self.parse_permission_hint(hint)
77 self.hints.append(hint)
# (visible fragment of __str__/serialization: joins md5, size, permission
# hint and extra hints back into a '+'-separated locator string)
82 for s in [self.md5sum, self.size,
83 self.permission_hint()] + self.hints
# Stripped form: md5 plus size only, when a size is known.
87 if self.size is not None:
88 return "%s+%i" % (self.md5sum, self.size)
# Factory for validated hex-string properties: stores the value under
# '_<name>' and rejects strings that are not exactly `length` hex digits.
92 def _make_hex_prop(name, length):
93 # Build and return a new property with the given name that
94 # must be a hex string of the given length.
95 data_name = '_{}'.format(name)
97 return getattr(self, data_name)
98 def setter(self, hex_str):
99 if not arvados.util.is_hex(hex_str, length):
100 raise ValueError("{} is not a {}-digit hex string: {!r}".
101 format(name, length, hex_str))
102 setattr(self, data_name, hex_str)
103 return property(getter, setter)
# md5sum must be 32 hex digits; a permission signature is 40 hex digits.
105 md5sum = _make_hex_prop('md5sum', 32)
106 perm_sig = _make_hex_prop('perm_sig', 40)
# Property pair: perm_expiry is stored as a datetime but set from a
# 1-8 digit hex Unix timestamp (the '@...' part of a permission hint).
109 def perm_expiry(self):
110 return self._perm_expiry
113 def perm_expiry(self, value):
114 if not arvados.util.is_hex(value, 1, 8):
116 "permission timestamp must be a hex Unix timestamp: {}".
118 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
# Serialize signature + expiry back to the "A<sig>@<hex-timestamp>" form.
120 def permission_hint(self):
121 data = [self.perm_sig, self.perm_expiry]
124 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
125 return "A{}@{:08x}".format(*data)
# Split an "A<sig>@<timestamp>" hint into its two validated components.
127 def parse_permission_hint(self, s):
129 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
131 raise ValueError("bad permission hint {}".format(s))
# True when the signature's expiry time has passed (never expires when no
# permission hint was parsed).  NOTE(review): compares against naive
# datetime.now() while perm_expiry is built via utcfromtimestamp — looks
# timezone-inconsistent; confirm before changing.
133 def permission_expired(self, as_of_dt=None):
134 if self.perm_expiry is None:
136 elif as_of_dt is None:
137 as_of_dt = datetime.datetime.now()
138 return self.perm_expiry <= as_of_dt
142 """Simple interface to a global KeepClient object.
144 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
145 own API client. The global KeepClient will build an API client from the
146 current Arvados configuration, which may not match the one you built.
151 def global_client_object(cls):
152 global global_client_object
153 # Previously, KeepClient would change its behavior at runtime based
154 # on these configuration settings. We simulate that behavior here
155 # by checking the values and returning a new KeepClient if any of
157 key = (config.get('ARVADOS_API_HOST'),
158 config.get('ARVADOS_API_TOKEN'),
159 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
160 config.get('ARVADOS_KEEP_PROXY'),
161 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
162 os.environ.get('KEEP_LOCAL_STORE'))
163 if (global_client_object is None) or (cls._last_key != key):
164 global_client_object = KeepClient()
166 return global_client_object
# Convenience wrapper: read a block via the shared global KeepClient.
# All keyword arguments are forwarded to KeepClient.get().
169 def get(locator, **kwargs):
170 return Keep.global_client_object().get(locator, **kwargs)
# Convenience wrapper: write a block via the shared global KeepClient.
# All keyword arguments are forwarded to KeepClient.put().
173 def put(data, **kwargs):
174 return Keep.global_client_object().put(data, **kwargs)
# In-memory LRU-ish cache of Keep block contents, bounded by total bytes.
# NOTE(review): this listing is elided (line numbers jump); comments describe
# only the visible code.
176 class KeepBlockCache(object):
177 # Default RAM cache is 256MiB
178 def __init__(self, cache_max=(256 * 1024 * 1024)):
179 self.cache_max = cache_max
# Guards all access to the slot list below.
181 self._cache_lock = threading.Lock()
# One cache entry: readers wait on `ready` until a writer calls set().
# `content is None` after `ready` is set marks a failed read.
183 class CacheSlot(object):
184 __slots__ = ("locator", "ready", "content")
186 def __init__(self, locator):
187 self.locator = locator
188 self.ready = threading.Event()
# Store the block body and wake any waiting readers (body elided here).
195 def set(self, value):
200 if self.content is None:
203 return len(self.content)
206 '''Cap the cache size to self.cache_max'''
207 with self._cache_lock:
208 # Select all slots except those where ready.is_set() and content is
209 # None (that means there was an error reading the block).
210 self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
# Evict ready slots from the tail (least recently used end) until the
# byte total fits under cache_max.
211 sm = sum([slot.size() for slot in self._cache])
212 while len(self._cache) > 0 and sm > self.cache_max:
213 for i in range(len(self._cache)-1, -1, -1):
214 if self._cache[i].ready.is_set():
217 sm = sum([slot.size() for slot in self._cache])
# Linear scan for `locator`; a hit is moved to the front so the list
# stays ordered most-recently-used first. Caller must hold _cache_lock.
219 def _get(self, locator):
220 # Test if the locator is already in the cache
221 for i in range(0, len(self._cache)):
222 if self._cache[i].locator == locator:
225 # move it to the front
227 self._cache.insert(0, n)
# Thread-safe lookup: returns the CacheSlot or None.
231 def get(self, locator):
232 with self._cache_lock:
233 return self._get(locator)
235 def reserve_cache(self, locator):
236 '''Reserve a cache slot for the specified locator,
237 or return the existing slot.'''
238 with self._cache_lock:
239 n = self._get(locator)
243 # Add a new cache slot for the locator
244 n = KeepBlockCache.CacheSlot(locator)
245 self._cache.insert(0, n)
# Thread-safe integer counter used for the client's transfer statistics
# (bytes uploaded/downloaded, cache hits/misses).  Body largely elided in
# this listing; only the lock initialization is visible.
248 class Counter(object):
249 def __init__(self, v=0):
250 self._lk = threading.Lock()
262 class KeepClient(object):
264 # Default Keep server connection timeout: 2 seconds
265 # Default Keep server read timeout: 256 seconds
266 # Default Keep server bandwidth minimum: 32768 bytes per second
267 # Default Keep proxy connection timeout: 20 seconds
268 # Default Keep proxy read timeout: 256 seconds
269 # Default Keep proxy bandwidth minimum: 32768 bytes per second
# Each tuple is (connect_timeout_sec, read_timeout_sec, min_bandwidth_bps);
# see _setcurltimeouts for how the three fields are applied to pycurl.
270 DEFAULT_TIMEOUT = (2, 256, 32768)
271 DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
# NOTE(review): this listing is elided throughout (line numbers jump);
# comments describe only the visible code.
274 class KeepService(object):
275 """Make requests to a single Keep service, and track results.
277 A KeepService is intended to last long enough to perform one
278 transaction (GET or PUT) against one Keep service. This can
279 involve calling either get() or put() multiple times in order
280 to retry after transient failures. However, calling both get()
281 and put() on a single instance -- or using the same instance
282 to access two different Keep services -- will not produce
# Exception types treated as (possibly transient) HTTP-level failures.
289 arvados.errors.HttpError,
# NOTE(review): `user_agent_pool=queue.LifoQueue()` is a mutable default
# argument — one shared queue across all instances constructed without an
# explicit pool. That sharing may be intentional (a process-wide curl
# reuse pool); confirm before changing. Signature lines are elided here.
292 def __init__(self, root, user_agent_pool=queue.LifoQueue(),
294 download_counter=None,
298 self._user_agent_pool = user_agent_pool
299 self._result = {'error': None}
# GET requests add an Accept header on top of the caller-provided
# headers; PUT uses the caller's headers unchanged.
303 self.get_headers = {'Accept': 'application/octet-stream'}
304 self.get_headers.update(headers)
305 self.put_headers = headers
306 self.upload_counter = upload_counter
307 self.download_counter = download_counter
308 self.insecure = insecure
311 """Is it worth attempting a request?"""
315 """Did the request succeed or encounter permanent failure?"""
# Deliberate `== False` tri-state check: error is None before any
# attempt, False on success, or an exception instance on failure.
316 return self._result['error'] == False or not self._usable
318 def last_result(self):
# Reuse a pooled pycurl handle when one is available (block=False means
# fall through to creating a fresh handle when the pool is empty).
321 def _get_user_agent(self):
323 return self._user_agent_pool.get(block=False)
327 def _put_user_agent(self, ua):
330 self._user_agent_pool.put(ua, block=False)
# Dispatch between the two OPENSOCKETFUNCTION callback signatures that
# different pycurl versions use (2 args = 7.21.5+, otherwise 7.19.3).
334 def _socket_open(self, *args, **kwargs):
335 if len(args) + len(kwargs) == 2:
336 return self._socket_open_pycurl_7_21_5(*args, **kwargs)
338 return self._socket_open_pycurl_7_19_3(*args, **kwargs)
# Adapt the old callback signature to the new one by packing the four
# values into an Address-like namedtuple.
340 def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
341 return self._socket_open_pycurl_7_21_5(
343 address=collections.namedtuple(
344 'Address', ['family', 'socktype', 'protocol', 'addr'],
345 )(family, socktype, protocol, address))
347 def _socket_open_pycurl_7_21_5(self, purpose, address):
348 """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
349 s = socket.socket(address.family, address.socktype, address.protocol)
350 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
351 # Will throw invalid protocol error on mac. This test prevents that.
352 if hasattr(socket, 'TCP_KEEPIDLE'):
353 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
354 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
# Fetch (GET) or probe (HEAD) one block from this service. Returns the
# block body bytes on success; records failure details in self._result.
358 def get(self, locator, method="GET", timeout=None):
359 # locator is a KeepLocator object.
360 url = self.root + str(locator)
361 _logger.debug("Request: %s %s", method, url)
362 curl = self._get_user_agent()
365 with timer.Timer() as t:
367 response_body = BytesIO()
368 curl.setopt(pycurl.NOSIGNAL, 1)
369 curl.setopt(pycurl.OPENSOCKETFUNCTION,
370 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
371 curl.setopt(pycurl.URL, url.encode('utf-8'))
372 curl.setopt(pycurl.HTTPHEADER, [
373 '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
374 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
375 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
# Only applied when self.insecure is set (guard line elided above).
377 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
# HEAD requests skip the response body.
379 curl.setopt(pycurl.NOBODY, True)
380 self._setcurltimeouts(curl, timeout, method=="HEAD")
# Any pycurl-level failure is normalized to HttpError(0, ...).
384 except Exception as e:
385 raise arvados.errors.HttpError(0, str(e))
391 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
392 'body': response_body.getvalue(),
393 'headers': self._headers,
# Non-2xx statuses become an HttpError stored in the result record.
397 ok = retry.check_http_response_success(self._result['status_code'])
399 self._result['error'] = arvados.errors.HttpError(
400 self._result['status_code'],
401 self._headers.get('x-status-line', 'Error'))
402 except self.HTTP_ERRORS as e:
# ok is True/None/False; the service stays usable unless it is False.
406 self._usable = ok != False
407 if self._result.get('status_code', None):
408 # The client worked well enough to get an HTTP status
409 # code, so presumably any problems are just on the
410 # server side and it's OK to reuse the client.
411 self._put_user_agent(curl)
413 # Don't return this client to the pool, in case it's
417 _logger.debug("Request fail: GET %s => %s: %s",
418 url, type(self._result['error']), str(self._result['error']))
421 _logger.info("HEAD %s: %s bytes",
422 self._result['status_code'],
423 self._result.get('content-length'))
424 if self._result['headers'].get('x-keep-locator'):
425 # This is a response to a remote block copy request, return
426 # the local copy block locator.
427 return self._result['headers'].get('x-keep-locator')
430 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
431 self._result['status_code'],
432 len(self._result['body']),
434 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
436 if self.download_counter:
437 self.download_counter.add(len(self._result['body']))
# Verify the body actually hashes to the requested locator's md5.
438 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
439 if resp_md5 != locator.md5sum:
440 _logger.warning("Checksum fail: md5(%s) = %s",
442 self._result['error'] = arvados.errors.HttpError(
445 return self._result['body']
# Upload one block (hash_s names it, body is its bytes) to this service.
447 def put(self, hash_s, body, timeout=None):
448 url = self.root + hash_s
449 _logger.debug("Request: PUT %s", url)
450 curl = self._get_user_agent()
453 with timer.Timer() as t:
455 body_reader = BytesIO(body)
456 response_body = BytesIO()
457 curl.setopt(pycurl.NOSIGNAL, 1)
458 curl.setopt(pycurl.OPENSOCKETFUNCTION,
459 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
460 curl.setopt(pycurl.URL, url.encode('utf-8'))
461 # Using UPLOAD tells cURL to wait for a "go ahead" from the
462 # Keep server (in the form of a HTTP/1.1 "100 Continue"
463 # response) instead of sending the request body immediately.
464 # This allows the server to reject the request if the request
465 # is invalid or the server is read-only, without waiting for
466 # the client to send the entire block.
467 curl.setopt(pycurl.UPLOAD, True)
468 curl.setopt(pycurl.INFILESIZE, len(body))
469 curl.setopt(pycurl.READFUNCTION, body_reader.read)
470 curl.setopt(pycurl.HTTPHEADER, [
471 '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
472 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
473 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
475 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
476 self._setcurltimeouts(curl, timeout)
479 except Exception as e:
480 raise arvados.errors.HttpError(0, str(e))
# Unlike get(), the PUT response body is decoded to text (it carries a
# locator string, not block data).
486 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
487 'body': response_body.getvalue().decode('utf-8'),
488 'headers': self._headers,
491 ok = retry.check_http_response_success(self._result['status_code'])
493 self._result['error'] = arvados.errors.HttpError(
494 self._result['status_code'],
495 self._headers.get('x-status-line', 'Error'))
496 except self.HTTP_ERRORS as e:
500 self._usable = ok != False # still usable if ok is True or None
501 if self._result.get('status_code', None):
502 # Client is functional. See comment in get().
503 self._put_user_agent(curl)
507 _logger.debug("Request fail: PUT %s => %s: %s",
508 url, type(self._result['error']), str(self._result['error']))
510 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
511 self._result['status_code'],
514 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
515 if self.upload_counter:
516 self.upload_counter.add(len(body))
# Normalize `timeouts` (None / scalar / 2-tuple / 3-tuple) into connect
# timeout, transfer timeout and minimum bandwidth, then apply to curl.
# LOW_SPEED_* aborts transfers slower than bandwidth_bps for xfer_t secs.
519 def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
522 elif isinstance(timeouts, tuple):
523 if len(timeouts) == 2:
524 conn_t, xfer_t = timeouts
525 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
527 conn_t, xfer_t, bandwidth_bps = timeouts
529 conn_t, xfer_t = (timeouts, timeouts)
530 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
531 curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
532 if not ignore_bandwidth:
533 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
534 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
# pycurl header callback: accumulates response headers into self._headers
# keyed by lower-cased name, handling continuation lines and the
# "HTTP/..." status line (stored under 'x-status-line').
536 def _headerfunction(self, header_line):
537 if isinstance(header_line, bytes):
538 header_line = header_line.decode('iso-8859-1')
539 if ':' in header_line:
540 name, value = header_line.split(':', 1)
541 name = name.strip().lower()
542 value = value.strip()
# Continuation line: append to the most recent header's value.
544 name = self._lastheadername
545 value = self._headers[name] + ' ' + header_line.strip()
546 elif header_line.startswith('HTTP/'):
547 name = 'x-status-line'
550 _logger.error("Unexpected header line: %s", header_line)
552 self._lastheadername = name
553 self._headers[name] = value
554 # Returning None implies all bytes were written
# Work queue shared by KeepWriterThreads: tracks how many replica copies
# have been written versus wanted, and hands out (service, root) tasks.
# NOTE(review): listing is elided; comments describe only visible code.
557 class KeepWriterQueue(queue.Queue):
558 def __init__(self, copies):
559 queue.Queue.__init__(self) # Old-style superclass
560 self.wanted_copies = copies
561 self.successful_copies = 0
563 self.successful_copies_lock = threading.Lock()
# pending_tries counts in-flight/remaining attempts; the Condition wakes
# idle workers when that count changes.
564 self.pending_tries = copies
565 self.pending_tries_notification = threading.Condition()
# Record a successful write of `replicas_nr` copies and wake all workers
# so they can re-check whether enough copies now exist.
567 def write_success(self, response, replicas_nr):
568 with self.successful_copies_lock:
569 self.successful_copies += replicas_nr
570 self.response = response
571 with self.pending_tries_notification:
572 self.pending_tries_notification.notify_all()
# A write attempt failed: allow one more try and wake one waiting worker.
# (`ks` — the failed service — is visibly unused here.)
574 def write_fail(self, ks):
575 with self.pending_tries_notification:
576 self.pending_tries += 1
577 self.pending_tries_notification.notify()
# How many more replica copies are still needed (may go negative when
# services store extra replicas).
579 def pending_copies(self):
580 with self.successful_copies_lock:
581 return self.wanted_copies - self.successful_copies
# Block until there is either a service worth trying (returned as a
# (service, service_root) pair) or no more work (queue drained).
583 def get_next_task(self):
584 with self.pending_tries_notification:
586 if self.pending_copies() < 1:
587 # This notify_all() is unnecessary --
588 # write_success() already called notify_all()
589 # when pending<1 became true, so it's not
590 # possible for any other thread to be in
591 # wait() now -- but it's cheap insurance
592 # against deadlock so we do it anyway:
593 self.pending_tries_notification.notify_all()
594 # Drain the queue and then raise Queue.Empty
598 elif self.pending_tries > 0:
599 service, service_root = self.get_nowait()
600 if service.finished():
603 self.pending_tries -= 1
604 return service, service_root
606 self.pending_tries_notification.notify_all()
# No task available right now: wait for a success/fail notification.
609 self.pending_tries_notification.wait()
# Owns a KeepWriterQueue plus enough KeepWriterThreads to reach the
# requested replica count.  NOTE(review): listing is elided; comments
# describe only visible code.
612 class KeepWriterThreadPool(object):
613 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
614 self.total_task_nr = 0
615 self.wanted_copies = copies
# One thread is enough when a single service can store all copies;
# otherwise spread the copies across ceil(copies / max_per_service)
# threads.
616 if (not max_service_replicas) or (max_service_replicas >= copies):
619 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
620 _logger.debug("Pool max threads is %d", num_threads)
622 self.queue = KeepClient.KeepWriterQueue(copies)
624 for _ in range(num_threads):
625 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
626 self.workers.append(w)
# Enqueue one (service, root) write attempt.
628 def add_task(self, ks, service_root):
629 self.queue.put((ks, service_root))
630 self.total_task_nr += 1
# Number of replica copies confirmed written so far.
633 return self.queue.successful_copies
# Start/join workers (elided), then report the last successful response.
637 for worker in self.workers:
639 # Wait for finished work
643 return self.queue.response
# Worker thread: repeatedly pulls (service, root) tasks from the shared
# queue and PUTs the block to that service.  NOTE(review): listing is
# elided; comments describe only visible code.
646 class KeepWriterThread(threading.Thread):
# Shared sentinel exception instance; raised and compared by identity
# (`e is not self.TaskFailed` below) to distinguish expected write
# failures from unexpected exceptions.
647 TaskFailed = RuntimeError()
649 def __init__(self, queue, data, data_hash, timeout=None):
650 super(KeepClient.KeepWriterThread, self).__init__()
651 self.timeout = timeout
654 self.data_hash = data_hash
# Main loop body (elided around): take a task, attempt it, and report
# success or failure back to the queue.
660 service, service_root = self.queue.get_next_task()
664 locator, copies = self.do_task(service, service_root)
665 except Exception as e:
666 if e is not self.TaskFailed:
667 _logger.exception("Exception in KeepWriterThread")
668 self.queue.write_fail(service)
670 self.queue.write_success(locator, copies)
672 self.queue.task_done()
# Perform one PUT. Returns (locator, replicas_stored) on success or
# raises the TaskFailed sentinel after logging the HTTP failure.
674 def do_task(self, service, service_root):
675 success = bool(service.put(self.data_hash,
677 timeout=self.timeout))
678 result = service.last_result()
681 if result.get('status_code', None):
682 _logger.debug("Request fail: PUT %s => %s %s",
684 result['status_code'],
686 raise self.TaskFailed
688 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
689 str(threading.current_thread()),
# Missing/invalid replica-count header falls back to a default (elided).
694 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
695 except (KeyError, ValueError):
698 return result['body'].strip(), replicas_stored
# NOTE(review): listing is elided in places; comments describe only
# visible code.
701 def __init__(self, api_client=None, proxy=None,
702 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
703 api_token=None, local_store=None, block_cache=None,
704 num_retries=0, session=None):
705 """Initialize a new KeepClient.
709 The API client to use to find Keep services. If not
710 provided, KeepClient will build one from available Arvados
714 If specified, this KeepClient will send requests to this Keep
715 proxy. Otherwise, KeepClient will fall back to the setting of the
716 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
717 If you want to KeepClient does not use a proxy, pass in an empty
721 The initial timeout (in seconds) for HTTP requests to Keep
722 non-proxy servers. A tuple of three floats is interpreted as
723 (connection_timeout, read_timeout, minimum_bandwidth). A connection
724 will be aborted if the average traffic rate falls below
725 minimum_bandwidth bytes per second over an interval of read_timeout
726 seconds. Because timeouts are often a result of transient server
727 load, the actual connection timeout will be increased by a factor
728 of two on each retry.
729 Default: (2, 256, 32768).
732 The initial timeout (in seconds) for HTTP requests to
733 Keep proxies. A tuple of three floats is interpreted as
734 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
735 described above for adjusting connection timeouts on retry also
737 Default: (20, 256, 32768).
740 If you're not using an API client, but only talking
741 directly to a Keep proxy, this parameter specifies an API token
742 to authenticate Keep requests. It is an error to specify both
743 api_client and api_token. If you specify neither, KeepClient
744 will use one available from the Arvados configuration.
747 If specified, this KeepClient will bypass Keep
748 services, and save data to the named directory. If unspecified,
749 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
750 environment variable. If you want to ensure KeepClient does not
751 use local storage, pass in an empty string. This is primarily
752 intended to mock a server for testing.
755 The default number of times to retry failed requests.
756 This will be used as the default num_retries value when get() and
757 put() are called. Default 0.
759 self.lock = threading.Lock()
# Proxy resolution order: explicit argument (elided guard), then
# ARVADOS_KEEP_SERVICES, then ARVADOS_KEEP_PROXY.
761 if config.get('ARVADOS_KEEP_SERVICES'):
762 proxy = config.get('ARVADOS_KEEP_SERVICES')
764 proxy = config.get('ARVADOS_KEEP_PROXY')
# Token comes from the API client when one is given; supplying both an
# api_client and an explicit api_token is rejected.
765 if api_token is None:
766 if api_client is None:
767 api_token = config.get('ARVADOS_API_TOKEN')
769 api_token = api_client.api_token
770 elif api_client is not None:
772 "can't build KeepClient with both API client and token")
773 if local_store is None:
774 local_store = os.environ.get('KEEP_LOCAL_STORE')
776 if api_client is None:
777 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
779 self.insecure = api_client.insecure
781 self.block_cache = block_cache if block_cache else KeepBlockCache()
782 self.timeout = timeout
783 self.proxy_timeout = proxy_timeout
# Pool of reusable pycurl handles shared by this client's KeepServices.
784 self._user_agent_pool = queue.LifoQueue()
785 self.upload_counter = Counter()
786 self.download_counter = Counter()
787 self.put_counter = Counter()
788 self.get_counter = Counter()
789 self.hits_counter = Counter()
790 self.misses_counter = Counter()
# Local-store mode: rebind head/get/put to the local filesystem
# implementations and skip Keep service discovery entirely.
793 self.local_store = local_store
794 self.head = self.local_store_head
795 self.get = self.local_store_get
796 self.put = self.local_store_put
798 self.num_retries = num_retries
799 self.max_replicas_per_service = None
# Proxy mode: build a static service list from the space-separated
# proxy URI(s); each URI must have a scheme and host.
801 proxy_uris = proxy.split()
802 for i in range(len(proxy_uris)):
803 if not proxy_uris[i].endswith('/'):
806 url = urllib.parse.urlparse(proxy_uris[i])
807 if not (url.scheme and url.netloc):
808 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
809 self.api_token = api_token
810 self._gateway_services = {}
811 self._keep_services = [{
812 'uuid': "00000-bi6l4-%015d" % idx,
813 'service_type': 'proxy',
814 '_service_root': uri,
815 } for idx, uri in enumerate(proxy_uris)]
816 self._writable_services = self._keep_services
817 self.using_proxy = True
818 self._static_services_list = True
820 # It's important to avoid instantiating an API client
821 # unless we actually need one, for testing's sake.
822 if api_client is None:
823 api_client = arvados.api('v1')
824 self.api_client = api_client
825 self.api_token = api_client.api_token
826 self._gateway_services = {}
827 self._keep_services = None
828 self._writable_services = None
829 self.using_proxy = None
830 self._static_services_list = False
832 def current_timeout(self, attempt_number):
833 """Return the appropriate timeout to use for this client.
835 The proxy timeout setting if the backend service is currently a proxy,
836 the regular timeout setting otherwise. The `attempt_number` indicates
837 how many times the operation has been tried already (starting from 0
838 for the first try), and scales the connection timeout portion of the
839 return value accordingly.
842 # TODO(twp): the timeout should be a property of a
843 # KeepService, not a KeepClient. See #4488.
844 t = self.proxy_timeout if self.using_proxy else self.timeout
# Exponential backoff on the connect timeout only: doubled per attempt.
# Two return shapes are visible (2-tuple and 3-tuple); the guard that
# selects between them is elided in this listing.
846 return (t[0] * (1 << attempt_number), t[1])
848 return (t[0] * (1 << attempt_number), t[1], t[2])
849 def _any_nondisk_services(self, service_list):
850 return any(ks.get('service_type', 'disk') != 'disk'
851 for ks in service_list)
# Populate the cached Keep service lists from the API server, unless a
# static (proxy) list is in use or a fresh list already exists.
# NOTE(review): listing is elided; comments describe only visible code.
853 def build_services_list(self, force_rebuild=False):
854 if (self._static_services_list or
855 (self._keep_services and not force_rebuild)):
# Prefer the keep_services endpoint; fall back to the older keep_disks
# endpoint for API servers that predate Keep services.
859 keep_services = self.api_client.keep_services().accessible()
860 except Exception: # API server predates Keep services.
861 keep_services = self.api_client.keep_disks().list()
863 # Gateway services are only used when specified by UUID,
864 # so there's nothing to gain by filtering them by
866 self._gateway_services = {ks['uuid']: ks for ks in
867 keep_services.execute()['items']}
868 if not self._gateway_services:
869 raise arvados.errors.NoKeepServersError()
871 # Precompute the base URI for each service.
872 for r in self._gateway_services.values():
873 host = r['service_host']
874 if not host.startswith('[') and host.find(':') >= 0:
875 # IPv6 URIs must be formatted like http://[::1]:80/...
876 host = '[' + host + ']'
877 r['_service_root'] = "{}://{}:{:d}/".format(
878 'https' if r['service_ssl_flag'] else 'http',
882 _logger.debug(str(self._gateway_services))
# Gateway-typed services are excluded from the general service list;
# read-only services are excluded from the writable list.
883 self._keep_services = [
884 ks for ks in self._gateway_services.values()
885 if not ks.get('service_type', '').startswith('gateway:')]
886 self._writable_services = [ks for ks in self._keep_services
887 if not ks.get('read_only')]
889 # For disk type services, max_replicas_per_service is 1
890 # It is unknown (unlimited) for other service types.
891 if self._any_nondisk_services(self._writable_services):
892 self.max_replicas_per_service = None
894 self.max_replicas_per_service = 1
896 def _service_weight(self, data_hash, service_uuid):
897 """Compute the weight of a Keep service endpoint for a data
898 block with a known hash.
900 The weight is md5(h + u) where u is the last 15 characters of
901 the service endpoint's UUID.
# Hex digest string; used as a sort key for rendezvous hashing so every
# client probes services in the same per-block order.
903 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
# NOTE(review): listing is elided; comments describe only visible code.
905 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
906 """Return an array of Keep service endpoints, in the order in
907 which they should be probed when reading or writing data with
908 the given hash+hints.
910 self.build_services_list(force_rebuild)
913 # Use the services indicated by the given +K@... remote
914 # service hints, if any are present and can be resolved to a
# Short K@ hints name a remote cluster (probed via its keepproxy URL);
# 29-character K@ hints name a local gateway service by UUID.
916 for hint in locator.hints:
917 if hint.startswith('K@'):
920 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
921 elif len(hint) == 29:
922 svc = self._gateway_services.get(hint[2:])
924 sorted_roots.append(svc['_service_root'])
926 # Sort the available local services by weight (heaviest first)
927 # for this locator, and return their service_roots (base URIs)
929 use_services = self._keep_services
931 use_services = self._writable_services
932 self.using_proxy = self._any_nondisk_services(use_services)
# Rendezvous ordering: sort by _service_weight so all clients agree on
# the probe order for this block.
933 sorted_roots.extend([
934 svc['_service_root'] for svc in sorted(
937 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
938 _logger.debug("{}: {}".format(locator, sorted_roots))
941 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
942 # roots_map is a dictionary, mapping Keep service root strings
943 # to KeepService objects. Poll for Keep services, and add any
944 # new ones to roots_map. Return the current list of local
# The caller's Authorization header wins; this only fills a default.
946 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
947 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
# Create a KeepService (wired to this client's shared curl pool and
# transfer counters) for any root not seen before.
948 for root in local_roots:
949 if root not in roots_map:
950 roots_map[root] = self.KeepService(
951 root, self._user_agent_pool,
952 upload_counter=self.upload_counter,
953 download_counter=self.download_counter,
955 insecure=self.insecure)
# NOTE(review): listing is elided; comments describe only visible code.
959 def _check_loop_result(result):
960 # KeepClient RetryLoops should save results as a 2-tuple: the
961 # actual result of the request, and the number of servers available
962 # to receive the request this round.
963 # This method returns True if there's a real result, False if
964 # there are no more servers available, otherwise None.
965 if isinstance(result, Exception):
967 result, tried_server_count = result
# A result of None or False means "no usable answer this round".
968 if (result is not None) and (result is not False):
970 elif tried_server_count < 1:
971 _logger.info("No more Keep services to try; giving up")
976 def get_from_cache(self, loc):
977 """Fetch a block only if is in the cache, otherwise return None."""
978 slot = self.block_cache.get(loc)
# Only a slot whose content is fully loaded (ready event set) counts as
# a cache hit; the return statement is elided in this listing.
979 if slot is not None and slot.ready.is_set():
984 def refresh_signature(self, loc):
985 """Ask Keep to get the remote block and return its local signature"""
986 now = datetime.datetime.utcnow().isoformat("T") + 'Z'
987 return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
# HEAD a block: probe for existence/size without downloading the body.
# Keyword arguments are forwarded to _get_or_head.
990 def head(self, loc_s, **kwargs):
991 return self._get_or_head(loc_s, method="HEAD", **kwargs)
# GET a block's contents. Keyword arguments are forwarded to _get_or_head.
994 def get(self, loc_s, **kwargs):
995 return self._get_or_head(loc_s, method="GET", **kwargs)
# NOTE(review): listing is elided in places; comments describe only
# visible code.
997 def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
998 """Get data from Keep.
1000 This method fetches one or more blocks of data from Keep. It
1001 sends a request each Keep service registered with the API
1002 server (or the proxy provided when this client was
1003 instantiated), then each service named in location hints, in
1004 sequence. As soon as one service provides the data, it's
1008 * loc_s: A string of one or more comma-separated locators to fetch.
1009 This method returns the concatenation of these blocks.
1010 * num_retries: The number of times to retry GET requests to
1011 *each* Keep server if it returns temporary failures, with
1012 exponential backoff. Note that, in each loop, the method may try
1013 to fetch data from every available Keep service, along with any
1014 that are named in location hints in the locator. The default value
1015 is set when the KeepClient is initialized.
# Multiple comma-separated locators: recurse per locator and concatenate.
1018 return ''.join(self.get(x) for x in loc_s.split(','))
1020 self.get_counter.add(1)
1025 locator = KeepLocator(loc_s)
# Reserve (or find) the cache slot before any network I/O so concurrent
# readers of the same block wait on one fetch.
1027 slot, first = self.block_cache.reserve_cache(locator.md5sum)
1029 self.hits_counter.add(1)
# A ready slot whose content is None records an earlier failed read.
1032 raise arvados.errors.KeepReadError(
1033 "failed to read {}".format(loc_s))
1036 self.misses_counter.add(1)
# Propagate or mint a request id so server logs can be correlated.
1040 headers['X-Request-Id'] = (request_id or
1041 (hasattr(self, 'api_client') and self.api_client.request_id) or
1042 arvados.util.new_request_id())
1044 # If the locator has hints specifying a prefix (indicating a
1045 # remote keepproxy) or the UUID of a local gateway service,
1046 # read data from the indicated service(s) instead of the usual
1047 # list of local disk services.
1048 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1049 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1050 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1051 for hint in locator.hints if (
1052 hint.startswith('K@') and
1054 self._gateway_services.get(hint[2:])
1056 # Map root URLs to their KeepService objects.
1058 root: self.KeepService(root, self._user_agent_pool,
1059 upload_counter=self.upload_counter,
1060 download_counter=self.download_counter,
1062 insecure=self.insecure)
1063 for root in hint_roots
1066 # See #3147 for a discussion of the loop implementation. Highlights:
1067 # * Refresh the list of Keep services after each failure, in case
1068 # it's being updated.
1069 # * Retry until we succeed, we're out of retries, or every available
1070 # service has returned permanent failure.
1073 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1075 for tries_left in loop:
# Force a service-list rebuild on every round after the first.
1077 sorted_roots = self.map_new_services(
1079 force_rebuild=(tries_left < num_retries),
1080 need_writable=False,
1082 except Exception as error:
1083 loop.save_result(error)
1086 # Query KeepService objects that haven't returned
1087 # permanent failure, in our specified shuffle order.
1088 services_to_try = [roots_map[root]
1089 for root in sorted_roots
1090 if roots_map[root].usable()]
1091 for keep_service in services_to_try:
1092 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1093 if blob is not None:
1095 loop.save_result((blob, len(services_to_try)))
1097 # Always cache the result, then return it if we succeeded.
1101 if slot is not None:
1103 self.block_cache.cap_cache()
1105 # Q: Including 403 is necessary for the Keep tests to continue
1106 # passing, but maybe they should expect KeepReadError instead?
1107 not_founds = sum(1 for key in sorted_roots
1108 if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1109 service_errors = ((key, roots_map[key].last_result()['error'])
1110 for key in sorted_roots)
# Error taxonomy: no services at all -> KeepReadError with loop result;
# every service said not-found/forbidden/gone -> NotFoundError;
# otherwise a KeepReadError carrying the per-service errors.
1112 raise arvados.errors.KeepReadError(
1113 "failed to read {}: no Keep services available ({})".format(
1114 loc_s, loop.last_result()))
1115 elif not_founds == len(sorted_roots):
1116 raise arvados.errors.NotFoundError(
1117 "{} not found".format(loc_s), service_errors)
1119 raise arvados.errors.KeepReadError(
1120 "failed to read {}".format(loc_s), service_errors, label="service")
def put(self, data, copies=2, num_retries=None, request_id=None):
    """Save data in Keep.

    This method will get a list of Keep services from the API server, and
    send the data to each one simultaneously in a new thread.  Once the
    uploads are finished, if enough copies are saved, this method returns
    the most recent HTTP response body.  If requests fail to upload
    enough copies, this method raises KeepWriteError.

    Arguments:
    * data: The string of data to upload.
    * copies: The number of copies that the user requires be saved.
    * num_retries: The number of times to retry PUT requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff.  The default value is set when the
      KeepClient is initialized.
    """

    # Keep stores bytes; transparently encode text input.
    if not isinstance(data, bytes):
        data = data.encode()

    self.put_counter.add(1)

    data_hash = hashlib.md5(data).hexdigest()
    loc_s = data_hash + '+' + str(len(data))
    if copies < 1:
        # Nothing to upload: the bare locator satisfies the request.
        return loc_s
    locator = KeepLocator(loc_s)

    headers = {
        'X-Request-Id': (request_id or
                         (hasattr(self, 'api_client') and self.api_client.request_id) or
                         arvados.util.new_request_id()),
        # Ask each Keep server to store this many replicas if it can.
        'X-Keep-Desired-Replicas': str(copies),
    }
    roots_map = {}
    loop = retry.RetryLoop(num_retries, self._check_loop_result,
                           backoff_start=2)
    done = 0
    for tries_left in loop:
        try:
            sorted_roots = self.map_new_services(
                roots_map, locator,
                # Refresh the service list after a failed attempt, in
                # case it is being updated.
                force_rebuild=(tries_left < num_retries),
                need_writable=True,
                headers=headers)
        except Exception as error:
            loop.save_result(error)
            continue

        writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                      data_hash=data_hash,
                                                      copies=copies - done,
                                                      max_service_replicas=self.max_replicas_per_service,
                                                      timeout=self.current_timeout(num_retries - tries_left))
        for service_root, ks in [(root, roots_map[root])
                                 for root in sorted_roots]:
            # Skip services that already returned a final answer.
            if ks.finished():
                continue
            writer_pool.add_task(ks, service_root)
        writer_pool.join()
        done += writer_pool.done()
        loop.save_result((done >= copies, writer_pool.total_task_nr))

    if loop.success():
        return writer_pool.response()
    if not roots_map:
        raise arvados.errors.KeepWriteError(
            "failed to write {}: no Keep services available ({})".format(
                data_hash, loop.last_result()))
    else:
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots
                          if roots_map[key].last_result()['error'])
        raise arvados.errors.KeepWriteError(
            "failed to write {} (wanted {} copies but wrote {})".format(
                data_hash, copies, writer_pool.done()), service_errors, label="service")
def local_store_put(self, data, copies=1, num_retries=None):
    """A stub for put().

    This method is used in place of the real put() method when
    using local storage (see constructor's local_store argument).

    copies and num_retries arguments are ignored: they are here
    only for the sake of offering the same call signature as
    put().

    Data stored this way can be retrieved via local_store_get().
    """
    md5 = hashlib.md5(data).hexdigest()
    locator = '%s+%d' % (md5, len(data))
    # Write to a temp name first, then rename, so readers never see a
    # partially written block.
    with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
        f.write(data)
    os.rename(os.path.join(self.local_store, md5 + '.tmp'),
              os.path.join(self.local_store, md5))
    return locator
def local_store_get(self, loc_s, num_retries=None):
    """Companion to local_store_put()."""
    try:
        locator = KeepLocator(loc_s)
    except ValueError:
        raise arvados.errors.NotFoundError(
            "Invalid data locator: '%s'" % loc_s)
    # The well-known empty-block locator never hits the disk.
    if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
        return b''
    with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
        return f.read()
def local_store_head(self, loc_s, num_retries=None):
    """Companion to local_store_put()."""
    try:
        locator = KeepLocator(loc_s)
    except ValueError:
        raise arvados.errors.NotFoundError(
            "Invalid data locator: '%s'" % loc_s)
    # The well-known empty-block locator always "exists".
    if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
        return True
    if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
        return True
    # NOTE(review): falls through to an implicit None (falsy) when the
    # block is absent, mirroring local storage's historical behavior.
def is_cached(self, locator):
    """Reserve/look up a block cache slot for ``locator``.

    Bug fix: the original referenced the undefined name ``expect_hash``
    (a NameError on every call); the intended argument is the
    ``locator`` parameter, the only name in scope.
    """
    return self.block_cache.reserve_cache(locator)