from __future__ import absolute_import
from __future__ import division
from future import standard_library
standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import range
from builtins import object

import collections
import datetime
import hashlib
import logging
import math
import os
import pycurl
import queue
import re
import socket
import ssl
import sys
import threading
import urllib.parse

if sys.version_info >= (3, 0):
    from io import BytesIO
else:
    from cStringIO import StringIO as BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
from arvados import timer
37 _logger = logging.getLogger('arvados.keep')
38 global_client_object = None
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    # Install each constant only when the running interpreter's socket
    # module doesn't already define it.
    for _tcp_const, _tcp_value in (('TCP_KEEPALIVE', 0x010),
                                   ('TCP_KEEPINTVL', 0x101),
                                   ('TCP_KEEPCNT', 0x102)):
        if not hasattr(socket, _tcp_const):
            setattr(socket, _tcp_const, _tcp_value)
class KeepLocator(object):
    """Parse and re-serialize a Keep block locator string.

    A locator has the form ``md5sum[+size][+hint...]``, where hints
    include permission signatures (``A<sig>@<hex timestamp>``) and
    service hints (``K@...``).
    """

    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # Bare hash with no size component.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator as ``md5sum+size`` (or bare md5sum), no hints."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the ``A<sig>@<hex time>`` hint, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except IndexError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        """True if the signature expires at or before `as_of_dt` (now by default)."""
        if self.perm_expiry is None:
            # An unsigned locator never expires.
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
    """In-memory LRU cache of Keep block contents, bounded by total bytes."""

    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """One cache entry: readers block on `ready` until content arrives."""
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            # Blocks until some thread calls set().
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                # Evict the least-recently-used completed slot (tail end).
                for i in range(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
class Counter(object):
    """A simple thread-safe integer counter."""

    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        """Atomically add `v` to the counter."""
        with self._lk:
            self._val += v

    def get(self):
        """Return the current value."""
        with self._lk:
            return self._val
class KeepClient(object):
    """Client for reading and writing data blocks via Keep services."""

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
268 class KeepService(object):
269 """Make requests to a single Keep service, and track results.
271 A KeepService is intended to last long enough to perform one
272 transaction (GET or PUT) against one Keep service. This can
273 involve calling either get() or put() multiple times in order
274 to retry after transient failures. However, calling both get()
275 and put() on a single instance -- or using the same instance
276 to access two different Keep services -- will not produce
283 arvados.errors.HttpError,
286 def __init__(self, root, user_agent_pool=queue.LifoQueue(),
288 download_counter=None, **headers):
290 self._user_agent_pool = user_agent_pool
291 self._result = {'error': None}
295 self.get_headers = {'Accept': 'application/octet-stream'}
296 self.get_headers.update(headers)
297 self.put_headers = headers
298 self.upload_counter = upload_counter
299 self.download_counter = download_counter
302 """Is it worth attempting a request?"""
306 """Did the request succeed or encounter permanent failure?"""
307 return self._result['error'] == False or not self._usable
309 def last_result(self):
312 def _get_user_agent(self):
314 return self._user_agent_pool.get(block=False)
318 def _put_user_agent(self, ua):
321 self._user_agent_pool.put(ua, block=False)
325 def _socket_open(self, *args, **kwargs):
326 if len(args) + len(kwargs) == 2:
327 return self._socket_open_pycurl_7_21_5(*args, **kwargs)
329 return self._socket_open_pycurl_7_19_3(*args, **kwargs)
331 def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
332 return self._socket_open_pycurl_7_21_5(
334 address=collections.namedtuple(
335 'Address', ['family', 'socktype', 'protocol', 'addr'],
336 )(family, socktype, protocol, address))
338 def _socket_open_pycurl_7_21_5(self, purpose, address):
339 """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
340 s = socket.socket(address.family, address.socktype, address.protocol)
341 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
342 # Will throw invalid protocol error on mac. This test prevents that.
343 if hasattr(socket, 'TCP_KEEPIDLE'):
344 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
345 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
349 def get(self, locator, method="GET", timeout=None):
350 # locator is a KeepLocator object.
351 url = self.root + str(locator)
352 _logger.debug("Request: %s %s", method, url)
353 curl = self._get_user_agent()
356 with timer.Timer() as t:
358 response_body = BytesIO()
359 curl.setopt(pycurl.NOSIGNAL, 1)
360 curl.setopt(pycurl.OPENSOCKETFUNCTION,
361 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
362 curl.setopt(pycurl.URL, url.encode('utf-8'))
363 curl.setopt(pycurl.HTTPHEADER, [
364 '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
365 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
366 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
368 curl.setopt(pycurl.NOBODY, True)
369 self._setcurltimeouts(curl, timeout)
373 except Exception as e:
374 raise arvados.errors.HttpError(0, str(e))
380 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
381 'body': response_body.getvalue(),
382 'headers': self._headers,
386 ok = retry.check_http_response_success(self._result['status_code'])
388 self._result['error'] = arvados.errors.HttpError(
389 self._result['status_code'],
390 self._headers.get('x-status-line', 'Error'))
391 except self.HTTP_ERRORS as e:
395 self._usable = ok != False
396 if self._result.get('status_code', None):
397 # The client worked well enough to get an HTTP status
398 # code, so presumably any problems are just on the
399 # server side and it's OK to reuse the client.
400 self._put_user_agent(curl)
402 # Don't return this client to the pool, in case it's
406 _logger.debug("Request fail: GET %s => %s: %s",
407 url, type(self._result['error']), str(self._result['error']))
410 _logger.info("HEAD %s: %s bytes",
411 self._result['status_code'],
412 self._result.get('content-length'))
415 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
416 self._result['status_code'],
417 len(self._result['body']),
419 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
421 if self.download_counter:
422 self.download_counter.add(len(self._result['body']))
423 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
424 if resp_md5 != locator.md5sum:
425 _logger.warning("Checksum fail: md5(%s) = %s",
427 self._result['error'] = arvados.errors.HttpError(
430 return self._result['body']
432 def put(self, hash_s, body, timeout=None):
433 url = self.root + hash_s
434 _logger.debug("Request: PUT %s", url)
435 curl = self._get_user_agent()
438 with timer.Timer() as t:
440 body_reader = BytesIO(body)
441 response_body = BytesIO()
442 curl.setopt(pycurl.NOSIGNAL, 1)
443 curl.setopt(pycurl.OPENSOCKETFUNCTION,
444 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
445 curl.setopt(pycurl.URL, url.encode('utf-8'))
446 # Using UPLOAD tells cURL to wait for a "go ahead" from the
447 # Keep server (in the form of a HTTP/1.1 "100 Continue"
448 # response) instead of sending the request body immediately.
449 # This allows the server to reject the request if the request
450 # is invalid or the server is read-only, without waiting for
451 # the client to send the entire block.
452 curl.setopt(pycurl.UPLOAD, True)
453 curl.setopt(pycurl.INFILESIZE, len(body))
454 curl.setopt(pycurl.READFUNCTION, body_reader.read)
455 curl.setopt(pycurl.HTTPHEADER, [
456 '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
457 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
458 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
459 self._setcurltimeouts(curl, timeout)
462 except Exception as e:
463 raise arvados.errors.HttpError(0, str(e))
469 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
470 'body': response_body.getvalue().decode('utf-8'),
471 'headers': self._headers,
474 ok = retry.check_http_response_success(self._result['status_code'])
476 self._result['error'] = arvados.errors.HttpError(
477 self._result['status_code'],
478 self._headers.get('x-status-line', 'Error'))
479 except self.HTTP_ERRORS as e:
483 self._usable = ok != False # still usable if ok is True or None
484 if self._result.get('status_code', None):
485 # Client is functional. See comment in get().
486 self._put_user_agent(curl)
490 _logger.debug("Request fail: PUT %s => %s: %s",
491 url, type(self._result['error']), str(self._result['error']))
493 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
494 self._result['status_code'],
497 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
498 if self.upload_counter:
499 self.upload_counter.add(len(body))
502 def _setcurltimeouts(self, curl, timeouts):
505 elif isinstance(timeouts, tuple):
506 if len(timeouts) == 2:
507 conn_t, xfer_t = timeouts
508 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
510 conn_t, xfer_t, bandwidth_bps = timeouts
512 conn_t, xfer_t = (timeouts, timeouts)
513 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
514 curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
515 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
516 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
518 def _headerfunction(self, header_line):
519 if isinstance(header_line, bytes):
520 header_line = header_line.decode('iso-8859-1')
521 if ':' in header_line:
522 name, value = header_line.split(':', 1)
523 name = name.strip().lower()
524 value = value.strip()
526 name = self._lastheadername
527 value = self._headers[name] + ' ' + header_line.strip()
528 elif header_line.startswith('HTTP/'):
529 name = 'x-status-line'
532 _logger.error("Unexpected header line: %s", header_line)
534 self._lastheadername = name
535 self._headers[name] = value
536 # Returning None implies all bytes were written
539 class KeepWriterQueue(queue.Queue):
540 def __init__(self, copies):
541 queue.Queue.__init__(self) # Old-style superclass
542 self.wanted_copies = copies
543 self.successful_copies = 0
545 self.successful_copies_lock = threading.Lock()
546 self.pending_tries = copies
547 self.pending_tries_notification = threading.Condition()
549 def write_success(self, response, replicas_nr):
550 with self.successful_copies_lock:
551 self.successful_copies += replicas_nr
552 self.response = response
553 with self.pending_tries_notification:
554 self.pending_tries_notification.notify_all()
556 def write_fail(self, ks):
557 with self.pending_tries_notification:
558 self.pending_tries += 1
559 self.pending_tries_notification.notify()
561 def pending_copies(self):
562 with self.successful_copies_lock:
563 return self.wanted_copies - self.successful_copies
565 def get_next_task(self):
566 with self.pending_tries_notification:
568 if self.pending_copies() < 1:
569 # This notify_all() is unnecessary --
570 # write_success() already called notify_all()
571 # when pending<1 became true, so it's not
572 # possible for any other thread to be in
573 # wait() now -- but it's cheap insurance
574 # against deadlock so we do it anyway:
575 self.pending_tries_notification.notify_all()
576 # Drain the queue and then raise Queue.Empty
580 elif self.pending_tries > 0:
581 service, service_root = self.get_nowait()
582 if service.finished():
585 self.pending_tries -= 1
586 return service, service_root
588 self.pending_tries_notification.notify_all()
591 self.pending_tries_notification.wait()
594 class KeepWriterThreadPool(object):
595 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
596 self.total_task_nr = 0
597 self.wanted_copies = copies
598 if (not max_service_replicas) or (max_service_replicas >= copies):
601 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
602 _logger.debug("Pool max threads is %d", num_threads)
604 self.queue = KeepClient.KeepWriterQueue(copies)
606 for _ in range(num_threads):
607 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
608 self.workers.append(w)
610 def add_task(self, ks, service_root):
611 self.queue.put((ks, service_root))
612 self.total_task_nr += 1
615 return self.queue.successful_copies
619 for worker in self.workers:
621 # Wait for finished work
625 return self.queue.response
628 class KeepWriterThread(threading.Thread):
629 TaskFailed = RuntimeError()
631 def __init__(self, queue, data, data_hash, timeout=None):
632 super(KeepClient.KeepWriterThread, self).__init__()
633 self.timeout = timeout
636 self.data_hash = data_hash
642 service, service_root = self.queue.get_next_task()
646 locator, copies = self.do_task(service, service_root)
647 except Exception as e:
648 if e is not self.TaskFailed:
649 _logger.exception("Exception in KeepWriterThread")
650 self.queue.write_fail(service)
652 self.queue.write_success(locator, copies)
654 self.queue.task_done()
656 def do_task(self, service, service_root):
657 success = bool(service.put(self.data_hash,
659 timeout=self.timeout))
660 result = service.last_result()
663 if result.get('status_code', None):
664 _logger.debug("Request fail: PUT %s => %s %s",
666 result['status_code'],
668 raise self.TaskFailed
670 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
671 str(threading.current_thread()),
676 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
677 except (KeyError, ValueError):
680 return result['body'].strip(), replicas_stored
683 def __init__(self, api_client=None, proxy=None,
684 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
685 api_token=None, local_store=None, block_cache=None,
686 num_retries=0, session=None):
687 """Initialize a new KeepClient.
691 The API client to use to find Keep services. If not
692 provided, KeepClient will build one from available Arvados
696 If specified, this KeepClient will send requests to this Keep
697 proxy. Otherwise, KeepClient will fall back to the setting of the
698 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
699 If you want to KeepClient does not use a proxy, pass in an empty
703 The initial timeout (in seconds) for HTTP requests to Keep
704 non-proxy servers. A tuple of three floats is interpreted as
705 (connection_timeout, read_timeout, minimum_bandwidth). A connection
706 will be aborted if the average traffic rate falls below
707 minimum_bandwidth bytes per second over an interval of read_timeout
708 seconds. Because timeouts are often a result of transient server
709 load, the actual connection timeout will be increased by a factor
710 of two on each retry.
711 Default: (2, 256, 32768).
714 The initial timeout (in seconds) for HTTP requests to
715 Keep proxies. A tuple of three floats is interpreted as
716 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
717 described above for adjusting connection timeouts on retry also
719 Default: (20, 256, 32768).
722 If you're not using an API client, but only talking
723 directly to a Keep proxy, this parameter specifies an API token
724 to authenticate Keep requests. It is an error to specify both
725 api_client and api_token. If you specify neither, KeepClient
726 will use one available from the Arvados configuration.
729 If specified, this KeepClient will bypass Keep
730 services, and save data to the named directory. If unspecified,
731 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
732 environment variable. If you want to ensure KeepClient does not
733 use local storage, pass in an empty string. This is primarily
734 intended to mock a server for testing.
737 The default number of times to retry failed requests.
738 This will be used as the default num_retries value when get() and
739 put() are called. Default 0.
741 self.lock = threading.Lock()
743 if config.get('ARVADOS_KEEP_SERVICES'):
744 proxy = config.get('ARVADOS_KEEP_SERVICES')
746 proxy = config.get('ARVADOS_KEEP_PROXY')
747 if api_token is None:
748 if api_client is None:
749 api_token = config.get('ARVADOS_API_TOKEN')
751 api_token = api_client.api_token
752 elif api_client is not None:
754 "can't build KeepClient with both API client and token")
755 if local_store is None:
756 local_store = os.environ.get('KEEP_LOCAL_STORE')
758 self.block_cache = block_cache if block_cache else KeepBlockCache()
759 self.timeout = timeout
760 self.proxy_timeout = proxy_timeout
761 self._user_agent_pool = queue.LifoQueue()
762 self.upload_counter = Counter()
763 self.download_counter = Counter()
764 self.put_counter = Counter()
765 self.get_counter = Counter()
766 self.hits_counter = Counter()
767 self.misses_counter = Counter()
770 self.local_store = local_store
771 self.get = self.local_store_get
772 self.put = self.local_store_put
774 self.num_retries = num_retries
775 self.max_replicas_per_service = None
777 proxy_uris = proxy.split()
778 for i in range(len(proxy_uris)):
779 if not proxy_uris[i].endswith('/'):
782 url = urllib.parse.urlparse(proxy_uris[i])
783 if not (url.scheme and url.netloc):
784 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
785 self.api_token = api_token
786 self._gateway_services = {}
787 self._keep_services = [{
788 'uuid': "00000-bi6l4-%015d" % idx,
789 'service_type': 'proxy',
790 '_service_root': uri,
791 } for idx, uri in enumerate(proxy_uris)]
792 self._writable_services = self._keep_services
793 self.using_proxy = True
794 self._static_services_list = True
796 # It's important to avoid instantiating an API client
797 # unless we actually need one, for testing's sake.
798 if api_client is None:
799 api_client = arvados.api('v1')
800 self.api_client = api_client
801 self.api_token = api_client.api_token
802 self._gateway_services = {}
803 self._keep_services = None
804 self._writable_services = None
805 self.using_proxy = None
806 self._static_services_list = False
808 def current_timeout(self, attempt_number):
809 """Return the appropriate timeout to use for this client.
811 The proxy timeout setting if the backend service is currently a proxy,
812 the regular timeout setting otherwise. The `attempt_number` indicates
813 how many times the operation has been tried already (starting from 0
814 for the first try), and scales the connection timeout portion of the
815 return value accordingly.
818 # TODO(twp): the timeout should be a property of a
819 # KeepService, not a KeepClient. See #4488.
820 t = self.proxy_timeout if self.using_proxy else self.timeout
822 return (t[0] * (1 << attempt_number), t[1])
824 return (t[0] * (1 << attempt_number), t[1], t[2])
825 def _any_nondisk_services(self, service_list):
826 return any(ks.get('service_type', 'disk') != 'disk'
827 for ks in service_list)
829 def build_services_list(self, force_rebuild=False):
830 if (self._static_services_list or
831 (self._keep_services and not force_rebuild)):
835 keep_services = self.api_client.keep_services().accessible()
836 except Exception: # API server predates Keep services.
837 keep_services = self.api_client.keep_disks().list()
839 # Gateway services are only used when specified by UUID,
840 # so there's nothing to gain by filtering them by
842 self._gateway_services = {ks['uuid']: ks for ks in
843 keep_services.execute()['items']}
844 if not self._gateway_services:
845 raise arvados.errors.NoKeepServersError()
847 # Precompute the base URI for each service.
848 for r in self._gateway_services.values():
849 host = r['service_host']
850 if not host.startswith('[') and host.find(':') >= 0:
851 # IPv6 URIs must be formatted like http://[::1]:80/...
852 host = '[' + host + ']'
853 r['_service_root'] = "{}://{}:{:d}/".format(
854 'https' if r['service_ssl_flag'] else 'http',
858 _logger.debug(str(self._gateway_services))
859 self._keep_services = [
860 ks for ks in self._gateway_services.values()
861 if not ks.get('service_type', '').startswith('gateway:')]
862 self._writable_services = [ks for ks in self._keep_services
863 if not ks.get('read_only')]
865 # For disk type services, max_replicas_per_service is 1
866 # It is unknown (unlimited) for other service types.
867 if self._any_nondisk_services(self._writable_services):
868 self.max_replicas_per_service = None
870 self.max_replicas_per_service = 1
872 def _service_weight(self, data_hash, service_uuid):
873 """Compute the weight of a Keep service endpoint for a data
874 block with a known hash.
876 The weight is md5(h + u) where u is the last 15 characters of
877 the service endpoint's UUID.
879 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
881 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
882 """Return an array of Keep service endpoints, in the order in
883 which they should be probed when reading or writing data with
884 the given hash+hints.
886 self.build_services_list(force_rebuild)
889 # Use the services indicated by the given +K@... remote
890 # service hints, if any are present and can be resolved to a
892 for hint in locator.hints:
893 if hint.startswith('K@'):
896 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
897 elif len(hint) == 29:
898 svc = self._gateway_services.get(hint[2:])
900 sorted_roots.append(svc['_service_root'])
902 # Sort the available local services by weight (heaviest first)
903 # for this locator, and return their service_roots (base URIs)
905 use_services = self._keep_services
907 use_services = self._writable_services
908 self.using_proxy = self._any_nondisk_services(use_services)
909 sorted_roots.extend([
910 svc['_service_root'] for svc in sorted(
913 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
914 _logger.debug("{}: {}".format(locator, sorted_roots))
917 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
918 # roots_map is a dictionary, mapping Keep service root strings
919 # to KeepService objects. Poll for Keep services, and add any
920 # new ones to roots_map. Return the current list of local
922 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
923 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
924 for root in local_roots:
925 if root not in roots_map:
926 roots_map[root] = self.KeepService(
927 root, self._user_agent_pool,
928 upload_counter=self.upload_counter,
929 download_counter=self.download_counter,
934 def _check_loop_result(result):
935 # KeepClient RetryLoops should save results as a 2-tuple: the
936 # actual result of the request, and the number of servers available
937 # to receive the request this round.
938 # This method returns True if there's a real result, False if
939 # there are no more servers available, otherwise None.
940 if isinstance(result, Exception):
942 result, tried_server_count = result
943 if (result is not None) and (result is not False):
945 elif tried_server_count < 1:
946 _logger.info("No more Keep services to try; giving up")
951 def get_from_cache(self, loc):
952 """Fetch a block only if is in the cache, otherwise return None."""
953 slot = self.block_cache.get(loc)
954 if slot is not None and slot.ready.is_set():
960 def head(self, loc_s, num_retries=None):
961 return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
964 def get(self, loc_s, num_retries=None):
965 return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
967 def _get_or_head(self, loc_s, method="GET", num_retries=None):
968 """Get data from Keep.
970 This method fetches one or more blocks of data from Keep. It
971 sends a request each Keep service registered with the API
972 server (or the proxy provided when this client was
973 instantiated), then each service named in location hints, in
974 sequence. As soon as one service provides the data, it's
978 * loc_s: A string of one or more comma-separated locators to fetch.
979 This method returns the concatenation of these blocks.
980 * num_retries: The number of times to retry GET requests to
981 *each* Keep server if it returns temporary failures, with
982 exponential backoff. Note that, in each loop, the method may try
983 to fetch data from every available Keep service, along with any
984 that are named in location hints in the locator. The default value
985 is set when the KeepClient is initialized.
988 return ''.join(self.get(x) for x in loc_s.split(','))
990 self.get_counter.add(1)
992 locator = KeepLocator(loc_s)
994 slot, first = self.block_cache.reserve_cache(locator.md5sum)
996 self.hits_counter.add(1)
1000 self.misses_counter.add(1)
1002 # If the locator has hints specifying a prefix (indicating a
1003 # remote keepproxy) or the UUID of a local gateway service,
1004 # read data from the indicated service(s) instead of the usual
1005 # list of local disk services.
1006 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1007 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1008 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1009 for hint in locator.hints if (
1010 hint.startswith('K@') and
1012 self._gateway_services.get(hint[2:])
1014 # Map root URLs to their KeepService objects.
1016 root: self.KeepService(root, self._user_agent_pool,
1017 upload_counter=self.upload_counter,
1018 download_counter=self.download_counter)
1019 for root in hint_roots
1022 # See #3147 for a discussion of the loop implementation. Highlights:
1023 # * Refresh the list of Keep services after each failure, in case
1024 # it's being updated.
1025 # * Retry until we succeed, we're out of retries, or every available
1026 # service has returned permanent failure.
1030 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1032 for tries_left in loop:
1034 sorted_roots = self.map_new_services(
1036 force_rebuild=(tries_left < num_retries),
1037 need_writable=False)
1038 except Exception as error:
1039 loop.save_result(error)
1042 # Query KeepService objects that haven't returned
1043 # permanent failure, in our specified shuffle order.
1044 services_to_try = [roots_map[root]
1045 for root in sorted_roots
1046 if roots_map[root].usable()]
1047 for keep_service in services_to_try:
1048 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1049 if blob is not None:
1051 loop.save_result((blob, len(services_to_try)))
1053 # Always cache the result, then return it if we succeeded.
1056 self.block_cache.cap_cache()
1058 if method == "HEAD":
1063 # Q: Including 403 is necessary for the Keep tests to continue
1064 # passing, but maybe they should expect KeepReadError instead?
1065 not_founds = sum(1 for key in sorted_roots
1066 if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1067 service_errors = ((key, roots_map[key].last_result()['error'])
1068 for key in sorted_roots)
1070 raise arvados.errors.KeepReadError(
1071 "failed to read {}: no Keep services available ({})".format(
1072 loc_s, loop.last_result()))
1073 elif not_founds == len(sorted_roots):
1074 raise arvados.errors.NotFoundError(
1075 "{} not found".format(loc_s), service_errors)
1077 raise arvados.errors.KeepReadError(
1078 "failed to read {}".format(loc_s), service_errors, label="service")
1081 def put(self, data, copies=2, num_retries=None):
1082 """Save data in Keep.
1084 This method will get a list of Keep services from the API server, and
1085 send the data to each one simultaneously in a new thread. Once the
1086 uploads are finished, if enough copies are saved, this method returns
1087 the most recent HTTP response body. If requests fail to upload
1088 enough copies, this method raises KeepWriteError.
1091 * data: The string of data to upload.
1092 * copies: The number of copies that the user requires be saved.
1094 * num_retries: The number of times to retry PUT requests to
1095 *each* Keep server if it returns temporary failures, with
1096 exponential backoff. The default value is set when the
1097 KeepClient is initialized.
1100 if not isinstance(data, bytes):
1101 data = data.encode()
1103 self.put_counter.add(1)
1105 data_hash = hashlib.md5(data).hexdigest()
1106 loc_s = data_hash + '+' + str(len(data))
1109 locator = KeepLocator(loc_s)
1112 # Tell the proxy how many copies we want it to store
1113 headers['X-Keep-Desired-Replicas'] = str(copies)
1115 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1118 for tries_left in loop:
1120 sorted_roots = self.map_new_services(
1122 force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
1123 except Exception as error:
1124 loop.save_result(error)
1127 writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1128 data_hash=data_hash,
1129 copies=copies - done,
1130 max_service_replicas=self.max_replicas_per_service,
1131 timeout=self.current_timeout(num_retries - tries_left))
1132 for service_root, ks in [(root, roots_map[root])
1133 for root in sorted_roots]:
1136 writer_pool.add_task(ks, service_root)
1138 done += writer_pool.done()
1139 loop.save_result((done >= copies, writer_pool.total_task_nr))
1142 return writer_pool.response()
1144 raise arvados.errors.KeepWriteError(
1145 "failed to write {}: no Keep services available ({})".format(
1146 data_hash, loop.last_result()))
1148 service_errors = ((key, roots_map[key].last_result()['error'])
1149 for key in sorted_roots
1150 if roots_map[key].last_result()['error'])
1151 raise arvados.errors.KeepWriteError(
1152 "failed to write {} (wanted {} copies but wrote {})".format(
1153 data_hash, copies, writer_pool.done()), service_errors, label="service")
1155 def local_store_put(self, data, copies=1, num_retries=None):
1156 """A stub for put().
1158 This method is used in place of the real put() method when
1159 using local storage (see constructor's local_store argument).
1161 copies and num_retries arguments are ignored: they are here
1162 only for the sake of offering the same call signature as
1165 Data stored this way can be retrieved via local_store_get().
1167 md5 = hashlib.md5(data).hexdigest()
1168 locator = '%s+%d' % (md5, len(data))
1169 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1171 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1172 os.path.join(self.local_store, md5))
1175 def local_store_get(self, loc_s, num_retries=None):
1176 """Companion to local_store_put()."""
1178 locator = KeepLocator(loc_s)
1180 raise arvados.errors.NotFoundError(
1181 "Invalid data locator: '%s'" % loc_s)
1182 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1184 with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1187 def is_cached(self, locator):
1188 return self.block_cache.reserve_cache(expect_hash)