1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
# Python 2/3 compatibility shim: BytesIO comes from io on Python 3,
# and from the C-accelerated cStringIO on Python 2.
33 if sys.version_info >= (3, 0):
34 from io import BytesIO
# NOTE(review): the `else:` line for the Python 2 branch is not visible in
# this sampled view; line 36 presumably sits under it.
36 from cStringIO import StringIO as BytesIO
39 import arvados.config as config
41 import arvados.retry as retry
43 import arvados.diskcache
# Module-level logger for all Keep client activity.
45 _logger = logging.getLogger('arvados.keep')
# Lazily-created singleton used by the deprecated Keep.global_client_object().
46 global_client_object = None
49 # Monkey patch TCP constants when not available (apple). Values sourced from:
50 # http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
51 if sys.platform == 'darwin':
52 if not hasattr(socket, 'TCP_KEEPALIVE'):
53 socket.TCP_KEEPALIVE = 0x010
54 if not hasattr(socket, 'TCP_KEEPINTVL'):
55 socket.TCP_KEEPINTVL = 0x101
56 if not hasattr(socket, 'TCP_KEEPCNT'):
57 socket.TCP_KEEPCNT = 0x102
# Parses and re-serializes a Keep block locator string of the form
# "md5sum[+size][+hint...]", including "A<sig>@<hexts>" permission hints.
60 class KeepLocator(object):
61 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
# Hints must start with an uppercase letter, then letters/digits/@/_/-.
62 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
64 def __init__(self, locator_str):
# Split on '+': first piece is the md5, optional second the size,
# remaining pieces are hints ('A...' hints carry permission signatures).
67 self._perm_expiry = None
68 pieces = iter(locator_str.split('+'))
69 self.md5sum = next(pieces)
71 self.size = int(next(pieces))
75 if self.HINT_RE.match(hint) is None:
76 raise ValueError("invalid hint format: {}".format(hint))
77 elif hint.startswith('A'):
78 self.parse_permission_hint(hint)
80 self.hints.append(hint)
85 for s in [self.md5sum, self.size,
86 self.permission_hint()] + self.hints
90 if self.size is not None:
91 return "%s+%i" % (self.md5sum, self.size)
95 def _make_hex_prop(name, length):
96 # Build and return a new property with the given name that
97 # must be a hex string of the given length.
98 data_name = '_{}'.format(name)
100 return getattr(self, data_name)
101 def setter(self, hex_str):
102 if not arvados.util.is_hex(hex_str, length):
103 raise ValueError("{} is not a {}-digit hex string: {!r}".
104 format(name, length, hex_str))
105 setattr(self, data_name, hex_str)
106 return property(getter, setter)
# Validated hex-string properties: 32 hex digits for md5, 40 for signature.
108 md5sum = _make_hex_prop('md5sum', 32)
109 perm_sig = _make_hex_prop('perm_sig', 40)
112 def perm_expiry(self):
113 return self._perm_expiry
# Setter accepts a 1-8 digit hex Unix timestamp and stores it as a
# naive UTC datetime.
116 def perm_expiry(self, value):
117 if not arvados.util.is_hex(value, 1, 8):
119 "permission timestamp must be a hex Unix timestamp: {}".
121 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
# Serialize the permission hint back to "A<sig>@<8-digit hex timestamp>".
123 def permission_hint(self):
124 data = [self.perm_sig, self.perm_expiry]
127 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
128 return "A{}@{:08x}".format(*data)
130 def parse_permission_hint(self, s):
132 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
134 raise ValueError("bad permission hint {}".format(s))
# Return True if the permission signature has expired as of as_of_dt
# (default: current local time; never expired if there is no expiry).
136 def permission_expired(self, as_of_dt=None):
137 if self.perm_expiry is None:
139 elif as_of_dt is None:
140 as_of_dt = datetime.datetime.now()
141 return self.perm_expiry <= as_of_dt
145 """Simple interface to a global KeepClient object.
147 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
148 own API client. The global KeepClient will build an API client from the
149 current Arvados configuration, which may not match the one you built.
154 def global_client_object(cls):
155 global global_client_object
156 # Previously, KeepClient would change its behavior at runtime based
157 # on these configuration settings. We simulate that behavior here
158 # by checking the values and returning a new KeepClient if any of
160 key = (config.get('ARVADOS_API_HOST'),
161 config.get('ARVADOS_API_TOKEN'),
162 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
163 config.get('ARVADOS_KEEP_PROXY'),
164 os.environ.get('KEEP_LOCAL_STORE'))
165 if (global_client_object is None) or (cls._last_key != key):
166 global_client_object = KeepClient()
168 return global_client_object
171 def get(locator, **kwargs):
172 return Keep.global_client_object().get(locator, **kwargs)
175 def put(data, **kwargs):
176 return Keep.global_client_object().put(data, **kwargs)
# In-memory (or optional on-disk) cache of Keep blocks, bounded both by
# total byte size (cache_max) and by slot count (max_slots).
178 class KeepBlockCache(object):
179 def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
180 self.cache_max = cache_max
182 self._cache_lock = threading.Lock()
183 self._max_slots = max_slots
184 self._disk_cache = disk_cache
185 self._disk_cache_dir = disk_cache_dir
# Default disk cache location: ~/.cache/arvados/keep, created 0700.
187 if self._disk_cache and self._disk_cache_dir is None:
188 self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
189 os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
191 if self._max_slots == 0:
193 # default set max slots to half of maximum file handles
# NOTE(review): true division yields a float on Python 3; presumably an
# integer slot count was intended (// 2) — confirm downstream usage.
194 self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2
196 self._max_slots = 1024
198 if self.cache_max == 0:
# Disk cache default size: half the free space on the cache filesystem,
# capped at max_slots * 64 MiB.
200 fs = os.statvfs(self._disk_cache_dir)
201 avail = (fs.f_bavail * fs.f_bsize) / 2
202 # Half the available space or max_slots * 64 MiB
203 self.cache_max = min(avail, (self._max_slots * 64 * 1024 * 1024))
# In-memory default is 256 MiB; either way enforce a 64 MiB floor.
206 self.cache_max = (256 * 1024 * 1024)
208 self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
211 self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
# One cache entry: holds the block content for a locator, with an Event
# that readers wait on until the content is ready (or failed: content None).
215 class CacheSlot(object):
216 __slots__ = ("locator", "ready", "content")
218 def __init__(self, locator):
219 self.locator = locator
220 self.ready = threading.Event()
# Store the fetched block and wake any waiters (via self.ready).
227 def set(self, value):
# Size is 0 for empty/failed slots, else the content length in bytes.
232 if self.content is None:
235 return len(self.content)
241 '''Cap the cache size to self.cache_max'''
242 with self._cache_lock:
243 # Select all slots except those where ready.is_set() and content is
244 # None (that means there was an error reading the block).
245 self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
246 sm = sum([slot.size() for slot in self._cache])
# Evict ready slots from the tail (least recently used end) until both
# the byte budget and the slot-count budget are satisfied.
247 while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
248 for i in range(len(self._cache)-1, -1, -1):
# Only evict slots whose fetch has completed; in-flight slots stay.
249 if self._cache[i].ready.is_set():
250 self._cache[i].evict()
253 sm = sum([slot.size() for slot in self._cache])
# Internal lookup: caller must already hold self._cache_lock.
255 def _get(self, locator):
256 # Test if the locator is already in the cache
257 for i in range(0, len(self._cache)):
258 if self._cache[i].locator == locator:
261 # move it to the front
# LRU behavior: a hit is re-inserted at index 0.
263 self._cache.insert(0, n)
266 # see if it exists on disk
267 n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
269 self._cache.insert(0, n)
# Public lookup: same as _get but takes the lock.
273 def get(self, locator):
274 with self._cache_lock:
275 return self._get(locator)
277 def reserve_cache(self, locator):
278 '''Reserve a cache slot for the specified locator,
279 or return the existing slot.'''
# Presumably returns (slot, first) where `first` tells the caller whether
# it is responsible for fetching the block — confirm against callers
# (see _get_or_head), as the return statements are not visible here.
280 with self._cache_lock:
281 n = self._get(locator)
285 # Add a new cache slot for the locator
# Disk-backed slot when disk cache is enabled, else an in-memory slot.
287 n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
289 n = KeepBlockCache.CacheSlot(locator)
290 self._cache.insert(0, n)
# Thread-safe monotonically-updated counter used for transfer statistics.
293 class Counter(object):
294 def __init__(self, v=0):
295 self._lk = threading.Lock()
# Client for reading and writing data blocks to/from Keep services.
307 class KeepClient(object):
309 # Default Keep server connection timeout: 2 seconds
310 # Default Keep server read timeout: 256 seconds
311 # Default Keep server bandwidth minimum: 32768 bytes per second
312 # Default Keep proxy connection timeout: 20 seconds
313 # Default Keep proxy read timeout: 256 seconds
314 # Default Keep proxy bandwidth minimum: 32768 bytes per second
# Tuples are (connection_timeout, read_timeout, minimum_bandwidth_bps);
# see _setcurltimeouts for how each element is applied.
315 DEFAULT_TIMEOUT = (2, 256, 32768)
316 DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
319 class KeepService(object):
320 """Make requests to a single Keep service, and track results.
322 A KeepService is intended to last long enough to perform one
323 transaction (GET or PUT) against one Keep service. This can
324 involve calling either get() or put() multiple times in order
325 to retry after transient failures. However, calling both get()
326 and put() on a single instance -- or using the same instance
327 to access two different Keep services -- will not produce
# Exception types treated as (potentially retryable) HTTP failures.
334 arvados.errors.HttpError,
337 def __init__(self, root, user_agent_pool=queue.LifoQueue(),
339 download_counter=None,
# root: base URI of the Keep service; user_agent_pool: shared pool of
# reusable pycurl handles; counters track bytes transferred.
343 self._user_agent_pool = user_agent_pool
344 self._result = {'error': None}
348 self.get_headers = {'Accept': 'application/octet-stream'}
349 self.get_headers.update(headers)
350 self.put_headers = headers
351 self.upload_counter = upload_counter
352 self.download_counter = download_counter
353 self.insecure = insecure
356 """Is it worth attempting a request?"""
360 """Did the request succeed or encounter permanent failure?"""
361 return self._result['error'] == False or not self._usable
363 def last_result(self):
# Pop a cached pycurl handle from the pool, or (presumably, in the
# sampled-out except branch) create a fresh one on queue.Empty.
366 def _get_user_agent(self):
368 return self._user_agent_pool.get(block=False)
# Return a handle to the pool for reuse; best-effort if the pool is full.
372 def _put_user_agent(self, ua):
375 self._user_agent_pool.put(ua, block=False)
# Dispatch on pycurl's OPENSOCKETFUNCTION callback arity: pycurl >= 7.21.5
# passes (purpose, address); 7.19.3 passed (family, socktype, protocol[, addr]).
379 def _socket_open(self, *args, **kwargs):
380 if len(args) + len(kwargs) == 2:
381 return self._socket_open_pycurl_7_21_5(*args, **kwargs)
383 return self._socket_open_pycurl_7_19_3(*args, **kwargs)
# Adapt the old callback signature to the new one via a namedtuple address.
385 def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
386 return self._socket_open_pycurl_7_21_5(
388 address=collections.namedtuple(
389 'Address', ['family', 'socktype', 'protocol', 'addr'],
390 )(family, socktype, protocol, address))
392 def _socket_open_pycurl_7_21_5(self, purpose, address):
393 """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
394 s = socket.socket(address.family, address.socktype, address.protocol)
395 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
396 # Will throw invalid protocol error on mac. This test prevents that.
397 if hasattr(socket, 'TCP_KEEPIDLE'):
398 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
399 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
# Perform a GET (or HEAD) for one block against this service.
# Returns the block body (bytes) on GET, or the x-keep-locator header /
# status info on HEAD; records the outcome in self._result.
403 def get(self, locator, method="GET", timeout=None):
404 # locator is a KeepLocator object.
405 url = self.root + str(locator)
406 _logger.debug("Request: %s %s", method, url)
407 curl = self._get_user_agent()
410 with timer.Timer() as t:
412 response_body = BytesIO()
# NOSIGNAL avoids SIGALRM-based timeouts, required for thread safety.
413 curl.setopt(pycurl.NOSIGNAL, 1)
414 curl.setopt(pycurl.OPENSOCKETFUNCTION,
415 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
416 curl.setopt(pycurl.URL, url.encode('utf-8'))
417 curl.setopt(pycurl.HTTPHEADER, [
418 '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
419 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
420 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
# insecure mode disables TLS verification (ARVADOS_API_HOST_INSECURE).
422 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
423 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
425 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
# HEAD requests skip the body; bandwidth floor is also ignored for HEAD.
427 curl.setopt(pycurl.NOBODY, True)
428 self._setcurltimeouts(curl, timeout, method=="HEAD")
432 except Exception as e:
433 raise arvados.errors.HttpError(0, str(e))
439 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
440 'body': response_body.getvalue(),
441 'headers': self._headers,
445 ok = retry.check_http_response_success(self._result['status_code'])
447 self._result['error'] = arvados.errors.HttpError(
448 self._result['status_code'],
449 self._headers.get('x-status-line', 'Error'))
450 except self.HTTP_ERRORS as e:
# ok is True (success), False (permanent failure), or None (retryable);
# only a definite False marks the service unusable.
454 self._usable = ok != False
455 if self._result.get('status_code', None):
456 # The client worked well enough to get an HTTP status
457 # code, so presumably any problems are just on the
458 # server side and it's OK to reuse the client.
459 self._put_user_agent(curl)
461 # Don't return this client to the pool, in case it's
465 _logger.debug("Request fail: GET %s => %s: %s",
466 url, type(self._result['error']), str(self._result['error']))
469 _logger.info("HEAD %s: %s bytes",
470 self._result['status_code'],
471 self._result.get('content-length'))
472 if self._result['headers'].get('x-keep-locator'):
473 # This is a response to a remote block copy request, return
474 # the local copy block locator.
475 return self._result['headers'].get('x-keep-locator')
478 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
479 self._result['status_code'],
480 len(self._result['body']),
482 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
484 if self.download_counter:
485 self.download_counter.add(len(self._result['body']))
# Verify the payload actually matches the content-addressed locator.
486 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
487 if resp_md5 != locator.md5sum:
488 _logger.warning("Checksum fail: md5(%s) = %s",
490 self._result['error'] = arvados.errors.HttpError(
493 return self._result['body']
# Upload one block (body) under its content hash (hash_s) to this service.
# Outcome and response details are recorded in self._result.
495 def put(self, hash_s, body, timeout=None, headers={}):
496 put_headers = copy.copy(self.put_headers)
497 put_headers.update(headers)
498 url = self.root + hash_s
499 _logger.debug("Request: PUT %s", url)
500 curl = self._get_user_agent()
503 with timer.Timer() as t:
505 body_reader = BytesIO(body)
506 response_body = BytesIO()
507 curl.setopt(pycurl.NOSIGNAL, 1)
508 curl.setopt(pycurl.OPENSOCKETFUNCTION,
509 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
510 curl.setopt(pycurl.URL, url.encode('utf-8'))
511 # Using UPLOAD tells cURL to wait for a "go ahead" from the
512 # Keep server (in the form of a HTTP/1.1 "100 Continue"
513 # response) instead of sending the request body immediately.
514 # This allows the server to reject the request if the request
515 # is invalid or the server is read-only, without waiting for
516 # the client to send the entire block.
517 curl.setopt(pycurl.UPLOAD, True)
518 curl.setopt(pycurl.INFILESIZE, len(body))
519 curl.setopt(pycurl.READFUNCTION, body_reader.read)
520 curl.setopt(pycurl.HTTPHEADER, [
521 '{}: {}'.format(k,v) for k,v in put_headers.items()])
522 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
523 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
525 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
526 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
528 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
529 self._setcurltimeouts(curl, timeout)
532 except Exception as e:
533 raise arvados.errors.HttpError(0, str(e))
# PUT responses are small text (a signed locator), hence the utf-8 decode.
539 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
540 'body': response_body.getvalue().decode('utf-8'),
541 'headers': self._headers,
544 ok = retry.check_http_response_success(self._result['status_code'])
546 self._result['error'] = arvados.errors.HttpError(
547 self._result['status_code'],
548 self._headers.get('x-status-line', 'Error'))
549 except self.HTTP_ERRORS as e:
553 self._usable = ok != False # still usable if ok is True or None
554 if self._result.get('status_code', None):
555 # Client is functional. See comment in get().
556 self._put_user_agent(curl)
560 _logger.debug("Request fail: PUT %s => %s: %s",
561 url, type(self._result['error']), str(self._result['error']))
563 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
564 self._result['status_code'],
567 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
568 if self.upload_counter:
569 self.upload_counter.add(len(body))
# Apply connection/transfer timeouts to a curl handle. `timeouts` may be
# a 3-tuple (conn, xfer, bandwidth_bps), a 2-tuple (conn, xfer) with the
# default bandwidth floor, or a single number used for both.
572 def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
575 elif isinstance(timeouts, tuple):
576 if len(timeouts) == 2:
577 conn_t, xfer_t = timeouts
578 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
580 conn_t, xfer_t, bandwidth_bps = timeouts
582 conn_t, xfer_t = (timeouts, timeouts)
583 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
584 curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
# Abort if transfer speed stays below bandwidth_bps for xfer_t seconds
# (skipped for HEAD requests, which move no body bytes).
585 if not ignore_bandwidth:
586 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
587 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
# pycurl HEADERFUNCTION callback: accumulate response headers into
# self._headers with lowercase names; handles folded continuation lines
# and records the status line under 'x-status-line'.
589 def _headerfunction(self, header_line):
590 if isinstance(header_line, bytes):
# HTTP headers are latin-1 per the wire protocol.
591 header_line = header_line.decode('iso-8859-1')
592 if ':' in header_line:
593 name, value = header_line.split(':', 1)
594 name = name.strip().lower()
595 value = value.strip()
# No colon: presumably a continuation of the previous header (obs-fold).
597 name = self._lastheadername
598 value = self._headers[name] + ' ' + header_line.strip()
599 elif header_line.startswith('HTTP/'):
600 name = 'x-status-line'
603 _logger.error("Unexpected header line: %s", header_line)
605 self._lastheadername = name
606 self._headers[name] = value
607 # Returning None implies all bytes were written
# Work queue shared by KeepWriterThreads: tracks how many replica copies
# and which storage classes are still wanted, and hands out write tasks.
610 class KeepWriterQueue(queue.Queue):
# NOTE(review): mutable default argument `classes=[]` — safe only if
# never mutated in place; confirm, or prefer classes=None at next change.
611 def __init__(self, copies, classes=[]):
612 queue.Queue.__init__(self) # Old-style superclass
613 self.wanted_copies = copies
614 self.wanted_storage_classes = classes
615 self.successful_copies = 0
616 self.confirmed_storage_classes = {}
618 self.storage_classes_tracking = True
619 self.queue_data_lock = threading.RLock()
620 self.pending_tries = max(copies, len(classes))
621 self.pending_tries_notification = threading.Condition()
# Record a successful PUT: bump replica and per-class counts, then wake
# waiting workers so they can re-evaluate what is still needed.
623 def write_success(self, response, replicas_nr, classes_confirmed):
624 with self.queue_data_lock:
625 self.successful_copies += replicas_nr
626 if classes_confirmed is None:
# Server did not report confirmed classes: stop tracking them.
627 self.storage_classes_tracking = False
628 elif self.storage_classes_tracking:
629 for st_class, st_copies in classes_confirmed.items():
631 self.confirmed_storage_classes[st_class] += st_copies
633 self.confirmed_storage_classes[st_class] = st_copies
634 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
635 self.response = response
636 with self.pending_tries_notification:
637 self.pending_tries_notification.notify_all()
# A failed PUT frees up one retry slot and wakes one waiting worker.
639 def write_fail(self, ks):
640 with self.pending_tries_notification:
641 self.pending_tries += 1
642 self.pending_tries_notification.notify()
# Replica copies still needed (may be <= 0 once satisfied).
644 def pending_copies(self):
645 with self.queue_data_lock:
646 return self.wanted_copies - self.successful_copies
648 def satisfied_classes(self):
649 with self.queue_data_lock:
650 if not self.storage_classes_tracking:
651 # Notifies disabled storage classes expectation to
654 return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
# Storage classes that have not yet reached wanted_copies confirmations.
656 def pending_classes(self):
657 with self.queue_data_lock:
658 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
660 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
661 for st_class, st_copies in self.confirmed_storage_classes.items():
662 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
663 unsatisfied_classes.remove(st_class)
664 return unsatisfied_classes
# Block until there is either nothing left to do (drain + Queue.Empty,
# per the comments below) or a retry slot plus a queued service.
666 def get_next_task(self):
667 with self.pending_tries_notification:
669 if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
670 # This notify_all() is unnecessary --
671 # write_success() already called notify_all()
672 # when pending<1 became true, so it's not
673 # possible for any other thread to be in
674 # wait() now -- but it's cheap insurance
675 # against deadlock so we do it anyway:
676 self.pending_tries_notification.notify_all()
677 # Drain the queue and then raise Queue.Empty
681 elif self.pending_tries > 0:
682 service, service_root = self.get_nowait()
# Skip services that already hit a permanent success/failure.
683 if service.finished():
686 self.pending_tries -= 1
687 return service, service_root
689 self.pending_tries_notification.notify_all()
692 self.pending_tries_notification.wait()
# Pool of KeepWriterThreads feeding from one KeepWriterQueue; thread count
# is sized so that `copies` replicas can be written concurrently.
695 class KeepWriterThreadPool(object):
696 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
697 self.total_task_nr = 0
698 if (not max_service_replicas) or (max_service_replicas >= copies):
# ceil(copies / max_service_replicas) threads when each service can
# only store a limited number of replicas.
701 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
702 _logger.debug("Pool max threads is %d", num_threads)
704 self.queue = KeepClient.KeepWriterQueue(copies, classes)
706 for _ in range(num_threads):
707 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
708 self.workers.append(w)
# Enqueue one (service, root) write attempt.
710 def add_task(self, ks, service_root):
711 self.queue.put((ks, service_root))
712 self.total_task_nr += 1
# Presumably the result accessor: replicas written and classes satisfied.
715 return self.queue.successful_copies, self.queue.satisfied_classes()
# Start workers and wait for the queue to drain; the last successful
# response is exposed via self.queue.response.
719 for worker in self.workers:
721 # Wait for finished work
725 return self.queue.response
# Worker thread: repeatedly pulls (service, root) tasks from the queue,
# attempts the PUT, and reports success/failure back to the queue.
728 class KeepWriterThread(threading.Thread):
729 class TaskFailed(RuntimeError): pass
731 def __init__(self, queue, data, data_hash, timeout=None):
732 super(KeepClient.KeepWriterThread, self).__init__()
733 self.timeout = timeout
736 self.data_hash = data_hash
742 service, service_root = self.queue.get_next_task()
746 locator, copies, classes = self.do_task(service, service_root)
747 except Exception as e:
# TaskFailed is the expected per-attempt failure; anything else is
# unexpected and gets a full traceback in the log.
748 if not isinstance(e, self.TaskFailed):
749 _logger.exception("Exception in KeepWriterThread")
750 self.queue.write_fail(service)
752 self.queue.write_success(locator, copies, classes)
754 self.queue.task_done()
# One PUT attempt. Returns (locator, replicas_stored, classes_confirmed)
# or raises TaskFailed.
756 def do_task(self, service, service_root):
# Advertise the storage classes still pending so the server can place
# the block appropriately.
757 classes = self.queue.pending_classes()
761 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
762 success = bool(service.put(self.data_hash,
764 timeout=self.timeout,
766 result = service.last_result()
769 if result.get('status_code'):
770 _logger.debug("Request fail: PUT %s => %s %s",
772 result.get('status_code'),
774 raise self.TaskFailed()
776 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
777 str(threading.current_thread()),
# Replica count defaults (in the sampled-out except branch, presumably
# to 1) when the header is missing or malformed.
782 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
783 except (KeyError, ValueError):
786 classes_confirmed = {}
# Header format: "class1=2, class2=1" -> {'class1': 2, 'class2': 1}.
788 scch = result['headers']['x-keep-storage-classes-confirmed']
789 for confirmation in scch.replace(' ', '').split(','):
790 if '=' in confirmation:
791 stored_class, stored_copies = confirmation.split('=')[:2]
792 classes_confirmed[stored_class] = int(stored_copies)
793 except (KeyError, ValueError):
794 # Storage classes confirmed header missing or corrupt
795 classes_confirmed = None
797 return result['body'].strip(), replicas_stored, classes_confirmed
800 def __init__(self, api_client=None, proxy=None,
801 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
802 api_token=None, local_store=None, block_cache=None,
803 num_retries=0, session=None):
804 """Initialize a new KeepClient.
808 The API client to use to find Keep services. If not
809 provided, KeepClient will build one from available Arvados
813 If specified, this KeepClient will send requests to this Keep
814 proxy. Otherwise, KeepClient will fall back to the setting of the
815 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
816 If you want to ensure KeepClient does not use a proxy, pass in an empty
820 The initial timeout (in seconds) for HTTP requests to Keep
821 non-proxy servers. A tuple of three floats is interpreted as
822 (connection_timeout, read_timeout, minimum_bandwidth). A connection
823 will be aborted if the average traffic rate falls below
824 minimum_bandwidth bytes per second over an interval of read_timeout
825 seconds. Because timeouts are often a result of transient server
826 load, the actual connection timeout will be increased by a factor
827 of two on each retry.
828 Default: (2, 256, 32768).
831 The initial timeout (in seconds) for HTTP requests to
832 Keep proxies. A tuple of three floats is interpreted as
833 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
834 described above for adjusting connection timeouts on retry also
836 Default: (20, 256, 32768).
839 If you're not using an API client, but only talking
840 directly to a Keep proxy, this parameter specifies an API token
841 to authenticate Keep requests. It is an error to specify both
842 api_client and api_token. If you specify neither, KeepClient
843 will use one available from the Arvados configuration.
846 If specified, this KeepClient will bypass Keep
847 services, and save data to the named directory. If unspecified,
848 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
849 environment variable. If you want to ensure KeepClient does not
850 use local storage, pass in an empty string. This is primarily
851 intended to mock a server for testing.
854 The default number of times to retry failed requests.
855 This will be used as the default num_retries value when get() and
856 put() are called. Default 0.
858 self.lock = threading.Lock()
# Environment/config overrides: ARVADOS_KEEP_SERVICES wins over
# ARVADOS_KEEP_PROXY when both are set.
860 if config.get('ARVADOS_KEEP_SERVICES'):
861 proxy = config.get('ARVADOS_KEEP_SERVICES')
863 proxy = config.get('ARVADOS_KEEP_PROXY')
864 if api_token is None:
865 if api_client is None:
866 api_token = config.get('ARVADOS_API_TOKEN')
868 api_token = api_client.api_token
869 elif api_client is not None:
871 "can't build KeepClient with both API client and token")
872 if local_store is None:
873 local_store = os.environ.get('KEEP_LOCAL_STORE')
875 if api_client is None:
876 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
878 self.insecure = api_client.insecure
880 self.block_cache = block_cache if block_cache else KeepBlockCache()
881 self.timeout = timeout
882 self.proxy_timeout = proxy_timeout
883 self._user_agent_pool = queue.LifoQueue()
884 self.upload_counter = Counter()
885 self.download_counter = Counter()
886 self.put_counter = Counter()
887 self.get_counter = Counter()
888 self.hits_counter = Counter()
889 self.misses_counter = Counter()
890 self._storage_classes_unsupported_warning = False
891 self._default_classes = []
# local_store mode: swap get/put/head for filesystem-backed test doubles.
894 self.local_store = local_store
895 self.head = self.local_store_head
896 self.get = self.local_store_get
897 self.put = self.local_store_put
899 self.num_retries = num_retries
900 self.max_replicas_per_service = None
# Proxy mode: build a static service list from the given URI(s).
902 proxy_uris = proxy.split()
903 for i in range(len(proxy_uris)):
904 if not proxy_uris[i].endswith('/'):
907 url = urllib.parse.urlparse(proxy_uris[i])
908 if not (url.scheme and url.netloc):
909 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
910 self.api_token = api_token
911 self._gateway_services = {}
912 self._keep_services = [{
913 'uuid': "00000-bi6l4-%015d" % idx,
914 'service_type': 'proxy',
915 '_service_root': uri,
916 } for idx, uri in enumerate(proxy_uris)]
917 self._writable_services = self._keep_services
918 self.using_proxy = True
919 self._static_services_list = True
921 # It's important to avoid instantiating an API client
922 # unless we actually need one, for testing's sake.
923 if api_client is None:
924 api_client = arvados.api('v1')
925 self.api_client = api_client
926 self.api_token = api_client.api_token
927 self._gateway_services = {}
928 self._keep_services = None
929 self._writable_services = None
930 self.using_proxy = None
931 self._static_services_list = False
# Default storage classes come from the cluster config when available.
933 self._default_classes = [
934 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
936 # We're talking to an old cluster
936 # We're talking to an old cluster
939 def current_timeout(self, attempt_number):
940 """Return the appropriate timeout to use for this client.
942 The proxy timeout setting if the backend service is currently a proxy,
943 the regular timeout setting otherwise. The `attempt_number` indicates
944 how many times the operation has been tried already (starting from 0
945 for the first try), and scales the connection timeout portion of the
946 return value accordingly.
949 # TODO(twp): the timeout should be a property of a
950 # KeepService, not a KeepClient. See #4488.
951 t = self.proxy_timeout if self.using_proxy else self.timeout
# Connection timeout doubles per attempt: t[0] * 2**attempt_number.
953 return (t[0] * (1 << attempt_number), t[1])
955 return (t[0] * (1 << attempt_number), t[1], t[2])
956 def _any_nondisk_services(self, service_list):
957 return any(ks.get('service_type', 'disk') != 'disk'
958 for ks in service_list)
# Populate self._keep_services / self._writable_services from the API
# server, unless a static (proxy) list was configured or a cached list
# already exists and force_rebuild is False.
960 def build_services_list(self, force_rebuild=False):
961 if (self._static_services_list or
962 (self._keep_services and not force_rebuild)):
966 keep_services = self.api_client.keep_services().accessible()
967 except Exception: # API server predates Keep services.
968 keep_services = self.api_client.keep_disks().list()
970 # Gateway services are only used when specified by UUID,
971 # so there's nothing to gain by filtering them by
973 self._gateway_services = {ks['uuid']: ks for ks in
974 keep_services.execute()['items']}
975 if not self._gateway_services:
976 raise arvados.errors.NoKeepServersError()
978 # Precompute the base URI for each service.
979 for r in self._gateway_services.values():
980 host = r['service_host']
981 if not host.startswith('[') and host.find(':') >= 0:
982 # IPv6 URIs must be formatted like http://[::1]:80/...
983 host = '[' + host + ']'
984 r['_service_root'] = "{}://{}:{:d}/".format(
985 'https' if r['service_ssl_flag'] else 'http',
# Gateway-typed services are excluded from the general-purpose list;
# writable services additionally exclude read-only ones.
989 _logger.debug(str(self._gateway_services))
990 self._keep_services = [
991 ks for ks in self._gateway_services.values()
992 if not ks.get('service_type', '').startswith('gateway:')]
993 self._writable_services = [ks for ks in self._keep_services
994 if not ks.get('read_only')]
996 # For disk type services, max_replicas_per_service is 1
997 # It is unknown (unlimited) for other service types.
998 if self._any_nondisk_services(self._writable_services):
999 self.max_replicas_per_service = None
1001 self.max_replicas_per_service = 1
1003 def _service_weight(self, data_hash, service_uuid):
1004 """Compute the weight of a Keep service endpoint for a data
1005 block with a known hash.
1007 The weight is md5(h + u) where u is the last 15 characters of
1008 the service endpoint's UUID.
1010 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1012 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1013 """Return an array of Keep service endpoints, in the order in
1014 which they should be probed when reading or writing data with
1015 the given hash+hints.
1017 self.build_services_list(force_rebuild)
1020 # Use the services indicated by the given +K@... remote
1021 # service hints, if any are present and can be resolved to a
1023 for hint in locator.hints:
1024 if hint.startswith('K@'):
# A short K@ hint names a remote cluster; map it to its keepproxy URL.
1026 sorted_roots.append(
1027 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
# A 29-character K@ hint embeds a local gateway service UUID.
1028 elif len(hint) == 29:
1029 svc = self._gateway_services.get(hint[2:])
1031 sorted_roots.append(svc['_service_root'])
1033 # Sort the available local services by weight (heaviest first)
1034 # for this locator, and return their service_roots (base URIs)
1036 use_services = self._keep_services
1038 use_services = self._writable_services
1039 self.using_proxy = self._any_nondisk_services(use_services)
1040 sorted_roots.extend([
1041 svc['_service_root'] for svc in sorted(
1044 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1045 _logger.debug("{}: {}".format(locator, sorted_roots))
1048 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1049 # roots_map is a dictionary, mapping Keep service root strings
1050 # to KeepService objects. Poll for Keep services, and add any
1051 # new ones to roots_map. Return the current list of local
# Authorization header is added only if the caller didn't set one.
1053 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1054 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1055 for root in local_roots:
1056 if root not in roots_map:
1057 roots_map[root] = self.KeepService(
1058 root, self._user_agent_pool,
1059 upload_counter=self.upload_counter,
1060 download_counter=self.download_counter,
1062 insecure=self.insecure)
# Used as a RetryLoop success-check callback (likely a @staticmethod;
# the decorator line is not visible in this sampled view).
1066 def _check_loop_result(result):
1067 # KeepClient RetryLoops should save results as a 2-tuple: the
1068 # actual result of the request, and the number of servers available
1069 # to receive the request this round.
1070 # This method returns True if there's a real result, False if
1071 # there are no more servers available, otherwise None.
1072 if isinstance(result, Exception):
1074 result, tried_server_count = result
1075 if (result is not None) and (result is not False):
1077 elif tried_server_count < 1:
1078 _logger.info("No more Keep services to try; giving up")
1083 def get_from_cache(self, loc_s):
1084 """Fetch a block only if is in the cache, otherwise return None."""
1085 locator = KeepLocator(loc_s)
1086 slot = self.block_cache.get(locator.md5sum)
# Only return content from slots whose fetch has already completed;
# never block waiting for an in-flight read.
1087 if slot is not None and slot.ready.is_set():
1092 def refresh_signature(self, loc):
1093 """Ask Keep to get the remote block and return its local signature"""
# The X-Keep-Signature header requests a locally-signed locator in reply.
1094 now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1095 return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1098 def head(self, loc_s, **kwargs):
1099 return self._get_or_head(loc_s, method="HEAD", **kwargs)
def get(self, loc_s, **kwargs):
    """Fetch block data from Keep via a GET request.

    Thin wrapper around _get_or_head with method="GET"; all other
    keyword arguments are forwarded unchanged — see _get_or_head for
    the accepted options.
    """
    return self._get_or_head(loc_s, method="GET", **kwargs)
1105 def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1106 """Get data from Keep.
1108 This method fetches one or more blocks of data from Keep. It
1109 sends a request each Keep service registered with the API
1110 server (or the proxy provided when this client was
1111 instantiated), then each service named in location hints, in
1112 sequence. As soon as one service provides the data, it's
1116 * loc_s: A string of one or more comma-separated locators to fetch.
1117 This method returns the concatenation of these blocks.
1118 * num_retries: The number of times to retry GET requests to
1119 *each* Keep server if it returns temporary failures, with
1120 exponential backoff. Note that, in each loop, the method may try
1121 to fetch data from every available Keep service, along with any
1122 that are named in location hints in the locator. The default value
1123 is set when the KeepClient is initialized.
# NOTE(review): docstring close and a guard line (1124-1125, presumably
# `if ',' in loc_s:`) are elided — the recursive comma-split below only
# makes sense under such a guard; confirm against the full file.
1126 return ''.join(self.get(x) for x in loc_s.split(','))
1128 self.get_counter.add(1)
# Build a request id for log correlation: caller-supplied, then the
# API client's, then a freshly generated one.
1130 request_id = (request_id or
1131 (hasattr(self, 'api_client') and self.api_client.request_id) or
1132 arvados.util.new_request_id())
# NOTE(review): headers-dict initialization (lines 1133-1139) elided;
# only the X-Request-Id assignment is visible here.
1135 headers['X-Request-Id'] = request_id
1140 locator = KeepLocator(loc_s)
# Reserve a block-cache slot; `first` tells us whether we are the
# first requester (miss) or someone else is already fetching it.
1142 slot, first = self.block_cache.reserve_cache(locator.md5sum)
# NOTE(review): the surrounding prefetch branch (lines 1143-1150) is
# partially elided; only its comments survive here.
1145 # this is request for a prefetch, if it is
1146 # already in flight, return immediately.
1147 # clear 'slot' to prevent finally block from
1148 # calling slot.set()
1151 self.hits_counter.add(1)
# A ready slot holding None means a previous fetch failed permanently.
1154 raise arvados.errors.KeepReadError(
1155 "failed to read {}".format(loc_s))
1158 self.misses_counter.add(1)
1160 # If the locator has hints specifying a prefix (indicating a
1161 # remote keepproxy) or the UUID of a local gateway service,
1162 # read data from the indicated service(s) instead of the usual
1163 # list of local disk services.
# 'K@xxxxx' (7 chars total) names a remote cluster: derive its
# keepproxy URL from the 5-character cluster prefix.
1164 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1165 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
# Longer 'K@<uuid>' hints name local gateway services already known
# to this client; look up their service roots.
1166 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1167 for hint in locator.hints if (
1168 hint.startswith('K@') and
1170 self._gateway_services.get(hint[2:])
1172 # Map root URLs to their KeepService objects.
# NOTE(review): the dict-comprehension open line (1173, presumably
# `roots_map = {`) is elided; confirm.
1174 root: self.KeepService(root, self._user_agent_pool,
1175 upload_counter=self.upload_counter,
1176 download_counter=self.download_counter,
1178 insecure=self.insecure)
1179 for root in hint_roots
1182 # See #3147 for a discussion of the loop implementation. Highlights:
1183 # * Refresh the list of Keep services after each failure, in case
1184 # it's being updated.
1185 # * Retry until we succeed, we're out of retries, or every available
1186 # service has returned permanent failure.
1189 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1191 for tries_left in loop:
# Rebuild the service list on every retry after the first attempt.
1193 sorted_roots = self.map_new_services(
1195 force_rebuild=(tries_left < num_retries),
1196 need_writable=False,
1198 except Exception as error:
# Feed the exception to the loop so _check_loop_result can decide
# whether to keep retrying.
1199 loop.save_result(error)
# NOTE(review): the `continue` after save_result (line 1200) appears
# elided; confirm.
1202 # Query KeepService objects that haven't returned
1203 # permanent failure, in our specified shuffle order.
1204 services_to_try = [roots_map[root]
1205 for root in sorted_roots
1206 if roots_map[root].usable()]
1207 for keep_service in services_to_try:
1208 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1209 if blob is not None:
# Save (result, server_count); blob None + zero usable servers
# makes _check_loop_result give up.
1211 loop.save_result((blob, len(services_to_try)))
1213 # Always cache the result, then return it if we succeeded.
# NOTE(review): success-path return and the `finally` populating the
# cache slot (lines 1214-1218) are elided; only the cap_cache call survives.
1217 if slot is not None:
1219 self.block_cache.cap_cache()
1221 # Q: Including 403 is necessary for the Keep tests to continue
1222 # passing, but maybe they should expect KeepReadError instead?
1223 not_founds = sum(1 for key in sorted_roots
1224 if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
# Lazy generator of per-service error details for the exceptions below.
1225 service_errors = ((key, roots_map[key].last_result()['error'])
1226 for key in sorted_roots)
# NOTE(review): the condition selecting this branch (line 1227,
# presumably `if not roots_map:`) is elided; confirm.
1228 raise arvados.errors.KeepReadError(
1229 "[{}] failed to read {}: no Keep services available ({})".format(
1230 request_id, loc_s, loop.last_result()))
# Every service said 403/404/410: report NotFound rather than a
# generic read error.
1231 elif not_founds == len(sorted_roots):
1232 raise arvados.errors.NotFoundError(
1233 "[{}] {} not found".format(request_id, loc_s), service_errors)
1235 raise arvados.errors.KeepReadError(
1236 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1239 def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1240 """Save data in Keep.
1242 This method will get a list of Keep services from the API server, and
1243 send the data to each one simultaneously in a new thread. Once the
1244 uploads are finished, if enough copies are saved, this method returns
1245 the most recent HTTP response body. If requests fail to upload
1246 enough copies, this method raises KeepWriteError.
1249 * data: The string of data to upload.
1250 * copies: The number of copies that the user requires be saved.
1252 * num_retries: The number of times to retry PUT requests to
1253 *each* Keep server if it returns temporary failures, with
1254 exponential backoff. The default value is set when the
1255 KeepClient is initialized.
1256 * classes: An optional list of storage class names where copies should
# NOTE(review): docstring close (lines 1257-1259) elided; confirm.
# Fall back to the client's configured default storage classes.
1260 classes = classes or self._default_classes
# Accept str transparently: Keep stores bytes, so encode first.
1262 if not isinstance(data, bytes):
1263 data = data.encode()
1265 self.put_counter.add(1)
# The locator Keep will store this block under: md5 + '+' + size.
1267 data_hash = hashlib.md5(data).hexdigest()
1268 loc_s = data_hash + '+' + str(len(data))
1271 locator = KeepLocator(loc_s)
# Request id for log correlation: caller-supplied, then the API
# client's, then freshly generated.
1273 request_id = (request_id or
1274 (hasattr(self, 'api_client') and self.api_client.request_id) or
1275 arvados.util.new_request_id())
# NOTE(review): the headers-dict open line (1276, presumably
# `headers = {`) is elided; only these entries are visible.
1277 'X-Request-Id': request_id,
1278 'X-Keep-Desired-Replicas': str(copies),
# NOTE(review): done_copies/done_classes initialization (c. 1279-1280)
# elided — the `+=` updates below imply both start initialized; confirm.
1281 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1285 for tries_left in loop:
1287 sorted_roots = self.map_new_services(
# Rebuild the service list on every retry after the first attempt.
1289 force_rebuild=(tries_left < num_retries),
1292 except Exception as error:
1293 loop.save_result(error)
# Only chase storage classes not already satisfied by earlier rounds.
1296 pending_classes = []
1297 if done_classes is not None:
1298 pending_classes = list(set(classes) - set(done_classes))
1299 writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1300 data_hash=data_hash,
# Only ask the pool for the copies still missing.
1301 copies=copies - done_copies,
1302 max_service_replicas=self.max_replicas_per_service,
1303 timeout=self.current_timeout(num_retries - tries_left),
1304 classes=pending_classes)
1305 for service_root, ks in [(root, roots_map[root])
1306 for root in sorted_roots]:
# NOTE(review): a filter on finished services (c. 1307-1308,
# presumably `if ks.finished(): continue`) appears elided; confirm.
1309 writer_pool.add_task(ks, service_root)
# NOTE(review): a writer_pool.join()/wait call (line 1310) appears
# elided before collecting results; confirm.
1311 pool_copies, pool_classes = writer_pool.done()
1312 done_copies += pool_copies
1313 if (done_classes is not None) and (pool_classes is not None):
1314 done_classes += pool_classes
# NOTE(review): the loop.save_result( open line (1315) is elided;
# this tuple is the (success, server_count) pair the loop checks.
1316 (done_copies >= copies and set(done_classes) == set(classes),
1317 writer_pool.total_task_nr))
1319 # Old keepstore contacted without storage classes support:
1320 # success is determined only by successful copies.
# NOTE(review): the branch condition (c. 1318/1321, presumably
# pool_classes is None) is elided; confirm.
1322 # Disable storage classes tracking from this point forward.
# Warn once per client instance, not once per put().
1323 if not self._storage_classes_unsupported_warning:
1324 self._storage_classes_unsupported_warning = True
1325 _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
# NOTE(review): done_classes = None and the loop.save_result( open
# line (c. 1326-1327) are elided before this success tuple; confirm.
1328 (done_copies >= copies, writer_pool.total_task_nr))
# Loop satisfied: return the most recent HTTP response body.
1330-style success check elided; see NOTE above.
1331 return writer_pool.response()
# NOTE(review): the `if not roots_map:` style condition (c. 1332)
# is elided before this branch; confirm.
1333 raise arvados.errors.KeepWriteError(
1334 "[{}] failed to write {}: no Keep services available ({})".format(
1335 request_id, data_hash, loop.last_result()))
1337 service_errors = ((key, roots_map[key].last_result()['error'])
1338 for key in sorted_roots
1339 if roots_map[key].last_result()['error'])
1340 raise arvados.errors.KeepWriteError(
1341 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1342 request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
# NOTE(review): `classes=[]` is a mutable default argument — a Python
# anti-pattern even though the visible body never mutates it; consider
# `classes=None` once the full body (partially elided below) is in view.
1344 def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1345 """A stub for put().
1347 This method is used in place of the real put() method when
1348 using local storage (see constructor's local_store argument).
1350 copies and num_retries arguments are ignored: they are here
1351 only for the sake of offering the same call signature as
1354 Data stored this way can be retrieved via local_store_get().
# NOTE(review): docstring close (line 1355) elided; confirm.
# Compute the same md5+size locator a real Keep PUT would return.
1356 md5 = hashlib.md5(data).hexdigest()
1357 locator = '%s+%d' % (md5, len(data))
# Write to a '.tmp' file first, then rename: the rename is atomic on
# POSIX, so readers never observe a partially-written block.
1358 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
# NOTE(review): the write call (line 1359, presumably `f.write(data)`)
# and the trailing `return locator` (c. 1362-1363) are elided; confirm.
1360 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1361 os.path.join(self.local_store, md5))
1364 def local_store_get(self, loc_s, num_retries=None):
1365 """Companion to local_store_put()."""
# NOTE(review): a try: line (1366) and its except (1368, presumably
# catching ValueError from KeepLocator) are elided around this parse.
1367 locator = KeepLocator(loc_s)
1369 raise arvados.errors.NotFoundError(
1370 "Invalid data locator: '%s'" % loc_s)
# The well-known empty-block locator short-circuits without touching disk.
1371 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
# NOTE(review): the empty-block return (line 1372, presumably
# `return b''`) and the final `return f.read()` (line 1374) are elided.
1373 with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1376 def local_store_head(self, loc_s, num_retries=None):
1377 """Companion to local_store_put()."""
1379 locator = KeepLocator(loc_s)
1381 raise arvados.errors.NotFoundError(
1382 "Invalid data locator: '%s'" % loc_s)
1383 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1385 if os.path.exists(os.path.join(self.local_store, locator.md5sum)):