# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division

from future import standard_library
from future.utils import native_str
standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import range
from builtins import object

import collections, copy, datetime, hashlib, logging, math
import os, pycurl, queue, re, socket, ssl, sys, threading
import urllib.parse

from . import timer

if sys.version_info >= (3, 0):
    from io import BytesIO
else:
    from cStringIO import StringIO as BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util

_logger = logging.getLogger('arvados.keep')
global_client_object = None

# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum
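
    # Hedged usage sketch (not from the original source; the hint values
    # below are invented for illustration):
    #
    #   loc = KeepLocator("d41d8cd98f00b204e9800998ecf8427e+0"
    #                     "+Aabcdef0123456789abcdef0123456789abcdef01@565f2b08")
    #   loc.md5sum      -> 'd41d8cd98f00b204e9800998ecf8427e'
    #   loc.size        -> 0
    #   str(loc)        -> round-trips the locator, permission hint included
    #   loc.stripped()  -> 'd41d8cd98f00b204e9800998ecf8427e+0'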

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
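
    # Hedged sketch of the permission-hint round trip (hex values invented):
    #
    #   loc = KeepLocator("d41d8cd98f00b204e9800998ecf8427e+0")
    #   loc.parse_permission_hint(
    #       "Aabcdef0123456789abcdef0123456789abcdef01@565f2b08")
    #   loc.permission_hint()     -> rebuilds the same "A<sig>@<expiry>" string
    #   loc.permission_expired()  -> True once the @565f2b08 timestamp passes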
143 """Simple interface to a global KeepClient object.
145 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
146 own API client. The global KeepClient will build an API client from the
147 current Arvados configuration, which may not match the one you built.
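
    # Preferred replacement (a sketch, assuming you already have an API
    # client; see KeepClient below):
    #
    #   api = arvados.api('v1')
    #   keep = KeepClient(api_client=api)
    #   keep.put(b'data')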

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)


class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in range(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
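
    # Sketch of the intended reserve/fill pattern (hypothetical caller code):
    #
    #   slot, first = cache.reserve_cache(locator_md5)
    #   if first:
    #       slot.set(fetch_block_somehow())  # fill it, waking any waiters
    #       cache.cap_cache()
    #   else:
    #       data = slot.get()                # blocks until set() is called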


class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
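
    # For example, a caller could halve the connect timeout while keeping
    # the default read timeout and bandwidth floor (hedged sketch):
    #
    #   client = KeepClient(timeout=(1, 256, 32768))
    #
    # A bare number is also accepted and is used for both the connection
    # and read timeouts (see _setcurltimeouts below).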

    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure
312 """Is it worth attempting a request?"""
316 """Did the request succeed or encounter permanent failure?"""
317 return self._result['error'] == False or not self._usable
319 def last_result(self):

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                pass

        def _socket_open(self, *args, **kwargs):
            if len(args) + len(kwargs) == 2:
                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
            else:
                return self._socket_open_pycurl_7_19_3(*args, **kwargs)

        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
            return self._socket_open_pycurl_7_21_5(
                purpose=None,
                address=collections.namedtuple(
                    'Address', ['family', 'socktype', 'protocol', 'addr'],
                )(family, socktype, protocol, address))

        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            self._socket = s
            return s

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result['headers'].get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request, return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            if not ignore_bandwidth:
                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))

        def _headerfunction(self, header_line):
            if isinstance(header_line, bytes):
                header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._lastheadername:
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
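
        # Illustration of the folding behavior above (not from the original
        # source): the raw header sequence
        #
        #   HTTP/1.1 200 OK
        #   X-Keep-Replicas-Stored: 2
        #    +1 more
        #
        # yields {'x-status-line': 'HTTP/1.1 200 OK',
        #         'x-keep-replicas-stored': '2 +1 more'},
        # because the indented line has no colon and is appended to the
        # previously seen header's value.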

    class KeepWriterQueue(queue.Queue):
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            # Reentrant: satisfied_classes() and pending_classes() may
            # nest acquisitions of this lock.
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))+1
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Tell the outer loop that storage class tracking
                    # is disabled.
                    return None
            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()

    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies, classes)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response

    class KeepWriterThread(threading.Thread):
        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                       self.data,
                                       timeout=self.timeout,
                                       headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code', None):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result['status_code'],
                                  result['body'])
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you do not want KeepClient to use a proxy, pass in an empty
          string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth).  A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds.  Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth).  The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
                (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
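
    # A sketch of how this rendezvous weighting plays out (made-up values):
    #
    #   h = 'acbd18db4cc2f85cedef654fccc4a4d8'    # md5 of the block
    #   u = 'zzzzz-bi6l4-0123456789abcde'[-15:]   # service UUID suffix
    #   weight = hashlib.md5((h + u).encode()).hexdigest()
    #
    # Sorting services by this weight (heaviest first) gives every client
    # the same per-block probe order without central coordination.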

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # list of upstream services.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # as a list.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc):
        """Fetch a block only if it is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    @retry.retry_method
    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature"""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
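        # Hedged usage sketch (locator signature shortened to "A..."):
        #
        #   data = client.get('acbd18db4cc2f85cedef654fccc4a4d8+3+A...')
        #   ok = client.head('acbd18db4cc2f85cedef654fccc4a4d8+3+A...')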
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                slot, first = self.block_cache.reserve_cache(locator.md5sum)
                if not first:
                    self.hits_counter.add(1)
                    blob = slot.get()
                    if blob is None:
                        raise arvados.errors.KeepReadError(
                            "failed to read {}".format(loc_s))
                    return blob

            self.misses_counter.add(1)

            if headers is None:
                headers = {}
            headers['X-Request-Id'] = (request_id or
                                       (hasattr(self, 'api_client') and self.api_client.request_id) or
                                       arvados.util.new_request_id())

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                   hint.startswith('K@') and
                                   len(hint) == 29 and
                                   self._gateway_services.get(hint[2:]))])
            # Map root URLs to their KeepService objects.
            roots_map = {
                root: self.KeepService(root, self._user_agent_pool,
                                       upload_counter=self.upload_counter,
                                       download_counter=self.download_counter,
                                       headers=headers,
                                       insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation.  Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            sorted_roots = []

            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
            for tries_left in loop:
                try:
                    sorted_roots = self.map_new_services(
                        roots_map, locator,
                        force_rebuild=(tries_left < num_retries),
                        need_writable=False,
                        headers=headers)
                except Exception as error:
                    loop.save_result(error)
                    continue

                # Query KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                    if blob is not None:
                        break
                loop.save_result((blob, len(services_to_try)))

            # Always cache the result, then return it if we succeeded.
            if loop.success():
                return blob
        finally:
            if slot is not None:
                slot.set(blob)
                self.block_cache.cap_cache()

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=[]):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        """
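        # Hedged usage sketch:
        #
        #   loc = client.put(b'foo', copies=2)
        #   # e.g. 'acbd18db4cc2f85cedef654fccc4a4d8+3+A...' on success
        #   loc = client.put(b'foo', classes=['archive'])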
        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {
            'X-Request-Id': (request_id or
                             (hasattr(self, 'api_client') and self.api_client.request_id) or
                             arvados.util.new_request_id()),
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}

        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done_copies,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left),
                                                          classes=pending_classes)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                     writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} after {} (wanted {} copies but wrote {})".format(
                    data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")

    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator
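
    # Hedged example: with KEEP_LOCAL_STORE=/tmp/keep in the environment (or
    # local_store='/tmp/keep' passed to the constructor), put()/get()/head()
    # resolve to these local stubs:
    #
    #   client = KeepClient(local_store='/tmp/keep')
    #   loc = client.put(b'foo')  # writes /tmp/keep/acbd18db4cc2f85cedef654fccc4a4d8
    #   client.get(loc)           # reads the same file back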

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return b''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()

    def local_store_head(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return True
        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
            return True

    def is_cached(self, locator):
        return self.block_cache.get(locator) is not None