1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 standard_library.install_aliases()
5 from builtins import next
6 from builtins import str
7 from builtins import range
8 from builtins import object
25 if sys.version_info >= (3, 0):
26 from io import BytesIO
28 from cStringIO import StringIO as BytesIO
31 import arvados.config as config
33 import arvados.retry as retry
36 _logger = logging.getLogger('arvados.keep')
37 global_client_object = None
40 # Monkey patch TCP constants when not available (apple). Values sourced from:
41 # http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
42 if sys.platform == 'darwin':
43 if not hasattr(socket, 'TCP_KEEPALIVE'):
44 socket.TCP_KEEPALIVE = 0x010
45 if not hasattr(socket, 'TCP_KEEPINTVL'):
46 socket.TCP_KEEPINTVL = 0x101
47 if not hasattr(socket, 'TCP_KEEPCNT'):
48 socket.TCP_KEEPCNT = 0x102
51 class KeepLocator(object):
52 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
53 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
55 def __init__(self, locator_str):
58 self._perm_expiry = None
59 pieces = iter(locator_str.split('+'))
60 self.md5sum = next(pieces)
62 self.size = int(next(pieces))
66 if self.HINT_RE.match(hint) is None:
67 raise ValueError("invalid hint format: {}".format(hint))
68 elif hint.startswith('A'):
69 self.parse_permission_hint(hint)
71 self.hints.append(hint)
75 str(s) for s in [self.md5sum, self.size,
76 self.permission_hint()] + self.hints
80 if self.size is not None:
81 return "%s+%i" % (self.md5sum, self.size)
85 def _make_hex_prop(name, length):
86 # Build and return a new property with the given name that
87 # must be a hex string of the given length.
88 data_name = '_{}'.format(name)
90 return getattr(self, data_name)
91 def setter(self, hex_str):
92 if not arvados.util.is_hex(hex_str, length):
93 raise ValueError("{} is not a {}-digit hex string: {!r}".
94 format(name, length, hex_str))
95 setattr(self, data_name, hex_str)
96 return property(getter, setter)
98 md5sum = _make_hex_prop('md5sum', 32)
99 perm_sig = _make_hex_prop('perm_sig', 40)
102 def perm_expiry(self):
103 return self._perm_expiry
106 def perm_expiry(self, value):
107 if not arvados.util.is_hex(value, 1, 8):
109 "permission timestamp must be a hex Unix timestamp: {}".
111 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
113 def permission_hint(self):
114 data = [self.perm_sig, self.perm_expiry]
117 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
118 return "A{}@{:08x}".format(*data)
120 def parse_permission_hint(self, s):
122 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
124 raise ValueError("bad permission hint {}".format(s))
126 def permission_expired(self, as_of_dt=None):
127 if self.perm_expiry is None:
129 elif as_of_dt is None:
130 as_of_dt = datetime.datetime.now()
131 return self.perm_expiry <= as_of_dt
135 """Simple interface to a global KeepClient object.
137 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
138 own API client. The global KeepClient will build an API client from the
139 current Arvados configuration, which may not match the one you built.
144 def global_client_object(cls):
145 global global_client_object
146 # Previously, KeepClient would change its behavior at runtime based
147 # on these configuration settings. We simulate that behavior here
148 # by checking the values and returning a new KeepClient if any of
150 key = (config.get('ARVADOS_API_HOST'),
151 config.get('ARVADOS_API_TOKEN'),
152 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
153 config.get('ARVADOS_KEEP_PROXY'),
154 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
155 os.environ.get('KEEP_LOCAL_STORE'))
156 if (global_client_object is None) or (cls._last_key != key):
157 global_client_object = KeepClient()
159 return global_client_object
162 def get(locator, **kwargs):
163 return Keep.global_client_object().get(locator, **kwargs)
166 def put(data, **kwargs):
167 return Keep.global_client_object().put(data, **kwargs)
169 class KeepBlockCache(object):
170 # Default RAM cache is 256MiB
171 def __init__(self, cache_max=(256 * 1024 * 1024)):
172 self.cache_max = cache_max
174 self._cache_lock = threading.Lock()
176 class CacheSlot(object):
177 __slots__ = ("locator", "ready", "content")
179 def __init__(self, locator):
180 self.locator = locator
181 self.ready = threading.Event()
188 def set(self, value):
193 if self.content is None:
196 return len(self.content)
199 '''Cap the cache size to self.cache_max'''
200 with self._cache_lock:
201 # Select all slots except those where ready.is_set() and content is
202 # None (that means there was an error reading the block).
203 self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
204 sm = sum([slot.size() for slot in self._cache])
205 while len(self._cache) > 0 and sm > self.cache_max:
206 for i in range(len(self._cache)-1, -1, -1):
207 if self._cache[i].ready.is_set():
210 sm = sum([slot.size() for slot in self._cache])
212 def _get(self, locator):
213 # Test if the locator is already in the cache
214 for i in range(0, len(self._cache)):
215 if self._cache[i].locator == locator:
218 # move it to the front
220 self._cache.insert(0, n)
def get(self, locator):
    """Thread-safe cache lookup: return the CacheSlot for locator, or None.

    Delegates to _get() while holding the cache lock so concurrent
    readers/writers never observe the slot list mid-update.
    """
    self._cache_lock.acquire()
    try:
        return self._get(locator)
    finally:
        self._cache_lock.release()
228 def reserve_cache(self, locator):
229 '''Reserve a cache slot for the specified locator,
230 or return the existing slot.'''
231 with self._cache_lock:
232 n = self._get(locator)
236 # Add a new cache slot for the locator
237 n = KeepBlockCache.CacheSlot(locator)
238 self._cache.insert(0, n)
241 class Counter(object):
242 def __init__(self, v=0):
243 self._lk = threading.Lock()
255 class KeepClient(object):
257 # Default Keep server connection timeout: 2 seconds
258 # Default Keep server read timeout: 256 seconds
259 # Default Keep server bandwidth minimum: 32768 bytes per second
260 # Default Keep proxy connection timeout: 20 seconds
261 # Default Keep proxy read timeout: 256 seconds
262 # Default Keep proxy bandwidth minimum: 32768 bytes per second
263 DEFAULT_TIMEOUT = (2, 256, 32768)
264 DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
267 class KeepService(object):
268 """Make requests to a single Keep service, and track results.
270 A KeepService is intended to last long enough to perform one
271 transaction (GET or PUT) against one Keep service. This can
272 involve calling either get() or put() multiple times in order
273 to retry after transient failures. However, calling both get()
274 and put() on a single instance -- or using the same instance
275 to access two different Keep services -- will not produce
282 arvados.errors.HttpError,
285 def __init__(self, root, user_agent_pool=queue.LifoQueue(),
287 download_counter=None, **headers):
289 self._user_agent_pool = user_agent_pool
290 self._result = {'error': None}
293 self.get_headers = {'Accept': 'application/octet-stream'}
294 self.get_headers.update(headers)
295 self.put_headers = headers
296 self.upload_counter = upload_counter
297 self.download_counter = download_counter
300 """Is it worth attempting a request?"""
304 """Did the request succeed or encounter permanent failure?"""
305 return self._result['error'] == False or not self._usable
307 def last_result(self):
310 def _get_user_agent(self):
312 return self._user_agent_pool.get(block=False)
316 def _put_user_agent(self, ua):
319 self._user_agent_pool.put(ua, block=False)
324 def _socket_open(family, socktype, protocol, address=None):
325 """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
326 s = socket.socket(family, socktype, protocol)
327 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
328 # Will throw invalid protocol error on mac. This test prevents that.
329 if hasattr(socket, 'TCP_KEEPIDLE'):
330 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
331 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
334 def get(self, locator, method="GET", timeout=None):
335 # locator is a KeepLocator object.
336 url = self.root + str(locator)
337 _logger.debug("Request: %s %s", method, url)
338 curl = self._get_user_agent()
341 with timer.Timer() as t:
343 response_body = BytesIO()
344 curl.setopt(pycurl.NOSIGNAL, 1)
345 curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
346 curl.setopt(pycurl.URL, url.encode('utf-8'))
347 curl.setopt(pycurl.HTTPHEADER, [
348 '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
349 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
350 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
352 curl.setopt(pycurl.NOBODY, True)
353 self._setcurltimeouts(curl, timeout)
357 except Exception as e:
358 raise arvados.errors.HttpError(0, str(e))
360 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
361 'body': response_body.getvalue(),
362 'headers': self._headers,
366 ok = retry.check_http_response_success(self._result['status_code'])
368 self._result['error'] = arvados.errors.HttpError(
369 self._result['status_code'],
370 self._headers.get('x-status-line', 'Error'))
371 except self.HTTP_ERRORS as e:
375 self._usable = ok != False
376 if self._result.get('status_code', None):
377 # The client worked well enough to get an HTTP status
378 # code, so presumably any problems are just on the
379 # server side and it's OK to reuse the client.
380 self._put_user_agent(curl)
382 # Don't return this client to the pool, in case it's
386 _logger.debug("Request fail: GET %s => %s: %s",
387 url, type(self._result['error']), str(self._result['error']))
390 _logger.info("HEAD %s: %s bytes",
391 self._result['status_code'],
392 self._result.get('content-length'))
395 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
396 self._result['status_code'],
397 len(self._result['body']),
399 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
401 if self.download_counter:
402 self.download_counter.add(len(self._result['body']))
403 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
404 if resp_md5 != locator.md5sum:
405 _logger.warning("Checksum fail: md5(%s) = %s",
407 self._result['error'] = arvados.errors.HttpError(
410 return self._result['body']
412 def put(self, hash_s, body, timeout=None):
413 url = self.root + hash_s
414 _logger.debug("Request: PUT %s", url)
415 curl = self._get_user_agent()
418 with timer.Timer() as t:
420 body_reader = BytesIO(body)
421 response_body = BytesIO()
422 curl.setopt(pycurl.NOSIGNAL, 1)
423 curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
424 curl.setopt(pycurl.URL, url.encode('utf-8'))
425 # Using UPLOAD tells cURL to wait for a "go ahead" from the
426 # Keep server (in the form of a HTTP/1.1 "100 Continue"
427 # response) instead of sending the request body immediately.
428 # This allows the server to reject the request if the request
429 # is invalid or the server is read-only, without waiting for
430 # the client to send the entire block.
431 curl.setopt(pycurl.UPLOAD, True)
432 curl.setopt(pycurl.INFILESIZE, len(body))
433 curl.setopt(pycurl.READFUNCTION, body_reader.read)
434 curl.setopt(pycurl.HTTPHEADER, [
435 '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
436 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
437 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
438 self._setcurltimeouts(curl, timeout)
441 except Exception as e:
442 raise arvados.errors.HttpError(0, str(e))
444 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
445 'body': response_body.getvalue().decode('utf-8'),
446 'headers': self._headers,
449 ok = retry.check_http_response_success(self._result['status_code'])
451 self._result['error'] = arvados.errors.HttpError(
452 self._result['status_code'],
453 self._headers.get('x-status-line', 'Error'))
454 except self.HTTP_ERRORS as e:
458 self._usable = ok != False # still usable if ok is True or None
459 if self._result.get('status_code', None):
460 # Client is functional. See comment in get().
461 self._put_user_agent(curl)
465 _logger.debug("Request fail: PUT %s => %s: %s",
466 url, type(self._result['error']), str(self._result['error']))
468 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
469 self._result['status_code'],
472 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
473 if self.upload_counter:
474 self.upload_counter.add(len(body))
477 def _setcurltimeouts(self, curl, timeouts):
480 elif isinstance(timeouts, tuple):
481 if len(timeouts) == 2:
482 conn_t, xfer_t = timeouts
483 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
485 conn_t, xfer_t, bandwidth_bps = timeouts
487 conn_t, xfer_t = (timeouts, timeouts)
488 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
489 curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
490 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
491 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
493 def _headerfunction(self, header_line):
494 if isinstance(header_line, bytes):
495 header_line = header_line.decode('iso-8859-1')
496 if ':' in header_line:
497 name, value = header_line.split(':', 1)
498 name = name.strip().lower()
499 value = value.strip()
501 name = self._lastheadername
502 value = self._headers[name] + ' ' + header_line.strip()
503 elif header_line.startswith('HTTP/'):
504 name = 'x-status-line'
507 _logger.error("Unexpected header line: %s", header_line)
509 self._lastheadername = name
510 self._headers[name] = value
511 # Returning None implies all bytes were written
514 class KeepWriterQueue(queue.Queue):
515 def __init__(self, copies):
516 queue.Queue.__init__(self) # Old-style superclass
517 self.wanted_copies = copies
518 self.successful_copies = 0
520 self.successful_copies_lock = threading.Lock()
521 self.pending_tries = copies
522 self.pending_tries_notification = threading.Condition()
def write_success(self, response, replicas_nr):
    """Record a successful PUT that stored replicas_nr copies.

    Updates the success counter and remembers the server response under
    the counter lock, then wakes every writer thread waiting for work so
    they can re-check whether enough copies have been stored.
    """
    self.successful_copies_lock.acquire()
    try:
        self.successful_copies += replicas_nr
        self.response = response
    finally:
        self.successful_copies_lock.release()
    cond = self.pending_tries_notification
    with cond:
        cond.notify_all()
def write_fail(self, ks):
    """Record a failed PUT attempt against service `ks`.

    Returns the consumed retry slot to the pool (pending_tries goes back
    up by one) and wakes one waiting writer thread to pick up the retry.
    """
    cond = self.pending_tries_notification
    with cond:
        self.pending_tries += 1
        cond.notify()
def pending_copies(self):
    """Return how many more successful copies are still required.

    Reads the success counter under its lock; may be negative if more
    copies were stored than were requested.
    """
    with self.successful_copies_lock:
        remaining = self.wanted_copies - self.successful_copies
    return remaining
540 def get_next_task(self):
541 with self.pending_tries_notification:
543 if self.pending_copies() < 1:
544 # This notify_all() is unnecessary --
545 # write_success() already called notify_all()
546 # when pending<1 became true, so it's not
547 # possible for any other thread to be in
548 # wait() now -- but it's cheap insurance
549 # against deadlock so we do it anyway:
550 self.pending_tries_notification.notify_all()
551 # Drain the queue and then raise Queue.Empty
555 elif self.pending_tries > 0:
556 service, service_root = self.get_nowait()
557 if service.finished():
560 self.pending_tries -= 1
561 return service, service_root
563 self.pending_tries_notification.notify_all()
566 self.pending_tries_notification.wait()
569 class KeepWriterThreadPool(object):
570 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
571 self.total_task_nr = 0
572 self.wanted_copies = copies
573 if (not max_service_replicas) or (max_service_replicas >= copies):
576 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
577 _logger.debug("Pool max threads is %d", num_threads)
579 self.queue = KeepClient.KeepWriterQueue(copies)
581 for _ in range(num_threads):
582 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
583 self.workers.append(w)
585 def add_task(self, ks, service_root):
586 self.queue.put((ks, service_root))
587 self.total_task_nr += 1
590 return self.queue.successful_copies
594 for worker in self.workers:
596 # Wait for finished work
600 return self.queue.response
603 class KeepWriterThread(threading.Thread):
604 TaskFailed = RuntimeError()
606 def __init__(self, queue, data, data_hash, timeout=None):
607 super(KeepClient.KeepWriterThread, self).__init__()
608 self.timeout = timeout
611 self.data_hash = data_hash
617 service, service_root = self.queue.get_next_task()
621 locator, copies = self.do_task(service, service_root)
622 except Exception as e:
623 if e is not self.TaskFailed:
624 _logger.exception("Exception in KeepWriterThread")
625 self.queue.write_fail(service)
627 self.queue.write_success(locator, copies)
629 self.queue.task_done()
631 def do_task(self, service, service_root):
632 success = bool(service.put(self.data_hash,
634 timeout=self.timeout))
635 result = service.last_result()
638 if result.get('status_code', None):
639 _logger.debug("Request fail: PUT %s => %s %s",
641 result['status_code'],
643 raise self.TaskFailed
645 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
646 str(threading.current_thread()),
651 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
652 except (KeyError, ValueError):
655 return result['body'].strip(), replicas_stored
658 def __init__(self, api_client=None, proxy=None,
659 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
660 api_token=None, local_store=None, block_cache=None,
661 num_retries=0, session=None):
662 """Initialize a new KeepClient.
666 The API client to use to find Keep services. If not
667 provided, KeepClient will build one from available Arvados
671 If specified, this KeepClient will send requests to this Keep
672 proxy. Otherwise, KeepClient will fall back to the setting of the
673 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
If you do not want KeepClient to use a proxy, pass in an empty
678 The initial timeout (in seconds) for HTTP requests to Keep
679 non-proxy servers. A tuple of three floats is interpreted as
680 (connection_timeout, read_timeout, minimum_bandwidth). A connection
681 will be aborted if the average traffic rate falls below
682 minimum_bandwidth bytes per second over an interval of read_timeout
683 seconds. Because timeouts are often a result of transient server
684 load, the actual connection timeout will be increased by a factor
685 of two on each retry.
686 Default: (2, 256, 32768).
689 The initial timeout (in seconds) for HTTP requests to
690 Keep proxies. A tuple of three floats is interpreted as
691 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
692 described above for adjusting connection timeouts on retry also
694 Default: (20, 256, 32768).
697 If you're not using an API client, but only talking
698 directly to a Keep proxy, this parameter specifies an API token
699 to authenticate Keep requests. It is an error to specify both
700 api_client and api_token. If you specify neither, KeepClient
701 will use one available from the Arvados configuration.
704 If specified, this KeepClient will bypass Keep
705 services, and save data to the named directory. If unspecified,
706 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
707 environment variable. If you want to ensure KeepClient does not
708 use local storage, pass in an empty string. This is primarily
709 intended to mock a server for testing.
712 The default number of times to retry failed requests.
713 This will be used as the default num_retries value when get() and
714 put() are called. Default 0.
716 self.lock = threading.Lock()
718 if config.get('ARVADOS_KEEP_SERVICES'):
719 proxy = config.get('ARVADOS_KEEP_SERVICES')
721 proxy = config.get('ARVADOS_KEEP_PROXY')
722 if api_token is None:
723 if api_client is None:
724 api_token = config.get('ARVADOS_API_TOKEN')
726 api_token = api_client.api_token
727 elif api_client is not None:
729 "can't build KeepClient with both API client and token")
730 if local_store is None:
731 local_store = os.environ.get('KEEP_LOCAL_STORE')
733 self.block_cache = block_cache if block_cache else KeepBlockCache()
734 self.timeout = timeout
735 self.proxy_timeout = proxy_timeout
736 self._user_agent_pool = queue.LifoQueue()
737 self.upload_counter = Counter()
738 self.download_counter = Counter()
739 self.put_counter = Counter()
740 self.get_counter = Counter()
741 self.hits_counter = Counter()
742 self.misses_counter = Counter()
745 self.local_store = local_store
746 self.get = self.local_store_get
747 self.put = self.local_store_put
749 self.num_retries = num_retries
750 self.max_replicas_per_service = None
752 proxy_uris = proxy.split()
753 for i in range(len(proxy_uris)):
754 if not proxy_uris[i].endswith('/'):
757 url = urllib.parse.urlparse(proxy_uris[i])
758 if not (url.scheme and url.netloc):
759 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
760 self.api_token = api_token
761 self._gateway_services = {}
762 self._keep_services = [{
763 'uuid': "00000-bi6l4-%015d" % idx,
764 'service_type': 'proxy',
765 '_service_root': uri,
766 } for idx, uri in enumerate(proxy_uris)]
767 self._writable_services = self._keep_services
768 self.using_proxy = True
769 self._static_services_list = True
771 # It's important to avoid instantiating an API client
772 # unless we actually need one, for testing's sake.
773 if api_client is None:
774 api_client = arvados.api('v1')
775 self.api_client = api_client
776 self.api_token = api_client.api_token
777 self._gateway_services = {}
778 self._keep_services = None
779 self._writable_services = None
780 self.using_proxy = None
781 self._static_services_list = False
783 def current_timeout(self, attempt_number):
784 """Return the appropriate timeout to use for this client.
786 The proxy timeout setting if the backend service is currently a proxy,
787 the regular timeout setting otherwise. The `attempt_number` indicates
788 how many times the operation has been tried already (starting from 0
789 for the first try), and scales the connection timeout portion of the
790 return value accordingly.
793 # TODO(twp): the timeout should be a property of a
794 # KeepService, not a KeepClient. See #4488.
795 t = self.proxy_timeout if self.using_proxy else self.timeout
797 return (t[0] * (1 << attempt_number), t[1])
799 return (t[0] * (1 << attempt_number), t[1], t[2])
800 def _any_nondisk_services(self, service_list):
801 return any(ks.get('service_type', 'disk') != 'disk'
802 for ks in service_list)
804 def build_services_list(self, force_rebuild=False):
805 if (self._static_services_list or
806 (self._keep_services and not force_rebuild)):
810 keep_services = self.api_client.keep_services().accessible()
811 except Exception: # API server predates Keep services.
812 keep_services = self.api_client.keep_disks().list()
814 # Gateway services are only used when specified by UUID,
815 # so there's nothing to gain by filtering them by
817 self._gateway_services = {ks['uuid']: ks for ks in
818 keep_services.execute()['items']}
819 if not self._gateway_services:
820 raise arvados.errors.NoKeepServersError()
822 # Precompute the base URI for each service.
823 for r in self._gateway_services.values():
824 host = r['service_host']
825 if not host.startswith('[') and host.find(':') >= 0:
826 # IPv6 URIs must be formatted like http://[::1]:80/...
827 host = '[' + host + ']'
828 r['_service_root'] = "{}://{}:{:d}/".format(
829 'https' if r['service_ssl_flag'] else 'http',
833 _logger.debug(str(self._gateway_services))
834 self._keep_services = [
835 ks for ks in self._gateway_services.values()
836 if not ks.get('service_type', '').startswith('gateway:')]
837 self._writable_services = [ks for ks in self._keep_services
838 if not ks.get('read_only')]
840 # For disk type services, max_replicas_per_service is 1
841 # It is unknown (unlimited) for other service types.
842 if self._any_nondisk_services(self._writable_services):
843 self.max_replicas_per_service = None
845 self.max_replicas_per_service = 1
847 def _service_weight(self, data_hash, service_uuid):
848 """Compute the weight of a Keep service endpoint for a data
849 block with a known hash.
851 The weight is md5(h + u) where u is the last 15 characters of
852 the service endpoint's UUID.
854 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
856 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
857 """Return an array of Keep service endpoints, in the order in
858 which they should be probed when reading or writing data with
859 the given hash+hints.
861 self.build_services_list(force_rebuild)
864 # Use the services indicated by the given +K@... remote
865 # service hints, if any are present and can be resolved to a
867 for hint in locator.hints:
868 if hint.startswith('K@'):
871 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
872 elif len(hint) == 29:
873 svc = self._gateway_services.get(hint[2:])
875 sorted_roots.append(svc['_service_root'])
877 # Sort the available local services by weight (heaviest first)
878 # for this locator, and return their service_roots (base URIs)
880 use_services = self._keep_services
882 use_services = self._writable_services
883 self.using_proxy = self._any_nondisk_services(use_services)
884 sorted_roots.extend([
885 svc['_service_root'] for svc in sorted(
888 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
889 _logger.debug("{}: {}".format(locator, sorted_roots))
892 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
893 # roots_map is a dictionary, mapping Keep service root strings
894 # to KeepService objects. Poll for Keep services, and add any
895 # new ones to roots_map. Return the current list of local
897 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
898 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
899 for root in local_roots:
900 if root not in roots_map:
901 roots_map[root] = self.KeepService(
902 root, self._user_agent_pool,
903 upload_counter=self.upload_counter,
904 download_counter=self.download_counter,
909 def _check_loop_result(result):
910 # KeepClient RetryLoops should save results as a 2-tuple: the
911 # actual result of the request, and the number of servers available
912 # to receive the request this round.
913 # This method returns True if there's a real result, False if
914 # there are no more servers available, otherwise None.
915 if isinstance(result, Exception):
917 result, tried_server_count = result
918 if (result is not None) and (result is not False):
920 elif tried_server_count < 1:
921 _logger.info("No more Keep services to try; giving up")
926 def get_from_cache(self, loc):
927 """Fetch a block only if is in the cache, otherwise return None."""
928 slot = self.block_cache.get(loc)
929 if slot is not None and slot.ready.is_set():
def head(self, loc_s, num_retries=None):
    """Issue a HEAD request for the block(s) named by loc_s.

    Thin wrapper that delegates to _get_or_head with method="HEAD";
    see _get_or_head for the locator format and retry semantics.
    """
    return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
def get(self, loc_s, num_retries=None):
    """Fetch the block(s) named by loc_s and return their data.

    Thin wrapper that delegates to _get_or_head with method="GET";
    see _get_or_head for the locator format and retry semantics.
    """
    return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
942 def _get_or_head(self, loc_s, method="GET", num_retries=None):
943 """Get data from Keep.
945 This method fetches one or more blocks of data from Keep. It
946 sends a request each Keep service registered with the API
947 server (or the proxy provided when this client was
948 instantiated), then each service named in location hints, in
949 sequence. As soon as one service provides the data, it's
953 * loc_s: A string of one or more comma-separated locators to fetch.
954 This method returns the concatenation of these blocks.
955 * num_retries: The number of times to retry GET requests to
956 *each* Keep server if it returns temporary failures, with
957 exponential backoff. Note that, in each loop, the method may try
958 to fetch data from every available Keep service, along with any
959 that are named in location hints in the locator. The default value
960 is set when the KeepClient is initialized.
963 return ''.join(self.get(x) for x in loc_s.split(','))
965 self.get_counter.add(1)
967 locator = KeepLocator(loc_s)
969 slot, first = self.block_cache.reserve_cache(locator.md5sum)
971 self.hits_counter.add(1)
975 self.misses_counter.add(1)
977 # If the locator has hints specifying a prefix (indicating a
978 # remote keepproxy) or the UUID of a local gateway service,
979 # read data from the indicated service(s) instead of the usual
980 # list of local disk services.
981 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
982 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
983 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
984 for hint in locator.hints if (
985 hint.startswith('K@') and
987 self._gateway_services.get(hint[2:])
989 # Map root URLs to their KeepService objects.
991 root: self.KeepService(root, self._user_agent_pool,
992 upload_counter=self.upload_counter,
993 download_counter=self.download_counter)
994 for root in hint_roots
997 # See #3147 for a discussion of the loop implementation. Highlights:
998 # * Refresh the list of Keep services after each failure, in case
999 # it's being updated.
1000 # * Retry until we succeed, we're out of retries, or every available
1001 # service has returned permanent failure.
1005 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1007 for tries_left in loop:
1009 sorted_roots = self.map_new_services(
1011 force_rebuild=(tries_left < num_retries),
1012 need_writable=False)
1013 except Exception as error:
1014 loop.save_result(error)
1017 # Query KeepService objects that haven't returned
1018 # permanent failure, in our specified shuffle order.
1019 services_to_try = [roots_map[root]
1020 for root in sorted_roots
1021 if roots_map[root].usable()]
1022 for keep_service in services_to_try:
1023 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1024 if blob is not None:
1026 loop.save_result((blob, len(services_to_try)))
1028 # Always cache the result, then return it if we succeeded.
1031 self.block_cache.cap_cache()
1033 if method == "HEAD":
1038 # Q: Including 403 is necessary for the Keep tests to continue
1039 # passing, but maybe they should expect KeepReadError instead?
1040 not_founds = sum(1 for key in sorted_roots
1041 if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1042 service_errors = ((key, roots_map[key].last_result()['error'])
1043 for key in sorted_roots)
1045 raise arvados.errors.KeepReadError(
1046 "failed to read {}: no Keep services available ({})".format(
1047 loc_s, loop.last_result()))
1048 elif not_founds == len(sorted_roots):
1049 raise arvados.errors.NotFoundError(
1050 "{} not found".format(loc_s), service_errors)
1052 raise arvados.errors.KeepReadError(
1053 "failed to read {}".format(loc_s), service_errors, label="service")
1056 def put(self, data, copies=2, num_retries=None):
1057 """Save data in Keep.
1059 This method will get a list of Keep services from the API server, and
1060 send the data to each one simultaneously in a new thread. Once the
1061 uploads are finished, if enough copies are saved, this method returns
1062 the most recent HTTP response body. If requests fail to upload
1063 enough copies, this method raises KeepWriteError.
1066 * data: The string of data to upload.
1067 * copies: The number of copies that the user requires be saved.
1069 * num_retries: The number of times to retry PUT requests to
1070 *each* Keep server if it returns temporary failures, with
1071 exponential backoff. The default value is set when the
1072 KeepClient is initialized.
1075 if not isinstance(data, bytes):
1076 data = data.encode()
1078 self.put_counter.add(1)
1080 data_hash = hashlib.md5(data).hexdigest()
1081 loc_s = data_hash + '+' + str(len(data))
1084 locator = KeepLocator(loc_s)
1087 # Tell the proxy how many copies we want it to store
1088 headers['X-Keep-Desired-Replicas'] = str(copies)
1090 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1093 for tries_left in loop:
1095 sorted_roots = self.map_new_services(
1097 force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
1098 except Exception as error:
1099 loop.save_result(error)
1102 writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1103 data_hash=data_hash,
1104 copies=copies - done,
1105 max_service_replicas=self.max_replicas_per_service,
1106 timeout=self.current_timeout(num_retries - tries_left))
1107 for service_root, ks in [(root, roots_map[root])
1108 for root in sorted_roots]:
1111 writer_pool.add_task(ks, service_root)
1113 done += writer_pool.done()
1114 loop.save_result((done >= copies, writer_pool.total_task_nr))
1117 return writer_pool.response()
1119 raise arvados.errors.KeepWriteError(
1120 "failed to write {}: no Keep services available ({})".format(
1121 data_hash, loop.last_result()))
1123 service_errors = ((key, roots_map[key].last_result()['error'])
1124 for key in sorted_roots
1125 if roots_map[key].last_result()['error'])
1126 raise arvados.errors.KeepWriteError(
1127 "failed to write {} (wanted {} copies but wrote {})".format(
1128 data_hash, copies, writer_pool.done()), service_errors, label="service")
1130 def local_store_put(self, data, copies=1, num_retries=None):
1131 """A stub for put().
1133 This method is used in place of the real put() method when
1134 using local storage (see constructor's local_store argument).
1136 copies and num_retries arguments are ignored: they are here
1137 only for the sake of offering the same call signature as
1140 Data stored this way can be retrieved via local_store_get().
1142 md5 = hashlib.md5(data).hexdigest()
1143 locator = '%s+%d' % (md5, len(data))
1144 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1146 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1147 os.path.join(self.local_store, md5))
1150 def local_store_get(self, loc_s, num_retries=None):
1151 """Companion to local_store_put()."""
1153 locator = KeepLocator(loc_s)
1155 raise arvados.errors.NotFoundError(
1156 "Invalid data locator: '%s'" % loc_s)
1157 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1159 with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
def is_cached(self, locator):
    """Reserve (or fetch) the block cache slot for the given locator.

    Bug fix: the previous implementation passed the undefined name
    `expect_hash` to reserve_cache, so every call raised NameError.
    It now passes the `locator` argument, as intended.

    Returns whatever block_cache.reserve_cache returns — based on the
    visible reserve_cache implementation, presumably a (slot, is_new)
    pair; TODO(review) confirm callers only need truthiness here.
    """
    return self.block_cache.reserve_cache(locator)