1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 standard_library.install_aliases()
5 from builtins import next
6 from builtins import str
7 from builtins import range
8 from past.utils import old_div
9 from builtins import object
27 import arvados.config as config
29 import arvados.retry as retry
# Module-level logger shared by every class in this Keep client module.
_logger = logging.getLogger('arvados.keep')
# Lazily-created singleton KeepClient used by the deprecated Keep.* helpers;
# populated by Keep.global_client_object().
global_client_object = None
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    for _tcp_const, _tcp_value in (('TCP_KEEPALIVE', 0x010),
                                   ('TCP_KEEPINTVL', 0x101),
                                   ('TCP_KEEPCNT', 0x102)):
        # Only fill in constants the platform's socket module lacks.
        if not hasattr(socket, _tcp_const):
            setattr(socket, _tcp_const, _tcp_value)
class KeepLocator(object):
    """Parsed form of a Keep block locator: "md5sum+size" plus '+'-separated hints."""

    # Reference point for converting permission-expiry datetimes to Unix time.
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    # A hint is one uppercase letter followed by word characters (e.g. A..., K@...).
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        """Parse locator_str into md5sum, size, and hint fields."""
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        self.size = int(next(pieces))
        # Remaining pieces are hints: 'A' hints carry a permission
        # signature; all other hints are kept verbatim in self.hints.
        if self.HINT_RE.match(hint) is None:
            raise ValueError("invalid hint format: {}".format(hint))
        elif hint.startswith('A'):
            self.parse_permission_hint(hint)
        self.hints.append(hint)

    # Full string form: md5sum, size, permission hint, then remaining
    # hints, joined with '+'.
        str(s) for s in [self.md5sum, self.size,
                         self.permission_hint()] + self.hints
    # Stripped form: omits hints; includes the size only when known.
    if self.size is not None:
        return "%s+%i" % (self.md5sum, self.size)

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        # getter: report the stored value unchanged.
        return getattr(self, data_name)
        def setter(self, hex_str):
            # Reject values that are not exactly `length` hex digits.
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    # Validated hex-string attributes built by the factory above.
    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    def perm_expiry(self):
        # Property getter: expiry as a datetime (None when no 'A' hint was seen).
        return self._perm_expiry

    def perm_expiry(self, value):
        # Property setter: value is a 1-8 digit hex Unix timestamp from an 'A' hint.
        if not arvados.util.is_hex(value, 1, 8):
            "permission timestamp must be a hex Unix timestamp: {}".
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Render the stored signature/expiry back into 'A<sig>@<hex time>' form."""
        data = [self.perm_sig, self.perm_expiry]
        # Convert the expiry datetime back to integer seconds since the epoch.
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Split an 'A<sig>@<hex time>' hint into perm_sig and perm_expiry."""
        self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission signature has expired as of as_of_dt (default: now)."""
        if self.perm_expiry is None:
        elif as_of_dt is None:
            # NOTE(review): uses naive local time, matching utcfromtimestamp
            # above only when the host clock is UTC — confirm in full source.
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
"""Simple interface to a global KeepClient object.

THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
own API client. The global KeepClient will build an API client from the
current Arvados configuration, which may not match the one you built.
"""

def global_client_object(cls):
    """Return the shared KeepClient, rebuilding it if configuration changed."""
    global global_client_object
    # Previously, KeepClient would change its behavior at runtime based
    # on these configuration settings. We simulate that behavior here
    # by checking the values and returning a new KeepClient if any of
    key = (config.get('ARVADOS_API_HOST'),
           config.get('ARVADOS_API_TOKEN'),
           config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
           config.get('ARVADOS_KEEP_PROXY'),
           config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
           os.environ.get('KEEP_LOCAL_STORE'))
    # Rebuild when no client exists yet, or the config fingerprint moved.
    if (global_client_object is None) or (cls._last_key != key):
        global_client_object = KeepClient()
    return global_client_object
def get(locator, **kwargs):
    """Fetch a block through the shared global KeepClient (deprecated path)."""
    client = Keep.global_client_object()
    return client.get(locator, **kwargs)
def put(data, **kwargs):
    """Store a block through the shared global KeepClient (deprecated path)."""
    client = Keep.global_client_object()
    return client.put(data, **kwargs)
class KeepBlockCache(object):
    """RAM cache of Keep block contents, keyed by locator, shared by a KeepClient."""

    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max            # size cap in bytes, enforced by cap_cache()
        self._cache_lock = threading.Lock()   # guards the slot list

    class CacheSlot(object):
        """Holder for one block: its content plus an Event for in-flight reads."""
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()  # set once content has been stored

        def set(self, value):

        # size(): a slot with no content (pending or failed read) counts as 0 bytes.
        if self.content is None:
        return len(self.content)

    '''Cap the cache size to self.cache_max'''
    with self._cache_lock:
        # Select all slots except those where ready.is_set() and content is
        # None (that means there was an error reading the block).
        self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
        sm = sum([slot.size() for slot in self._cache])
        # Evict ready slots from the tail (least recently used) until under cap.
        while len(self._cache) > 0 and sm > self.cache_max:
            for i in range(len(self._cache)-1, -1, -1):
                if self._cache[i].ready.is_set():
            sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                # move it to the front
                self._cache.insert(0, n)

    def get(self, locator):
        """Thread-safe lookup of the cached slot for locator (see _get)."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            # Add a new cache slot for the locator
            n = KeepBlockCache.CacheSlot(locator)
            self._cache.insert(0, n)
