1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 standard_library.install_aliases()
5 from builtins import next
6 from builtins import str
7 from builtins import range
8 from builtins import object
25 if sys.version_info >= (3, 0):
26 from io import BytesIO
28 from cStringIO import StringIO as BytesIO
31 import arvados.config as config
33 import arvados.retry as retry
# Module-level logger shared by all the Keep client machinery below.
_logger = logging.getLogger('arvados.keep')
# Lazily-built KeepClient shared by the deprecated Keep.* static helpers.
global_client_object = None
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    for _tcp_const, _tcp_value in (('TCP_KEEPALIVE', 0x010),
                                   ('TCP_KEEPINTVL', 0x101),
                                   ('TCP_KEEPCNT', 0x102)):
        if not hasattr(socket, _tcp_const):
            setattr(socket, _tcp_const, _tcp_value)
class KeepLocator(object):
    """Parse a Keep locator string ("<md5>+<size>+<hint>...") into fields.

    NOTE(review): this copy of the class appears truncated; several
    statements (loop/try headers, else branches, decorators, and some
    method headers) seem to be missing below -- confirm against the
    canonical source.
    """
    # Zero point for encoding permission-hint expiry times as hex seconds.
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    # Valid hints: one uppercase letter, then letters/digits/@/_/-.
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        # First '+'-separated piece is always the content hash.
        self.md5sum = next(pieces)
        # NOTE(review): the try/except guarding a locator with no size
        # piece appears to be elided in this copy.
        self.size = int(next(pieces))
        # NOTE(review): the enclosing "for hint in pieces:" header appears
        # to be elided here.
        if self.HINT_RE.match(hint) is None:
            raise ValueError("invalid hint format: {}".format(hint))
        elif hint.startswith('A'):
            # 'A' hints carry the permission signature and expiry.
            self.parse_permission_hint(hint)
        self.hints.append(hint)

    # NOTE(review): a method header (likely __str__) appears to be elided
    # before this expression fragment.
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints

    # NOTE(review): another method header appears to be elided before this
    # fragment, which rebuilds the bare "<md5>+<size>" form.
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        # NOTE(review): the "def getter(self):" line appears to be elided
        # here.
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    # Validated hex-string properties backed by _md5sum / _perm_sig.
    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    # NOTE(review): a @property decorator appears to be elided here.
    def perm_expiry(self):
        return self._perm_expiry

    # NOTE(review): a @perm_expiry.setter decorator appears to be elided
    # here; value is a 1..8 digit hex Unix timestamp.
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            # NOTE(review): the raise wrapping this message appears elided.
            "permission timestamp must be a hex Unix timestamp: {}".
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        # NOTE(review): a None-check on data appears to be elided here.
        # Encode expiry as whole seconds since EPOCH_DATETIME, as 8 hex digits.
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        # "A<sig>@<hex timestamp>" -> perm_sig / perm_expiry.
        # NOTE(review): the try/except wrapper around the split appears
        # to be elided here.
        self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        # True if the signature expiry has passed, compared against
        # datetime.now() when as_of_dt is not given.
        if self.perm_expiry is None:
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
    own API client. The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """

    # NOTE(review): the enclosing "class Keep" header, the _last_key class
    # attribute, and the @classmethod/@staticmethod decorators appear to be
    # missing from this truncated copy.
    def global_client_object(cls):
        # Return the shared KeepClient, rebuilding it when relevant
        # configuration has changed since it was created.
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings. We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            # NOTE(review): the line recording the new key appears elided.
            global_client_object = KeepClient()
        return global_client_object

    def get(locator, **kwargs):
        # Delegate to the shared client; same kwargs as KeepClient.get().
        return Keep.global_client_object().get(locator, **kwargs)

    def put(data, **kwargs):
        # Delegate to the shared client; same kwargs as KeepClient.put().
        return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
    """In-memory cache of Keep block contents, capped at cache_max bytes.

    NOTE(review): this copy of the class appears truncated -- several
    returns, else branches, and method headers seem to be missing below.
    """
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        # NOTE(review): the backing list initialization appears elided.
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        # One cache entry: readers wait on `ready` until `content` is
        # filled in (content None after ready means the fetch failed).
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()

        def set(self, value):
            # NOTE(review): lines appear to be missing here; the size
            # accessor's header seems elided, leaving its body below.
            if self.content is None:
                return len(self.content)

    # NOTE(review): the cap_cache method header appears to be elided
    # before this body, which trims the cache under _cache_lock.
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in range(len(self._cache)-1, -1, -1):
                    # Evict finished entries from the back of the list.
                    if self._cache[i].ready.is_set():
                    # NOTE(review): deletion/break lines appear elided.
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Internal lookup; caller must hold _cache_lock.
        # Test if the locator is already in the cache
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                # NOTE(review): the lines unlinking the hit before
                # reinsertion appear elided here.
                # move it to the front
                self._cache.insert(0, n)

    def get(self, locator):
        # Thread-safe cache lookup.
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            # NOTE(review): the hit/miss branch lines appear elided here.
            # Add a new cache slot for the locator
            n = KeepBlockCache.CacheSlot(locator)
            self._cache.insert(0, n)
class Counter(object):
    """Thread-safe integer counter.

    NOTE(review): this copy appears truncated -- the stored value and
    the increment/read methods seem to be missing.
    """
    def __init__(self, v=0):
        self._lk = threading.Lock()
class KeepClient(object):
    """Client for reading and writing data blocks on Arvados Keep services."""

    # Default Keep server connection timeout: 2 seconds
    # Default Keep server read timeout: 256 seconds
    # Default Keep server bandwidth minimum: 32768 bytes per second
    # Default Keep proxy connection timeout: 20 seconds
    # Default Keep proxy read timeout: 256 seconds
    # Default Keep proxy bandwidth minimum: 32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        """
        # NOTE(review): this copy of the class appears truncated; the end
        # of the docstring above and many statements below (try/except
        # headers, branch headers, dict-literal delimiters) seem elided.

        # NOTE(review): the enclosing HTTP_ERRORS tuple definition appears
        # to be elided around this entry.
            arvados.errors.HttpError,

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     download_counter=None, **headers):
            # NOTE(review): part of the signature (upload_counter, and the
            # assignment of self.root) appears elided; also note the
            # LifoQueue() default is evaluated once at def time and is
            # shared across instances.
            self._user_agent_pool = user_agent_pool
            # Per-transaction outcome; 'error' stays None until a request
            # runs, then holds False/an exception per the code below.
            self._result = {'error': None}
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter

        # NOTE(review): the "usable"/"finished" method headers appear to
        # be elided; only their docstrings and one body line survive here.
            """Is it worth attempting a request?"""
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            # NOTE(review): body appears to be elided in this copy.

        def _get_user_agent(self):
            # Reuse a pooled curl agent when one is available.
            # NOTE(review): the try/except wrapper (presumably creating a
            # fresh agent on queue.Empty) appears elided -- TODO confirm.
                return self._user_agent_pool.get(block=False)

        def _put_user_agent(self, ua):
            # Return a curl agent to the shared pool for reuse.
            # NOTE(review): surrounding try/reset lines appear elided.
                self._user_agent_pool.put(ua, block=False)

        def _socket_open(self, family, socktype, protocol, address=None):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(family, socktype, protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            # NOTE(review): the return of the socket appears elided.

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            # NOTE(review): the try wrapper and several statements in the
            # body below appear to be elided in this copy.
            with timer.Timer() as t:
                response_body = BytesIO()
                curl.setopt(pycurl.NOSIGNAL, 1)
                curl.setopt(pycurl.OPENSOCKETFUNCTION,
                            lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                curl.setopt(pycurl.URL, url.encode('utf-8'))
                curl.setopt(pycurl.HTTPHEADER, [
                    '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                # NOTE(review): an `if method == "HEAD":` guard appears to
                # be elided here -- NOBODY unconditionally would break GET.
                curl.setopt(pycurl.NOBODY, True)
                self._setcurltimeouts(curl, timeout)
            except Exception as e:
                raise arvados.errors.HttpError(0, str(e))
            # NOTE(review): the dict literal assigning self._result appears
            # to have lost its delimiters in this copy.
                'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                'body': response_body.getvalue(),
                'headers': self._headers,
            ok = retry.check_http_response_success(self._result['status_code'])
            self._result['error'] = arvados.errors.HttpError(
                self._result['status_code'],
                self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
            # Treat ok == None (no HTTP response) as still usable.
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            # Don't return this client to the pool, in case it's
            _logger.debug("Request fail: GET %s => %s: %s",
                          url, type(self._result['error']), str(self._result['error']))
            _logger.info("HEAD %s: %s bytes",
                         self._result['status_code'],
                         self._result.get('content-length'))
            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            # Verify the payload hash matches the requested locator.
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                self._result['error'] = arvados.errors.HttpError(
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            # NOTE(review): try wrappers and several statements in the
            # body below appear to be elided in this copy.
            with timer.Timer() as t:
                body_reader = BytesIO(body)
                response_body = BytesIO()
                curl.setopt(pycurl.NOSIGNAL, 1)
                curl.setopt(pycurl.OPENSOCKETFUNCTION,
                            lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                curl.setopt(pycurl.URL, url.encode('utf-8'))
                # Using UPLOAD tells cURL to wait for a "go ahead" from the
                # Keep server (in the form of a HTTP/1.1 "100 Continue"
                # response) instead of sending the request body immediately.
                # This allows the server to reject the request if the request
                # is invalid or the server is read-only, without waiting for
                # the client to send the entire block.
                curl.setopt(pycurl.UPLOAD, True)
                curl.setopt(pycurl.INFILESIZE, len(body))
                curl.setopt(pycurl.READFUNCTION, body_reader.read)
                curl.setopt(pycurl.HTTPHEADER, [
                    '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                self._setcurltimeouts(curl, timeout)
            except Exception as e:
                raise arvados.errors.HttpError(0, str(e))
            # NOTE(review): the dict literal assigning self._result appears
            # to have lost its delimiters in this copy.
                'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                'body': response_body.getvalue().decode('utf-8'),
                'headers': self._headers,
            ok = retry.check_http_response_success(self._result['status_code'])
            self._result['error'] = arvados.errors.HttpError(
                self._result['status_code'],
                self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            _logger.debug("Request fail: PUT %s => %s: %s",
                          url, type(self._result['error']), str(self._result['error']))
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))

        def _setcurltimeouts(self, curl, timeouts):
            # Accept None, a bare number, a (conn, xfer) pair, or a
            # (conn, xfer, bandwidth) triple.
            # NOTE(review): the leading guard and else headers appear
            # to be elided in this copy.
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                    conn_t, xfer_t, bandwidth_bps = timeouts
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))

        def _headerfunction(self, header_line):
            # pycurl header callback: accumulate response headers into
            # self._headers, lower-casing names.
            if isinstance(header_line, bytes):
                header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            # NOTE(review): the continuation-line branch header appears to
            # be elided before these two lines (they extend the previous
            # header's value).
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
            # NOTE(review): the else branch header appears elided here.
            _logger.error("Unexpected header line: %s", header_line)
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
    class KeepWriterQueue(queue.Queue):
        """Work queue coordinating parallel PUTs of one block to N services."""
        def __init__(self, copies):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.successful_copies = 0
            # NOTE(review): the initialization of self.response appears
            # to be elided in this copy.
            self.successful_copies_lock = threading.Lock()
            self.pending_tries = copies
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr):
            # Record replicas_nr confirmed copies, then wake all waiters
            # so they can re-check whether enough copies are stored.
            with self.successful_copies_lock:
                self.successful_copies += replicas_nr
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            # A service attempt failed: allow one more try and wake one
            # waiting worker to pick it up.
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()
547 def pending_copies(self):
548 with self.successful_copies_lock:
549 return self.wanted_copies - self.successful_copies
        def get_next_task(self):
            # Hand out (service, service_root) work items to writer
            # threads, blocking until work exists or no copies remain.
            # NOTE(review): this copy appears truncated -- the enclosing
            # loop header, queue-drain lines, and exception raises seem
            # to be elided below.
            with self.pending_tries_notification:
                    if self.pending_copies() < 1:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        # Skip services that already reached a final state.
                        if service.finished():
                        self.pending_tries -= 1
                        return service, service_root
                        self.pending_tries_notification.notify_all()
                        self.pending_tries_notification.wait()
    class KeepWriterThreadPool(object):
        """Fan a single block's PUT out to several KeepWriterThreads."""
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
            self.total_task_nr = 0
            self.wanted_copies = copies
            # Pick a thread count based on how many replicas one service
            # can store at once.
            # NOTE(review): the branch bodies here appear truncated.
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            # NOTE(review): the workers-list initialization appears elided.
            self.queue = KeepClient.KeepWriterQueue(copies)
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            # Enqueue one (service, root) write attempt.
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        # NOTE(review): the done/join/response method headers appear to be
        # elided from this copy; only their body fragments survive below.
            return self.queue.successful_copies
            for worker in self.workers:
            # Wait for finished work
            return self.queue.response
    class KeepWriterThread(threading.Thread):
        """Worker that pulls queued (service, root) tasks and PUTs the block."""
        # Sentinel exception instance: signals an ordinary, already-logged
        # task failure (distinguished by identity in the handler below).
        TaskFailed = RuntimeError()

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            # NOTE(review): assignments of queue/data appear elided here.
            self.data_hash = data_hash

        # NOTE(review): the run() method header and its loop/try lines
        # appear to be elided in this copy.
                service, service_root = self.queue.get_next_task()
                    locator, copies = self.do_task(service, service_root)
                except Exception as e:
                    # TaskFailed is already logged by do_task; anything
                    # else is unexpected and gets a traceback.
                    if e is not self.TaskFailed:
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                    self.queue.write_success(locator, copies)
                    self.queue.task_done()

        def do_task(self, service, service_root):
            # Attempt one PUT to one service; return (locator, n_copies)
            # on success or raise TaskFailed.
            # NOTE(review): part of the put() call and several branch
            # headers appear elided below.
            success = bool(service.put(self.data_hash,
                                       timeout=self.timeout))
            result = service.last_result()
            if result.get('status_code', None):
                _logger.debug("Request fail: PUT %s => %s %s",
                              result['status_code'],
                raise self.TaskFailed
            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                # Server reports how many replicas it actually stored.
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
            return result['body'].strip(), replicas_stored
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments (NOTE(review): the per-argument headers appear to have
        been lost from this truncated copy; names restored from the
        signature):

        :api_client:
          The API client to use to find Keep services. If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy. Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to KeepClient does not use a proxy, pass in an empty
          string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests. It is an error to specify both
          api_client and api_token. If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory. If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable. If you want to ensure KeepClient does not
          use local storage, pass in an empty string. This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called. Default 0.
        """
        # NOTE(review): this copy of the body appears truncated -- several
        # branch headers (elif/else) seem to be elided below.
        self.lock = threading.Lock()
        if config.get('ARVADOS_KEEP_SERVICES'):
            proxy = config.get('ARVADOS_KEEP_SERVICES')
        # NOTE(review): an elif/else header appears elided here.
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            # NOTE(review): else header appears elided here.
                api_token = api_client.api_token
        elif api_client is not None:
            # NOTE(review): the raise wrapping this message appears elided.
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        # Aggregate transfer/request statistics for this client.
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        # NOTE(review): an "if local_store:" header appears elided here;
        # these lines swap in the local-filesystem stubs for get/put.
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        self.num_retries = num_retries
        self.max_replicas_per_service = None
        # NOTE(review): an "elif proxy:" header appears elided here; this
        # branch builds a static proxy service list from the URI(s).
            proxy_uris = proxy.split()
            for i in range(len(proxy_uris)):
                if not proxy_uris[i].endswith('/'):
                # NOTE(review): normalization lines appear elided here.
                # URL validation
                url = urllib.parse.urlparse(proxy_uris[i])
                if not (url.scheme and url.netloc):
                    raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
            self.api_token = api_token
            self._gateway_services = {}
            self._keep_services = [{
                'uuid': "00000-bi6l4-%015d" % idx,
                'service_type': 'proxy',
                '_service_root': uri,
            } for idx, uri in enumerate(proxy_uris)]
            self._writable_services = self._keep_services
            self.using_proxy = True
            self._static_services_list = True
        # NOTE(review): an "else:" header appears elided here; service
        # lists are discovered lazily from the API server.
            # It's important to avoid instantiating an API client
            # unless we actually need one, for testing's sake.
            if api_client is None:
                api_client = arvados.api('v1')
            self.api_client = api_client
            self.api_token = api_client.api_token
            self._gateway_services = {}
            self._keep_services = None
            self._writable_services = None
            self.using_proxy = None
            self._static_services_list = False
    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise. The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        # NOTE(review): the len(t)-based branch headers appear elided; the
        # connection timeout doubles per attempt (1 << attempt_number).
            return (t[0] * (1 << attempt_number), t[1])
            return (t[0] * (1 << attempt_number), t[1], t[2])
811 def _any_nondisk_services(self, service_list):
812 return any(ks.get('service_type', 'disk') != 'disk'
813 for ks in service_list)
    def build_services_list(self, force_rebuild=False):
        # Refresh the cached Keep service lists from the API server,
        # unless a static (proxy) list is in use or a fresh list exists.
        # NOTE(review): this copy appears truncated -- a lock/try wrapper
        # and some else headers seem to be elided below.
        if (self._static_services_list or
                (self._keep_services and not force_rebuild)):
            return
            keep_services = self.api_client.keep_services().accessible()
        except Exception:  # API server predates Keep services.
            keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type here.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                # NOTE(review): the host/port format arguments appear
                # elided from this call in this copy.
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',

            _logger.debug(str(self._gateway_services))
            # Local read candidates: everything except gateway services.
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            # NOTE(review): else header appears elided here.
                self.max_replicas_per_service = 1
858 def _service_weight(self, data_hash, service_uuid):
859 """Compute the weight of a Keep service endpoint for a data
860 block with a known hash.
862 The weight is md5(h + u) where u is the last 15 characters of
863 the service endpoint's UUID.
865 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        # NOTE(review): this copy appears truncated -- the accumulator
        # initialization, some branch headers, and sorted() arguments
        # seem to be elided below.
        self.build_services_list(force_rebuild)

        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                # NOTE(review): the branch headers distinguishing short
                # cluster-prefix hints from gateway-UUID hints appear
                # elided here.
                    "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # NOTE(review): the need_writable branch headers appear elided.
            use_services = self._keep_services
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                # NOTE(review): the trailing **headers argument and the
                # return statement appear to be elided from this copy.
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
    def _check_loop_result(result):
        # NOTE(review): a @staticmethod decorator and the return
        # statements appear to be elided from this copy.
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
        result, tried_server_count = result
        if (result is not None) and (result is not False):
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
    def get_from_cache(self, loc):
        """Fetch a block only if is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            # NOTE(review): the remainder of the body (returning the slot
            # contents / None) appears to be elided from this copy.
946 def head(self, loc_s, num_retries=None):
947 return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
950 def get(self, loc_s, num_retries=None):
951 return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
    def _get_or_head(self, loc_s, method="GET", num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        # NOTE(review): this copy appears heavily truncated -- loop/if/try
        # headers and several statements seem to be elided throughout.
        # Multi-locator requests recurse on each piece:
        return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        locator = KeepLocator(loc_s)
        slot, first = self.block_cache.reserve_cache(locator.md5sum)
        self.hits_counter.add(1)
        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               self._gateway_services.get(hint[2:])
        # Map root URLs to their KeepService objects.
        # NOTE(review): the dict-literal delimiters around this mapping
        # appear to be elided in this copy.
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter)
            for root in hint_roots

        # See #3147 for a discussion of the loop implementation. Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
        for tries_left in loop:
                sorted_roots = self.map_new_services(
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False)
            except Exception as error:
                loop.save_result(error)

                # Query KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                    if blob is not None:
                    loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
            self.block_cache.cap_cache()
        if method == "HEAD":

        # No servers fulfilled the request. Count how many responded
        # "not found;" if the ratio is high enough (currently 75%), report
        # Not Found; otherwise a generic error.
        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        # NOTE(review): the error-classification branch headers appear
        # to be elided below.
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        """
        # NOTE(review): this copy appears truncated -- several headers and
        # statements seem to be elided below.

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        locator = KeepLocator(loc_s)

        # NOTE(review): the headers-dict initialization appears elided.
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replicas'] = str(copies)
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
        for tries_left in loop:
                sorted_roots = self.map_new_services(
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)

            # Spread the write across a pool of writer threads.
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left))
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                writer_pool.add_task(ks, service_root)
            done += writer_pool.done()
            loop.save_result((done >= copies, writer_pool.total_task_nr))

        # NOTE(review): the success/failure branch headers appear elided
        # below.
            return writer_pool.response()
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, writer_pool.done()), service_errors, label="service")
    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        # Write to a temp file, then rename into place for atomicity.
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            # NOTE(review): the write call (and the return of the locator)
            # appears to be elided from this copy.
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        # NOTE(review): the try/except wrapper and return statements
        # appear to be elided from this copy.
        locator = KeepLocator(loc_s)
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        # The empty block needs no file: short-circuit it.
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1173 def is_cached(self, locator):
1174 return self.block_cache.reserve_cache(expect_hash)