1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
32 if sys.version_info >= (3, 0):
33 from io import BytesIO
35 from cStringIO import StringIO as BytesIO
38 import arvados.config as config
40 import arvados.retry as retry
# Logger used by the Keep client code in this module.
_logger = logging.getLogger('arvados.keep')
# Shared KeepClient handed out by Keep.global_client_object(); stays None
# until that classmethod creates one.
global_client_object = None
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    # Only fill in the constants this build of the socket module lacks;
    # anything it already defines is left untouched.
    for _tcp_opt, _tcp_value in (
            ('TCP_KEEPALIVE', 0x010),
            ('TCP_KEEPINTVL', 0x101),
            ('TCP_KEEPCNT', 0x102)):
        if not hasattr(socket, _tcp_opt):
            setattr(socket, _tcp_opt, _tcp_value)
    # Don't leave the loop variables behind in the module namespace.
    del _tcp_opt, _tcp_value
58 class KeepLocator(object):
59 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
60 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
62 def __init__(self, locator_str):
65 self._perm_expiry = None
66 pieces = iter(locator_str.split('+'))
67 self.md5sum = next(pieces)
69 self.size = int(next(pieces))
73 if self.HINT_RE.match(hint) is None:
74 raise ValueError("invalid hint format: {}".format(hint))
75 elif hint.startswith('A'):
76 self.parse_permission_hint(hint)
78 self.hints.append(hint)
83 for s in [self.md5sum, self.size,
84 self.permission_hint()] + self.hints
88 if self.size is not None:
89 return "%s+%i" % (self.md5sum, self.size)
def _make_hex_prop(name, length):
    """Build a property that only accepts hex strings of a fixed length.

    The value is kept on the instance under the attribute '_' + name.
    Assigning a value that arvados.util.is_hex() rejects for `length`
    raises ValueError and leaves the stored value untouched.
    """
    storage_attr = '_{}'.format(name)

    def _read(self):
        return getattr(self, storage_attr)

    def _write(self, hex_str):
        if arvados.util.is_hex(hex_str, length):
            setattr(self, storage_attr, hex_str)
            return
        raise ValueError(
            "{} is not a {}-digit hex string: {!r}".format(
                name, length, hex_str))

    return property(_read, _write)
106 md5sum = _make_hex_prop('md5sum', 32)
107 perm_sig = _make_hex_prop('perm_sig', 40)
def perm_expiry(self):
    """Return the stored permission expiry.

    This is whatever is held in self._perm_expiry: None as initialized by
    __init__, or the datetime the setter built with
    datetime.datetime.utcfromtimestamp().
    """
    return self._perm_expiry
114 def perm_expiry(self, value):
115 if not arvados.util.is_hex(value, 1, 8):
117 "permission timestamp must be a hex Unix timestamp: {}".
119 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
121 def permission_hint(self):
122 data = [self.perm_sig, self.perm_expiry]
125 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
126 return "A{}@{:08x}".format(*data)
128 def parse_permission_hint(self, s):
130 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
132 raise ValueError("bad permission hint {}".format(s))
def permission_expired(self, as_of_dt=None):
    """Report whether this locator's permission hint has expired.

    Arguments:
    * as_of_dt: The naive UTC datetime to compare the expiry against.
      Defaults to the current UTC time.

    Returns False when the locator carries no permission expiry at all;
    otherwise True if the expiry is at or before as_of_dt.
    """
    if self.perm_expiry is None:
        return False
    if as_of_dt is None:
        # perm_expiry is built with utcfromtimestamp(), so it has to be
        # compared against UTC "now" rather than local wall-clock time.
        as_of_dt = datetime.datetime.utcnow()
    return self.perm_expiry <= as_of_dt