class Counter(object):
    """Thread-safe integer counter used for transfer statistics (bytes, hits, misses)."""
    def __init__(self, v=0):
        self._lk = threading.Lock()  # guards the counter value
class KeepClient(object):
    """Client for reading and writing data blocks on Arvados Keep services."""

    # Default Keep server connection timeout: 2 seconds
    # Default Keep server read timeout: 256 seconds
    # Default Keep server bandwidth minimum: 32768 bytes per second
    # Default Keep proxy connection timeout: 20 seconds
    # Default Keep proxy read timeout: 256 seconds
    # Default Keep proxy bandwidth minimum: 32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        """
            arvados.errors.HttpError,

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     download_counter=None, **headers):
            # root: base URL of this service; extra **headers are sent on PUT
            # and merged into GET headers below.
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter

            """Is it worth attempting a request?"""
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):

        def _get_user_agent(self):
                # Reuse a pooled pycurl handle when one is available.
                return self._user_agent_pool.get(block=False)

        def _put_user_agent(self, ua):
                # Return a healthy handle to the pool for reuse.
                self._user_agent_pool.put(ua, block=False)

        def _socket_open(family, socktype, protocol, address=None):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(family, socktype, protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            with timer.Timer() as t:
                response_body = io.StringIO()
                curl.setopt(pycurl.NOSIGNAL, 1)
                curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                curl.setopt(pycurl.URL, url.encode('utf-8'))
                curl.setopt(pycurl.HTTPHEADER, [
                    '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                # HEAD requests skip the body entirely.
                curl.setopt(pycurl.NOBODY, True)
                self._setcurltimeouts(curl, timeout)
            except Exception as e:
                # Wrap transport-level errors in the project's HttpError type.
                raise arvados.errors.HttpError(0, str(e))
                'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                'body': response_body.getvalue(),
                'headers': self._headers,
            ok = retry.check_http_response_success(self._result['status_code'])
                self._result['error'] = arvados.errors.HttpError(
                    self._result['status_code'],
                    self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
            # ok may be True, False, or None (retryable); only False marks
            # this service as permanently unusable for this transaction.
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
                # Don't return this client to the pool, in case it's
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result.get('content-length'))
                _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                             self._result['status_code'],
                             len(self._result['body']),
                             old_div((old_div(len(self._result['body']),(1024.0*1024))),t.secs) if t.secs > 0 else 0)
            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            # Verify the payload against the locator's md5.
            # NOTE(review): body comes from io.StringIO, so hashlib.md5()
            # would need bytes on Python 3 — confirm against full source.
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                self._result['error'] = arvados.errors.HttpError(
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            """Upload one block (body) under hash_s to this service."""
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            with timer.Timer() as t:
                body_reader = io.StringIO(body)
                response_body = io.StringIO()
                curl.setopt(pycurl.NOSIGNAL, 1)
                curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                curl.setopt(pycurl.URL, url.encode('utf-8'))
                # Using UPLOAD tells cURL to wait for a "go ahead" from the
                # Keep server (in the form of a HTTP/1.1 "100 Continue"
                # response) instead of sending the request body immediately.
                # This allows the server to reject the request if the request
                # is invalid or the server is read-only, without waiting for
                # the client to send the entire block.
                curl.setopt(pycurl.UPLOAD, True)
                curl.setopt(pycurl.INFILESIZE, len(body))
                curl.setopt(pycurl.READFUNCTION, body_reader.read)
                curl.setopt(pycurl.HTTPHEADER, [
                    '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                self._setcurltimeouts(curl, timeout)
            except Exception as e:
                raise arvados.errors.HttpError(0, str(e))
                'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                'body': response_body.getvalue(),
                'headers': self._headers,
            ok = retry.check_http_response_success(self._result['status_code'])
                self._result['error'] = arvados.errors.HttpError(
                    self._result['status_code'],
                    self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                             self._result['status_code'],
                             old_div((old_div(len(body),(1024.0*1024))),t.secs) if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))

        def _setcurltimeouts(self, curl, timeouts):
            # timeouts may be a scalar (same connect/xfer timeout), a 2-tuple
            # (connect, xfer), or a 3-tuple (connect, xfer, min bandwidth bps).
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                    conn_t, xfer_t, bandwidth_bps = timeouts
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            # Abort transfers that stay below bandwidth_bps for xfer_t seconds.
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))

        def _headerfunction(self, header_line):
            """pycurl HEADERFUNCTION: accumulate response headers into self._headers."""
            header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                # Normal "Name: value" header.
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
                # Continuation line: append to the previous header's value.
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                # Status line, stored under a synthetic header name.
                name = 'x-status-line'
                _logger.error("Unexpected header line: %s", header_line)
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
    class KeepWriterQueue(queue.Queue):
        """Work queue coordinating KeepWriterThreads until enough replicas are written."""

        def __init__(self, copies):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies          # replicas the caller asked for
            self.successful_copies = 0           # replicas confirmed so far
            self.successful_copies_lock = threading.Lock()
            self.pending_tries = copies          # write attempts currently allowed
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr):
            """Record replicas_nr confirmed replicas and wake waiting workers."""
            with self.successful_copies_lock:
                self.successful_copies += replicas_nr
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            """Record a failed write: allow one more try and wake one worker."""
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            """Number of replicas still needed (wanted minus confirmed)."""
            with self.successful_copies_lock:
                return self.wanted_copies - self.successful_copies

        def get_next_task(self):
            """Block until a (service, service_root) task is available or work is done."""
            with self.pending_tries_notification:
                if self.pending_copies() < 1:
                    # This notify_all() is unnecessary --
                    # write_success() already called notify_all()
                    # when pending<1 became true, so it's not
                    # possible for any other thread to be in
                    # wait() now -- but it's cheap insurance
                    # against deadlock so we do it anyway:
                    self.pending_tries_notification.notify_all()
                    # Drain the queue and then raise Queue.Empty
                elif self.pending_tries > 0:
                    service, service_root = self.get_nowait()
                    # Skip services that already finished this transaction.
                    if service.finished():
                    self.pending_tries -= 1
                    return service, service_root
                    self.pending_tries_notification.notify_all()
                    self.pending_tries_notification.wait()
    class KeepWriterThreadPool(object):
        """Pool of KeepWriterThreads feeding off one KeepWriterQueue."""

        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
            self.total_task_nr = 0
            self.wanted_copies = copies
            # Size the pool: one thread per copy unless a single service can
            # store several replicas at once.
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = int(math.ceil(old_div(float(copies), max_service_replicas)))
            _logger.debug("Pool max threads is %d", num_threads)
            self.queue = KeepClient.KeepWriterQueue(copies)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            """Enqueue one (KeepService, root) write attempt."""
            self.queue.put((ks, service_root))
            self.total_task_nr += 1
            # done(): replicas confirmed so far.
            return self.queue.successful_copies
            for worker in self.workers:
            # Wait for finished work
            # response(): body of the most recent successful PUT.
            return self.queue.response
    class KeepWriterThread(threading.Thread):
        """Worker thread: pull write tasks off the queue and PUT to a service."""
        # Sentinel raised by do_task() to signal an already-logged failure.
        TaskFailed = RuntimeError()

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.data_hash = data_hash
            # run() loop body: fetch a task, attempt it, and report the outcome.
            service, service_root = self.queue.get_next_task()
            locator, copies = self.do_task(service, service_root)
            except Exception as e:
                # TaskFailed was already logged in do_task(); anything else
                # is unexpected and gets a traceback.
                if e is not self.TaskFailed:
                    _logger.exception("Exception in KeepWriterThread")
                self.queue.write_fail(service)
                self.queue.write_success(locator, copies)
                self.queue.task_done()

        def do_task(self, service, service_root):
            """PUT the block to one service; return (locator, replicas stored)."""
            success = bool(service.put(self.data_hash,
                                       timeout=self.timeout))
            result = service.last_result()
            if result.get('status_code', None):
                _logger.debug("Request fail: PUT %s => %s %s",
                              result['status_code'],
                raise self.TaskFailed
            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
            # The server reports how many replicas it stored; default applies
            # when the header is absent or malformed.
            replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
            return result['body'].strip(), replicas_stored
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        The API client to use to find Keep services. If not
        provided, KeepClient will build one from available Arvados

        If specified, this KeepClient will send requests to this Keep
        proxy. Otherwise, KeepClient will fall back to the setting of the
        ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
        If you want to ensure KeepClient does not use a proxy, pass in an empty

        The initial timeout (in seconds) for HTTP requests to Keep
        non-proxy servers. A tuple of three floats is interpreted as
        (connection_timeout, read_timeout, minimum_bandwidth). A connection
        will be aborted if the average traffic rate falls below
        minimum_bandwidth bytes per second over an interval of read_timeout
        seconds. Because timeouts are often a result of transient server
        load, the actual connection timeout will be increased by a factor
        of two on each retry.
        Default: (2, 256, 32768).

        The initial timeout (in seconds) for HTTP requests to
        Keep proxies. A tuple of three floats is interpreted as
        (connection_timeout, read_timeout, minimum_bandwidth). The behavior
        described above for adjusting connection timeouts on retry also
        Default: (20, 256, 32768).

        If you're not using an API client, but only talking
        directly to a Keep proxy, this parameter specifies an API token
        to authenticate Keep requests. It is an error to specify both
        api_client and api_token. If you specify neither, KeepClient
        will use one available from the Arvados configuration.

        If specified, this KeepClient will bypass Keep
        services, and save data to the named directory. If unspecified,
        KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
        environment variable. If you want to ensure KeepClient does not
        use local storage, pass in an empty string. This is primarily
        intended to mock a server for testing.

        The default number of times to retry failed requests.
        This will be used as the default num_retries value when get() and
        put() are called. Default 0.
        """
        self.lock = threading.Lock()
        # ARVADOS_KEEP_SERVICES (a whitespace-separated URL list) takes
        # precedence over the single-URL ARVADOS_KEEP_PROXY setting.
        if config.get('ARVADOS_KEEP_SERVICES'):
            proxy = config.get('ARVADOS_KEEP_SERVICES')
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            api_token = api_client.api_token
        elif api_client is not None:
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()  # reusable pycurl handles
        # Transfer statistics counters.
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        # Local-store mode: route get/put to the filesystem stubs.
        self.local_store = local_store
        self.get = self.local_store_get
        self.put = self.local_store_put
        self.num_retries = num_retries
        self.max_replicas_per_service = None
        # Proxy mode: build a static service list from the proxy URI(s).
        proxy_uris = proxy.split()
        for i in range(len(proxy_uris)):
            if not proxy_uris[i].endswith('/'):
            # Require a fully-qualified URI (scheme and host).
            url = urllib.parse.urlparse(proxy_uris[i])
            if not (url.scheme and url.netloc):
                raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
        self.api_token = api_token
        self._gateway_services = {}
        self._keep_services = [{
            'uuid': "00000-bi6l4-%015d" % idx,
            'service_type': 'proxy',
            '_service_root': uri,
        } for idx, uri in enumerate(proxy_uris)]
        self._writable_services = self._keep_services
        self.using_proxy = True
        self._static_services_list = True
        # It's important to avoid instantiating an API client
        # unless we actually need one, for testing's sake.
        if api_client is None:
            api_client = arvados.api('v1')
        self.api_client = api_client
        self.api_token = api_client.api_token
        self._gateway_services = {}
        self._keep_services = None
        self._writable_services = None
        self.using_proxy = None
        self._static_services_list = False
    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise. The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        # Double the connection timeout on each retry; the read timeout (and
        # the bandwidth floor, when the tuple has one) stay unchanged.
        return (t[0] * (1 << attempt_number), t[1])
        return (t[0] * (1 << attempt_number), t[1], t[2])
795 def _any_nondisk_services(self, service_list):
796 return any(ks.get('service_type', 'disk') != 'disk'
797 for ks in service_list)
    def build_services_list(self, force_rebuild=False):
        """Populate cached Keep service lists from the API server.

        No-op when a static (proxy) service list is in use, or when a
        list is already cached and force_rebuild is false.
        """
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            keep_services = self.api_client.keep_services().accessible()
        except Exception:  # API server predates Keep services.
            keep_services = self.api_client.keep_disks().list()

        # Gateway services are only used when specified by UUID,
        # so there's nothing to gain by filtering them by
        self._gateway_services = {ks['uuid']: ks for ks in
                                  keep_services.execute()['items']}
        if not self._gateway_services:
            raise arvados.errors.NoKeepServersError()

        # Precompute the base URI for each service.
        for r in self._gateway_services.values():
            host = r['service_host']
            if not host.startswith('[') and host.find(':') >= 0:
                # IPv6 URIs must be formatted like http://[::1]:80/...
                host = '[' + host + ']'
            r['_service_root'] = "{}://{}:{:d}/".format(
                'https' if r['service_ssl_flag'] else 'http',

        _logger.debug(str(self._gateway_services))
        # Local read candidates exclude gateway-type services.
        self._keep_services = [
            ks for ks in self._gateway_services.values()
            if not ks.get('service_type', '').startswith('gateway:')]
        self._writable_services = [ks for ks in self._keep_services
                                   if not ks.get('read_only')]
        # For disk type services, max_replicas_per_service is 1
        # It is unknown (unlimited) for other service types.
        if self._any_nondisk_services(self._writable_services):
            self.max_replicas_per_service = None
            self.max_replicas_per_service = 1
842 def _service_weight(self, data_hash, service_uuid):
843 """Compute the weight of a Keep service endpoint for a data
844 block with a known hash.
846 The weight is md5(h + u) where u is the last 15 characters of
847 the service endpoint's UUID.
849 return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        for hint in locator.hints:
            if hint.startswith('K@'):
                # Short K@ hints name a remote cluster's public keepproxy.
                    "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    # 29-character hints carry a full gateway service UUID.
                    svc = self._gateway_services.get(hint[2:])
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        use_services = self._keep_services
        use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                # Build a KeepService per newly-seen root so its state
                # (usable/finished) persists across retry rounds.
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
        result, tried_server_count = result
        if (result is not None) and (result is not False):
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
    def get_from_cache(self, loc):
        """Fetch a block only if is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        # Only a slot that exists AND has finished loading counts as a hit;
        # a pending slot must not be returned half-filled.
        if slot is not None and slot.ready.is_set():
    def head(self, loc_s, num_retries=None):
        """Check block existence with a HEAD request; see _get_or_head()."""
        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
    def get(self, loc_s, num_retries=None):
        """Fetch block data from Keep; see _get_or_head() for full semantics."""
        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
    def _get_or_head(self, loc_s, method="GET", num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's

        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        # Multi-locator input: fetch each block recursively and concatenate.
        return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        locator = KeepLocator(loc_s)
        slot, first = self.block_cache.reserve_cache(locator.md5sum)
        self.hits_counter.add(1)
        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               self._gateway_services.get(hint[2:])
        # Map root URLs to their KeepService objects.
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter)
            for root in hint_roots

        # See #3147 for a discussion of the loop implementation. Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
        for tries_left in loop:
            sorted_roots = self.map_new_services(
                force_rebuild=(tries_left < num_retries),
                need_writable=False)
            except Exception as error:
                loop.save_result(error)

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        self.block_cache.cap_cache()
        if method == "HEAD":

        # No success: build the most specific error we can.
        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        raise arvados.errors.KeepReadError(
            "failed to read {}: no Keep services available ({})".format(
                loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        raise arvados.errors.KeepReadError(
            "failed to read {}".format(loc_s), service_errors, label="service")
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        """
        # NOTE(review): both visible branches test `str`; presumably a
        # bytes check belongs in one of them — confirm against full source.
        if isinstance(data, str):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        locator = KeepLocator(loc_s)

        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replicas'] = str(copies)
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
        for tries_left in loop:
            sorted_roots = self.map_new_services(
                force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)

            # Each round only asks for the copies still missing.
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left))
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                writer_pool.add_task(ks, service_root)
            done += writer_pool.done()
            loop.save_result((done >= copies, writer_pool.total_task_nr))

        return writer_pool.response()
        raise arvados.errors.KeepWriteError(
            "failed to write {}: no Keep services available ({})".format(
                data_hash, loop.last_result()))
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots
                          if roots_map[key].last_result()['error'])
        raise arvados.errors.KeepWriteError(
            "failed to write {} (wanted {} copies but wrote {})".format(
                data_hash, copies, writer_pool.done()), service_errors, label="service")
    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        # Write to a temp file, then rename into place, so readers never
        # observe a partially written block.
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        locator = KeepLocator(loc_s)
        raise arvados.errors.NotFoundError(
            "Invalid data locator: '%s'" % loc_s)
        # The well-known empty-block locator never touches disk.
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1159 def is_cached(self, locator):
1160 return self.block_cache.reserve_cache(expect_hash)