143 """Simple interface to a global KeepClient object.
145 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
146 own API client. The global KeepClient will build an API client from the
147 current Arvados configuration, which may not match the one you built.
152 def global_client_object(cls):
153 global global_client_object
154 # Previously, KeepClient would change its behavior at runtime based
155 # on these configuration settings. We simulate that behavior here
156 # by checking the values and returning a new KeepClient if any of
158 key = (config.get('ARVADOS_API_HOST'),
159 config.get('ARVADOS_API_TOKEN'),
160 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
161 config.get('ARVADOS_KEEP_PROXY'),
162 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
163 os.environ.get('KEEP_LOCAL_STORE'))
164 if (global_client_object is None) or (cls._last_key != key):
165 global_client_object = KeepClient()
167 return global_client_object
def get(locator, **kwargs):
    """Fetch `locator` through the shared global KeepClient.

    Keyword arguments are passed through to that client's get().
    """
    client = Keep.global_client_object()
    return client.get(locator, **kwargs)
def put(data, **kwargs):
    """Store `data` through the shared global KeepClient.

    Keyword arguments are passed through to that client's put().
    """
    client = Keep.global_client_object()
    return client.put(data, **kwargs)
177 class KeepBlockCache(object):
178 # Default RAM cache is 256MiB
179 def __init__(self, cache_max=(256 * 1024 * 1024)):
180 self.cache_max = cache_max
182 self._cache_lock = threading.Lock()
184 class CacheSlot(object):
185 __slots__ = ("locator", "ready", "content")
187 def __init__(self, locator):
188 self.locator = locator
189 self.ready = threading.Event()
196 def set(self, value):
201 if self.content is None:
204 return len(self.content)
207 '''Cap the cache size to self.cache_max'''
208 with self._cache_lock:
209 # Select all slots except those where ready.is_set() and content is
210 # None (that means there was an error reading the block).
211 self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
212 sm = sum([slot.size() for slot in self._cache])
213 while len(self._cache) > 0 and sm > self.cache_max:
214 for i in range(len(self._cache)-1, -1, -1):
215 if self._cache[i].ready.is_set():
218 sm = sum([slot.size() for slot in self._cache])
220 def _get(self, locator):
221 # Test if the locator is already in the cache
222 for i in range(0, len(self._cache)):
223 if self._cache[i].locator == locator:
226 # move it to the front
228 self._cache.insert(0, n)
def get(self, locator):
    """Look up the cache slot for `locator` while holding the cache lock.

    Returns whatever self._get() returns for that locator.
    """
    with self._cache_lock:
        slot = self._get(locator)
    return slot
236 def reserve_cache(self, locator):
237 '''Reserve a cache slot for the specified locator,
238 or return the existing slot.'''
239 with self._cache_lock:
240 n = self._get(locator)
244 # Add a new cache slot for the locator
245 n = KeepBlockCache.CacheSlot(locator)
246 self._cache.insert(0, n)
249 class Counter(object):
250 def __init__(self, v=0):
251 self._lk = threading.Lock()
263 class KeepClient(object):
265 # Default Keep server connection timeout: 2 seconds
266 # Default Keep server read timeout: 256 seconds
267 # Default Keep server bandwidth minimum: 32768 bytes per second
268 # Default Keep proxy connection timeout: 20 seconds
269 # Default Keep proxy read timeout: 256 seconds
270 # Default Keep proxy bandwidth minimum: 32768 bytes per second
271 DEFAULT_TIMEOUT = (2, 256, 32768)
272 DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
275 class KeepService(object):
276 """Make requests to a single Keep service, and track results.
278 A KeepService is intended to last long enough to perform one
279 transaction (GET or PUT) against one Keep service. This can
280 involve calling either get() or put() multiple times in order
281 to retry after transient failures. However, calling both get()
282 and put() on a single instance -- or using the same instance
283 to access two different Keep services -- will not produce
290 arvados.errors.HttpError,
293 def __init__(self, root, user_agent_pool=queue.LifoQueue(),
295 download_counter=None,
299 self._user_agent_pool = user_agent_pool
300 self._result = {'error': None}
304 self.get_headers = {'Accept': 'application/octet-stream'}
305 self.get_headers.update(headers)
306 self.put_headers = headers
307 self.upload_counter = upload_counter
308 self.download_counter = download_counter
309 self.insecure = insecure
312 """Is it worth attempting a request?"""
316 """Did the request succeed or encounter permanent failure?"""
317 return self._result['error'] == False or not self._usable
319 def last_result(self):
322 def _get_user_agent(self):
324 return self._user_agent_pool.get(block=False)
328 def _put_user_agent(self, ua):
331 self._user_agent_pool.put(ua, block=False)
def _socket_open(self, *args, **kwargs):
    """Route a socket-open callback to the handler matching its arity.

    A call carrying exactly two arguments (positional plus keyword) goes
    to _socket_open_pycurl_7_21_5(); any other count goes to
    _socket_open_pycurl_7_19_3().  The arguments are forwarded unchanged.
    """
    argc = len(args) + len(kwargs)
    if argc != 2:
        return self._socket_open_pycurl_7_19_3(*args, **kwargs)
    return self._socket_open_pycurl_7_21_5(*args, **kwargs)
341 def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
342 return self._socket_open_pycurl_7_21_5(
344 address=collections.namedtuple(
345 'Address', ['family', 'socktype', 'protocol', 'addr'],
346 )(family, socktype, protocol, address))
348 def _socket_open_pycurl_7_21_5(self, purpose, address):
349 """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
350 s = socket.socket(address.family, address.socktype, address.protocol)
351 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
352 # Will throw invalid protocol error on mac. This test prevents that.
353 if hasattr(socket, 'TCP_KEEPIDLE'):
354 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
355 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
359 def get(self, locator, method="GET", timeout=None):
360 # locator is a KeepLocator object.
361 url = self.root + str(locator)
362 _logger.debug("Request: %s %s", method, url)
363 curl = self._get_user_agent()
366 with timer.Timer() as t:
368 response_body = BytesIO()
369 curl.setopt(pycurl.NOSIGNAL, 1)
370 curl.setopt(pycurl.OPENSOCKETFUNCTION,
371 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
372 curl.setopt(pycurl.URL, url.encode('utf-8'))
373 curl.setopt(pycurl.HTTPHEADER, [
374 '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
375 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
376 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
378 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
379 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
381 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
383 curl.setopt(pycurl.NOBODY, True)
384 self._setcurltimeouts(curl, timeout, method=="HEAD")
388 except Exception as e:
389 raise arvados.errors.HttpError(0, str(e))
395 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
396 'body': response_body.getvalue(),
397 'headers': self._headers,
401 ok = retry.check_http_response_success(self._result['status_code'])
403 self._result['error'] = arvados.errors.HttpError(
404 self._result['status_code'],
405 self._headers.get('x-status-line', 'Error'))
406 except self.HTTP_ERRORS as e:
410 self._usable = ok != False
411 if self._result.get('status_code', None):
412 # The client worked well enough to get an HTTP status
413 # code, so presumably any problems are just on the
414 # server side and it's OK to reuse the client.
415 self._put_user_agent(curl)
417 # Don't return this client to the pool, in case it's
421 _logger.debug("Request fail: GET %s => %s: %s",
422 url, type(self._result['error']), str(self._result['error']))
425 _logger.info("HEAD %s: %s bytes",
426 self._result['status_code'],
427 self._result.get('content-length'))
428 if self._result['headers'].get('x-keep-locator'):
429 # This is a response to a remote block copy request, return
430 # the local copy block locator.
431 return self._result['headers'].get('x-keep-locator')
434 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
435 self._result['status_code'],
436 len(self._result['body']),
438 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
440 if self.download_counter:
441 self.download_counter.add(len(self._result['body']))
442 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
443 if resp_md5 != locator.md5sum:
444 _logger.warning("Checksum fail: md5(%s) = %s",
446 self._result['error'] = arvados.errors.HttpError(
449 return self._result['body']
451 def put(self, hash_s, body, timeout=None, headers={}):
452 put_headers = copy.copy(self.put_headers)
453 put_headers.update(headers)
454 url = self.root + hash_s
455 _logger.debug("Request: PUT %s", url)
456 curl = self._get_user_agent()
459 with timer.Timer() as t:
461 body_reader = BytesIO(body)
462 response_body = BytesIO()
463 curl.setopt(pycurl.NOSIGNAL, 1)
464 curl.setopt(pycurl.OPENSOCKETFUNCTION,
465 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
466 curl.setopt(pycurl.URL, url.encode('utf-8'))
467 # Using UPLOAD tells cURL to wait for a "go ahead" from the
468 # Keep server (in the form of a HTTP/1.1 "100 Continue"
469 # response) instead of sending the request body immediately.
470 # This allows the server to reject the request if the request
471 # is invalid or the server is read-only, without waiting for
472 # the client to send the entire block.
473 curl.setopt(pycurl.UPLOAD, True)
474 curl.setopt(pycurl.INFILESIZE, len(body))
475 curl.setopt(pycurl.READFUNCTION, body_reader.read)
476 curl.setopt(pycurl.HTTPHEADER, [
477 '{}: {}'.format(k,v) for k,v in put_headers.items()])
478 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
479 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
481 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
482 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
484 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
485 self._setcurltimeouts(curl, timeout)
488 except Exception as e:
489 raise arvados.errors.HttpError(0, str(e))
495 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
496 'body': response_body.getvalue().decode('utf-8'),
497 'headers': self._headers,
500 ok = retry.check_http_response_success(self._result['status_code'])
502 self._result['error'] = arvados.errors.HttpError(
503 self._result['status_code'],
504 self._headers.get('x-status-line', 'Error'))
505 except self.HTTP_ERRORS as e:
509 self._usable = ok != False # still usable if ok is True or None
510 if self._result.get('status_code', None):
511 # Client is functional. See comment in get().
512 self._put_user_agent(curl)
516 _logger.debug("Request fail: PUT %s => %s: %s",
517 url, type(self._result['error']), str(self._result['error']))
519 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
520 self._result['status_code'],
523 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
524 if self.upload_counter:
525 self.upload_counter.add(len(body))
528 def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
531 elif isinstance(timeouts, tuple):
532 if len(timeouts) == 2:
533 conn_t, xfer_t = timeouts
534 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
536 conn_t, xfer_t, bandwidth_bps = timeouts
538 conn_t, xfer_t = (timeouts, timeouts)
539 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
540 curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
541 if not ignore_bandwidth:
542 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
543 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
545 def _headerfunction(self, header_line):
546 if isinstance(header_line, bytes):
547 header_line = header_line.decode('iso-8859-1')
548 if ':' in header_line:
549 name, value = header_line.split(':', 1)
550 name = name.strip().lower()
551 value = value.strip()
553 name = self._lastheadername
554 value = self._headers[name] + ' ' + header_line.strip()
555 elif header_line.startswith('HTTP/'):
556 name = 'x-status-line'
559 _logger.error("Unexpected header line: %s", header_line)
561 self._lastheadername = name
562 self._headers[name] = value
563 # Returning None implies all bytes were written
566 class KeepWriterQueue(queue.Queue):
567 def __init__(self, copies, classes=[]):
568 queue.Queue.__init__(self) # Old-style superclass
569 self.wanted_copies = copies
570 self.wanted_storage_classes = classes
571 self.successful_copies = 0
572 self.confirmed_storage_classes = {}
574 self.storage_classes_tracking = True
575 self.queue_data_lock = threading.RLock()
576 self.pending_tries = max(copies, len(classes))
577 self.pending_tries_notification = threading.Condition()
579 def write_success(self, response, replicas_nr, classes_confirmed):
580 with self.queue_data_lock:
581 self.successful_copies += replicas_nr
582 if classes_confirmed is None:
583 self.storage_classes_tracking = False
584 elif self.storage_classes_tracking:
585 for st_class, st_copies in classes_confirmed.items():
587 self.confirmed_storage_classes[st_class] += st_copies
589 self.confirmed_storage_classes[st_class] = st_copies
590 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
591 self.response = response
592 with self.pending_tries_notification:
593 self.pending_tries_notification.notify_all()
def write_fail(self, ks):
    """Record a failed write so that one more attempt is allowed.

    Increments pending_tries and wakes a single thread waiting on
    pending_tries_notification.  `ks` is accepted but not used here.
    """
    cond = self.pending_tries_notification
    with cond:
        self.pending_tries = self.pending_tries + 1
        cond.notify()
def pending_copies(self):
    """Return wanted_copies minus successful_copies.

    The subtraction is done while holding queue_data_lock.  The result is
    not clamped, so it is negative if more copies succeeded than were
    wanted.
    """
    with self.queue_data_lock:
        remaining = self.wanted_copies - self.successful_copies
    return remaining
604 def satisfied_classes(self):
605 with self.queue_data_lock:
606 if not self.storage_classes_tracking:
607 # Notifies disabled storage classes expectation to
610 return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
612 def pending_classes(self):
613 with self.queue_data_lock:
614 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
616 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
617 for st_class, st_copies in self.confirmed_storage_classes.items():
618 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
619 unsatisfied_classes.remove(st_class)
620 return unsatisfied_classes
622 def get_next_task(self):
623 with self.pending_tries_notification:
625 if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
626 # This notify_all() is unnecessary --
627 # write_success() already called notify_all()
628 # when pending<1 became true, so it's not
629 # possible for any other thread to be in
630 # wait() now -- but it's cheap insurance
631 # against deadlock so we do it anyway:
632 self.pending_tries_notification.notify_all()
633 # Drain the queue and then raise Queue.Empty
637 elif self.pending_tries > 0:
638 service, service_root = self.get_nowait()
639 if service.finished():
642 self.pending_tries -= 1
643 return service, service_root
645 self.pending_tries_notification.notify_all()
648 self.pending_tries_notification.wait()
651 class KeepWriterThreadPool(object):
652 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
653 self.total_task_nr = 0
654 if (not max_service_replicas) or (max_service_replicas >= copies):
657 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
658 _logger.debug("Pool max threads is %d", num_threads)
660 self.queue = KeepClient.KeepWriterQueue(copies, classes)
662 for _ in range(num_threads):
663 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
664 self.workers.append(w)
def add_task(self, ks, service_root):
    """Queue one (ks, service_root) pair and count it as a task."""
    task = (ks, service_root)
    self.queue.put(task)
    self.total_task_nr = self.total_task_nr + 1
671 return self.queue.successful_copies, self.queue.satisfied_classes()
675 for worker in self.workers:
677 # Wait for finished work
681 return self.queue.response
684 class KeepWriterThread(threading.Thread):
685 class TaskFailed(RuntimeError): pass
687 def __init__(self, queue, data, data_hash, timeout=None):
688 super(KeepClient.KeepWriterThread, self).__init__()
689 self.timeout = timeout
692 self.data_hash = data_hash
698 service, service_root = self.queue.get_next_task()
702 locator, copies, classes = self.do_task(service, service_root)
703 except Exception as e:
704 if not isinstance(e, self.TaskFailed):
705 _logger.exception("Exception in KeepWriterThread")
706 self.queue.write_fail(service)
708 self.queue.write_success(locator, copies, classes)
710 self.queue.task_done()
712 def do_task(self, service, service_root):
713 classes = self.queue.pending_classes()
717 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
718 success = bool(service.put(self.data_hash,
720 timeout=self.timeout,
722 result = service.last_result()
725 if result.get('status_code'):
726 _logger.debug("Request fail: PUT %s => %s %s",
728 result.get('status_code'),
730 raise self.TaskFailed()
732 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
733 str(threading.current_thread()),
738 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
739 except (KeyError, ValueError):
742 classes_confirmed = {}
744 scch = result['headers']['x-keep-storage-classes-confirmed']
745 for confirmation in scch.replace(' ', '').split(','):
746 if '=' in confirmation:
747 stored_class, stored_copies = confirmation.split('=')[:2]
748 classes_confirmed[stored_class] = int(stored_copies)
749 except (KeyError, ValueError):
750 # Storage classes confirmed header missing or corrupt
751 classes_confirmed = None
753 return result['body'].strip(), replicas_stored, classes_confirmed
756 def __init__(self, api_client=None, proxy=None,
757 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
758 api_token=None, local_store=None, block_cache=None,
759 num_retries=0, session=None):
760 """Initialize a new KeepClient.
764 The API client to use to find Keep services. If not
765 provided, KeepClient will build one from available Arvados
769 If specified, this KeepClient will send requests to this Keep
770 proxy. Otherwise, KeepClient will fall back to the setting of the
771 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
If you want to ensure KeepClient does not use a proxy, pass in an empty
776 The initial timeout (in seconds) for HTTP requests to Keep
777 non-proxy servers. A tuple of three floats is interpreted as
778 (connection_timeout, read_timeout, minimum_bandwidth). A connection
779 will be aborted if the average traffic rate falls below
780 minimum_bandwidth bytes per second over an interval of read_timeout
781 seconds. Because timeouts are often a result of transient server
782 load, the actual connection timeout will be increased by a factor
783 of two on each retry.
784 Default: (2, 256, 32768).
787 The initial timeout (in seconds) for HTTP requests to
788 Keep proxies. A tuple of three floats is interpreted as
789 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
790 described above for adjusting connection timeouts on retry also
792 Default: (20, 256, 32768).
795 If you're not using an API client, but only talking
796 directly to a Keep proxy, this parameter specifies an API token
797 to authenticate Keep requests. It is an error to specify both
798 api_client and api_token. If you specify neither, KeepClient
799 will use one available from the Arvados configuration.
802 If specified, this KeepClient will bypass Keep
803 services, and save data to the named directory. If unspecified,
804 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
805 environment variable. If you want to ensure KeepClient does not
806 use local storage, pass in an empty string. This is primarily
807 intended to mock a server for testing.
810 The default number of times to retry failed requests.
811 This will be used as the default num_retries value when get() and
812 put() are called. Default 0.
814 self.lock = threading.Lock()
816 if config.get('ARVADOS_KEEP_SERVICES'):
817 proxy = config.get('ARVADOS_KEEP_SERVICES')
819 proxy = config.get('ARVADOS_KEEP_PROXY')
820 if api_token is None:
821 if api_client is None:
822 api_token = config.get('ARVADOS_API_TOKEN')
824 api_token = api_client.api_token
825 elif api_client is not None:
827 "can't build KeepClient with both API client and token")
828 if local_store is None:
829 local_store = os.environ.get('KEEP_LOCAL_STORE')
831 if api_client is None:
832 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
834 self.insecure = api_client.insecure
836 self.block_cache = block_cache if block_cache else KeepBlockCache()
837 self.timeout = timeout
838 self.proxy_timeout = proxy_timeout
839 self._user_agent_pool = queue.LifoQueue()
840 self.upload_counter = Counter()
841 self.download_counter = Counter()
842 self.put_counter = Counter()
843 self.get_counter = Counter()
844 self.hits_counter = Counter()
845 self.misses_counter = Counter()
846 self._storage_classes_unsupported_warning = False
847 self._default_classes = []
850 self.local_store = local_store
851 self.head = self.local_store_head
852 self.get = self.local_store_get
853 self.put = self.local_store_put
855 self.num_retries = num_retries
856 self.max_replicas_per_service = None
858 proxy_uris = proxy.split()
859 for i in range(len(proxy_uris)):
860 if not proxy_uris[i].endswith('/'):
863 url = urllib.parse.urlparse(proxy_uris[i])
864 if not (url.scheme and url.netloc):
865 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
866 self.api_token = api_token
867 self._gateway_services = {}
868 self._keep_services = [{
869 'uuid': "00000-bi6l4-%015d" % idx,
870 'service_type': 'proxy',
871 '_service_root': uri,
872 } for idx, uri in enumerate(proxy_uris)]
873 self._writable_services = self._keep_services
874 self.using_proxy = True
875 self._static_services_list = True
877 # It's important to avoid instantiating an API client
878 # unless we actually need one, for testing's sake.
879 if api_client is None:
880 api_client = arvados.api('v1')
881 self.api_client = api_client
882 self.api_token = api_client.api_token
883 self._gateway_services = {}
884 self._keep_services = None
885 self._writable_services = None
886 self.using_proxy = None
887 self._static_services_list = False
889 self._default_classes = [
890 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
892 # We're talking to an old cluster
895 def current_timeout(self, attempt_number):
896 """Return the appropriate timeout to use for this client.
898 The proxy timeout setting if the backend service is currently a proxy,
899 the regular timeout setting otherwise. The `attempt_number` indicates
900 how many times the operation has been tried already (starting from 0
901 for the first try), and scales the connection timeout portion of the
902 return value accordingly.
905 # TODO(twp): the timeout should be a property of a
906 # KeepService, not a KeepClient. See #4488.
907 t = self.proxy_timeout if self.using_proxy else self.timeout
909 return (t[0] * (1 << attempt_number), t[1])
911 return (t[0] * (1 << attempt_number), t[1], t[2])
def _any_nondisk_services(self, service_list):
    """Return True if any listed service has a type other than 'disk'.

    A service record with no 'service_type' key is treated as 'disk'.
    """
    for svc in service_list:
        if svc.get('service_type', 'disk') != 'disk':
            return True
    return False
916 def build_services_list(self, force_rebuild=False):
917 if (self._static_services_list or
918 (self._keep_services and not force_rebuild)):
922 keep_services = self.api_client.keep_services().accessible()
923 except Exception: # API server predates Keep services.
924 keep_services = self.api_client.keep_disks().list()
926 # Gateway services are only used when specified by UUID,
927 # so there's nothing to gain by filtering them by
929 self._gateway_services = {ks['uuid']: ks for ks in
930 keep_services.execute()['items']}
931 if not self._gateway_services:
932 raise arvados.errors.NoKeepServersError()
934 # Precompute the base URI for each service.
935 for r in self._gateway_services.values():
936 host = r['service_host']
937 if not host.startswith('[') and host.find(':') >= 0:
938 # IPv6 URIs must be formatted like http://[::1]:80/...
939 host = '[' + host + ']'
940 r['_service_root'] = "{}://{}:{:d}/".format(
941 'https' if r['service_ssl_flag'] else 'http',
945 _logger.debug(str(self._gateway_services))
946 self._keep_services = [
947 ks for ks in self._gateway_services.values()
948 if not ks.get('service_type', '').startswith('gateway:')]
949 self._writable_services = [ks for ks in self._keep_services
950 if not ks.get('read_only')]
952 # For disk type services, max_replicas_per_service is 1
953 # It is unknown (unlimited) for other service types.
954 if self._any_nondisk_services(self._writable_services):
955 self.max_replicas_per_service = None
957 self.max_replicas_per_service = 1
def _service_weight(self, data_hash, service_uuid):
    """Return the weight of a Keep service endpoint for a block hash.

    The weight is the hex MD5 digest of data_hash concatenated with the
    final 15 characters of service_uuid.
    """
    uuid_tail = service_uuid[-15:]
    digest = hashlib.md5()
    digest.update((data_hash + uuid_tail).encode())
    return digest.hexdigest()
968 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
969 """Return an array of Keep service endpoints, in the order in
970 which they should be probed when reading or writing data with
971 the given hash+hints.
973 self.build_services_list(force_rebuild)
976 # Use the services indicated by the given +K@... remote
977 # service hints, if any are present and can be resolved to a
979 for hint in locator.hints:
980 if hint.startswith('K@'):
983 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
984 elif len(hint) == 29:
985 svc = self._gateway_services.get(hint[2:])
987 sorted_roots.append(svc['_service_root'])
989 # Sort the available local services by weight (heaviest first)
990 # for this locator, and return their service_roots (base URIs)
992 use_services = self._keep_services
994 use_services = self._writable_services
995 self.using_proxy = self._any_nondisk_services(use_services)
996 sorted_roots.extend([
997 svc['_service_root'] for svc in sorted(
1000 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1001 _logger.debug("{}: {}".format(locator, sorted_roots))
1004 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1005 # roots_map is a dictionary, mapping Keep service root strings
1006 # to KeepService objects. Poll for Keep services, and add any
1007 # new ones to roots_map. Return the current list of local
1009 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1010 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1011 for root in local_roots:
1012 if root not in roots_map:
1013 roots_map[root] = self.KeepService(
1014 root, self._user_agent_pool,
1015 upload_counter=self.upload_counter,
1016 download_counter=self.download_counter,
1018 insecure=self.insecure)
1022 def _check_loop_result(result):
1023 # KeepClient RetryLoops should save results as a 2-tuple: the
1024 # actual result of the request, and the number of servers available
1025 # to receive the request this round.
1026 # This method returns True if there's a real result, False if
1027 # there are no more servers available, otherwise None.
1028 if isinstance(result, Exception):
1030 result, tried_server_count = result
1031 if (result is not None) and (result is not False):
1033 elif tried_server_count < 1:
1034 _logger.info("No more Keep services to try; giving up")
1039 def get_from_cache(self, loc_s):
"""Fetch a block only if it is in the cache, otherwise return None."""
1041 locator = KeepLocator(loc_s)
1042 slot = self.block_cache.get(locator.md5sum)
1043 if slot is not None and slot.ready.is_set():
def refresh_signature(self, loc):
    """Request a local signature for the remote block `loc`.

    Sends a HEAD request carrying an X-Keep-Signature header of the form
    'local, <current UTC time in ISO 8601>Z' and returns its result.
    """
    timestamp = datetime.datetime.utcnow().isoformat("T") + 'Z'
    signature_headers = {'X-Keep-Signature': 'local, {}'.format(timestamp)}
    return self.head(loc, headers=signature_headers)
def head(self, loc_s, **kwargs):
    """Run _get_or_head() for `loc_s` with method "HEAD".

    Any extra keyword arguments are forwarded unchanged.
    """
    fetch = self._get_or_head
    return fetch(loc_s, method="HEAD", **kwargs)
def get(self, loc_s, **kwargs):
    """Run _get_or_head() for `loc_s` with method "GET".

    Any extra keyword arguments are forwarded unchanged.
    """
    fetch = self._get_or_head
    return fetch(loc_s, method="GET", **kwargs)
1061 def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1062 """Get data from Keep.
1064 This method fetches one or more blocks of data from Keep. It
sends a request to each Keep service registered with the API
1066 server (or the proxy provided when this client was
1067 instantiated), then each service named in location hints, in
1068 sequence. As soon as one service provides the data, it's
1072 * loc_s: A string of one or more comma-separated locators to fetch.
1073 This method returns the concatenation of these blocks.
1074 * num_retries: The number of times to retry GET requests to
1075 *each* Keep server if it returns temporary failures, with
1076 exponential backoff. Note that, in each loop, the method may try
1077 to fetch data from every available Keep service, along with any
1078 that are named in location hints in the locator. The default value
1079 is set when the KeepClient is initialized.
1082 return ''.join(self.get(x) for x in loc_s.split(','))
1084 self.get_counter.add(1)
1086 request_id = (request_id or
1087 (hasattr(self, 'api_client') and self.api_client.request_id) or
1088 arvados.util.new_request_id())
1091 headers['X-Request-Id'] = request_id
1096 locator = KeepLocator(loc_s)
1098 slot, first = self.block_cache.reserve_cache(locator.md5sum)
1101 # this is request for a prefetch, if it is
1102 # already in flight, return immediately.
1103 # clear 'slot' to prevent finally block from
1104 # calling slot.set()
1107 self.hits_counter.add(1)
1110 raise arvados.errors.KeepReadError(
1111 "failed to read {}".format(loc_s))
1114 self.misses_counter.add(1)
1116 # If the locator has hints specifying a prefix (indicating a
1117 # remote keepproxy) or the UUID of a local gateway service,
1118 # read data from the indicated service(s) instead of the usual
1119 # list of local disk services.
1120 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1121 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1122 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1123 for hint in locator.hints if (
1124 hint.startswith('K@') and
1126 self._gateway_services.get(hint[2:])
1128 # Map root URLs to their KeepService objects.
1130 root: self.KeepService(root, self._user_agent_pool,
1131 upload_counter=self.upload_counter,
1132 download_counter=self.download_counter,
1134 insecure=self.insecure)
1135 for root in hint_roots
1138 # See #3147 for a discussion of the loop implementation. Highlights:
1139 # * Refresh the list of Keep services after each failure, in case
1140 # it's being updated.
1141 # * Retry until we succeed, we're out of retries, or every available
1142 # service has returned permanent failure.
1145 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1147 for tries_left in loop:
1149 sorted_roots = self.map_new_services(
1151 force_rebuild=(tries_left < num_retries),
1152 need_writable=False,
1154 except Exception as error:
1155 loop.save_result(error)
1158 # Query KeepService objects that haven't returned
1159 # permanent failure, in our specified shuffle order.
1160 services_to_try = [roots_map[root]
1161 for root in sorted_roots
1162 if roots_map[root].usable()]
1163 for keep_service in services_to_try:
1164 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1165 if blob is not None:
1167 loop.save_result((blob, len(services_to_try)))
1169 # Always cache the result, then return it if we succeeded.
1173 if slot is not None:
1175 self.block_cache.cap_cache()
1177 # Q: Including 403 is necessary for the Keep tests to continue
1178 # passing, but maybe they should expect KeepReadError instead?
1179 not_founds = sum(1 for key in sorted_roots
1180 if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1181 service_errors = ((key, roots_map[key].last_result()['error'])
1182 for key in sorted_roots)
1184 raise arvados.errors.KeepReadError(
1185 "[{}] failed to read {}: no Keep services available ({})".format(
1186 request_id, loc_s, loop.last_result()))
1187 elif not_founds == len(sorted_roots):
1188 raise arvados.errors.NotFoundError(
1189 "[{}] {} not found".format(request_id, loc_s), service_errors)
1191 raise arvados.errors.KeepReadError(
1192 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
    """Save data in Keep.

    This method will get a list of Keep services from the API server, and
    send the data to each one simultaneously in a new thread.  Once the
    uploads are finished, if enough copies are saved, this method returns
    the most recent HTTP response body.  If requests fail to upload
    enough copies, this method raises KeepWriteError.

    Arguments:
    * data: The data to upload.  Anything that is not bytes is
      converted by calling its encode() method.
    * copies: The number of copies that the user requires be saved.
      Default 2.  When less than 1, the locator string is returned
      right away and nothing is uploaded.
    * num_retries: The number of times to retry PUT requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff.  The default value is set when the
      KeepClient is initialized.
    * request_id: Value for the X-Request-Id header.  When not given,
      the API client's request_id is used if this object has one,
      otherwise a new request ID is generated.
    * classes: An optional list of storage class names where copies should
      be written.  Falls back to the client's default classes.
    """

    classes = classes or self._default_classes

    if not isinstance(data, bytes):
        data = data.encode()

    self.put_counter.add(1)

    data_hash = hashlib.md5(data).hexdigest()
    loc_s = data_hash + '+' + str(len(data))
    if copies < 1:
        return loc_s
    locator = KeepLocator(loc_s)

    request_id = (request_id or
                  (hasattr(self, 'api_client') and self.api_client.request_id) or
                  arvados.util.new_request_id())
    headers = {
        'X-Request-Id': request_id,
        'X-Keep-Desired-Replicas': str(copies),
    }
    roots_map = {}
    loop = retry.RetryLoop(num_retries, self._check_loop_result,
                           backoff_start=2)
    done_copies = 0
    # Becomes None once a service shows it does not report storage
    # classes; from then on only the copy count decides success.
    done_classes = []
    for tries_left in loop:
        try:
            sorted_roots = self.map_new_services(
                roots_map, locator,
                force_rebuild=(tries_left < num_retries),
                need_writable=True,
                headers=headers)
        except Exception as error:
            loop.save_result(error)
            continue

        if done_classes is None:
            pending_classes = []
        else:
            pending_classes = list(set(classes) - set(done_classes))
        writer_pool = KeepClient.KeepWriterThreadPool(
            data=data,
            data_hash=data_hash,
            copies=copies - done_copies,
            max_service_replicas=self.max_replicas_per_service,
            timeout=self.current_timeout(num_retries - tries_left),
            classes=pending_classes)
        # Only services that have not finished yet get a new task.
        for service_root in sorted_roots:
            ks = roots_map[service_root]
            if not ks.finished():
                writer_pool.add_task(ks, service_root)
        writer_pool.join()
        pool_copies, pool_classes = writer_pool.done()
        done_copies += pool_copies
        if done_classes is None or pool_classes is None:
            # Old keepstore contacted without storage classes support:
            # success is determined only by successful copies.
            #
            # Disable storage classes tracking from this point forward.
            if not self._storage_classes_unsupported_warning:
                self._storage_classes_unsupported_warning = True
                _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
            done_classes = None
            loop.save_result(
                (done_copies >= copies, writer_pool.total_task_nr))
        else:
            done_classes += pool_classes
            loop.save_result(
                (done_copies >= copies and set(done_classes) == set(classes),
                 writer_pool.total_task_nr))

    if loop.success():
        return writer_pool.response()
    if not roots_map:
        raise arvados.errors.KeepWriteError(
            "[{}] failed to write {}: no Keep services available ({})".format(
                request_id, data_hash, loop.last_result()))
    service_errors = ((key, roots_map[key].last_result()['error'])
                      for key in sorted_roots
                      if roots_map[key].last_result()['error'])
    raise arvados.errors.KeepWriteError(
        "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
            request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
def local_store_put(self, data, copies=1, num_retries=None, classes=None):
    """A stub for put().

    This method is used in place of the real put() method when
    using local storage (see constructor's local_store argument).

    copies, num_retries and classes arguments are ignored: they are
    here only for the sake of offering the same call signature as
    put().

    As in put(), data that is not bytes is converted by calling its
    encode() method before it is hashed and written.

    Data stored this way can be retrieved via local_store_get().

    Returns the locator string "<md5 hex digest>+<size in bytes>".
    """
    if not isinstance(data, bytes):
        data = data.encode()
    md5 = hashlib.md5(data).hexdigest()
    locator = '%s+%d' % (md5, len(data))
    final_path = os.path.join(self.local_store, md5)
    tmp_path = os.path.join(self.local_store, md5 + '.tmp')
    # Write to a temporary name first, then rename, so the final file
    # name only ever refers to completely written data.
    with open(tmp_path, 'wb') as f:
        f.write(data)
    os.rename(tmp_path, final_path)
    return locator
def local_store_get(self, loc_s, num_retries=None):
    """Companion to local_store_put().

    Arguments:
    * loc_s: The locator string of the block to read.
    * num_retries: Ignored.

    Returns the bytes stored under the locator's md5sum in the local
    store directory, or b'' for the empty block without touching the
    filesystem.  Raises NotFoundError when loc_s cannot be parsed as
    a locator.
    """
    try:
        locator = KeepLocator(loc_s)
    except ValueError:
        raise arvados.errors.NotFoundError(
            "Invalid data locator: '%s'" % loc_s)
    block_md5 = locator.md5sum
    empty_block_md5 = config.EMPTY_BLOCK_LOCATOR.split('+')[0]
    if block_md5 == empty_block_md5:
        return b''
    block_path = os.path.join(self.local_store, block_md5)
    with open(block_path, 'rb') as f:
        return f.read()
1332 def local_store_head(self, loc_s, num_retries=None):
1333 """Companion to local_store_put()."""
1335 locator = KeepLocator(loc_s)
1337 raise arvados.errors.NotFoundError(
1338 "Invalid data locator: '%s'" % loc_s)
1339 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1341 if os.path.exists(os.path.join(self.local_store, locator.md5sum)):