1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
33 if sys.version_info >= (3, 0):
34 from io import BytesIO
36 from cStringIO import StringIO as BytesIO
39 import arvados.config as config
41 import arvados.retry as retry
43 import arvados.diskcache
45 _logger = logging.getLogger('arvados.keep')
46 global_client_object = None
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    for _name, _value in (
            ('TCP_KEEPALIVE', 0x010),
            ('TCP_KEEPINTVL', 0x101),
            ('TCP_KEEPCNT', 0x102)):
        if not hasattr(socket, _name):
            setattr(socket, _name, _value)
    # Keep the module namespace identical to the original hand-written version.
    del _name, _value
60 class KeepLocator(object):
61 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
62 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
64 def __init__(self, locator_str):
67 self._perm_expiry = None
68 pieces = iter(locator_str.split('+'))
69 self.md5sum = next(pieces)
71 self.size = int(next(pieces))
75 if self.HINT_RE.match(hint) is None:
76 raise ValueError("invalid hint format: {}".format(hint))
77 elif hint.startswith('A'):
78 self.parse_permission_hint(hint)
80 self.hints.append(hint)
85 for s in [self.md5sum, self.size,
86 self.permission_hint()] + self.hints
90 if self.size is not None:
91 return "%s+%i" % (self.md5sum, self.size)
95 def _make_hex_prop(name, length):
96 # Build and return a new property with the given name that
97 # must be a hex string of the given length.
98 data_name = '_{}'.format(name)
100 return getattr(self, data_name)
101 def setter(self, hex_str):
102 if not arvados.util.is_hex(hex_str, length):
103 raise ValueError("{} is not a {}-digit hex string: {!r}".
104 format(name, length, hex_str))
105 setattr(self, data_name, hex_str)
106 return property(getter, setter)
108 md5sum = _make_hex_prop('md5sum', 32)
109 perm_sig = _make_hex_prop('perm_sig', 40)
def perm_expiry(self):
    """Expiry parsed from the locator's permission hint, or None if no hint was parsed."""
    return self._perm_expiry
116 def perm_expiry(self, value):
117 if not arvados.util.is_hex(value, 1, 8):
119 "permission timestamp must be a hex Unix timestamp: {}".
121 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
123 def permission_hint(self):
124 data = [self.perm_sig, self.perm_expiry]
127 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
128 return "A{}@{:08x}".format(*data)
130 def parse_permission_hint(self, s):
132 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
134 raise ValueError("bad permission hint {}".format(s))
136 def permission_expired(self, as_of_dt=None):
137 if self.perm_expiry is None:
139 elif as_of_dt is None:
140 as_of_dt = datetime.datetime.now()
141 return self.perm_expiry <= as_of_dt
145 """Simple interface to a global KeepClient object.
147 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
148 own API client. The global KeepClient will build an API client from the
149 current Arvados configuration, which may not match the one you built.
154 def global_client_object(cls):
155 global global_client_object
156 # Previously, KeepClient would change its behavior at runtime based
157 # on these configuration settings. We simulate that behavior here
158 # by checking the values and returning a new KeepClient if any of
160 key = (config.get('ARVADOS_API_HOST'),
161 config.get('ARVADOS_API_TOKEN'),
162 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
163 config.get('ARVADOS_KEEP_PROXY'),
164 os.environ.get('KEEP_LOCAL_STORE'))
165 if (global_client_object is None) or (cls._last_key != key):
166 global_client_object = KeepClient()
168 return global_client_object
def get(locator, **kwargs):
    """Fetch a block via the shared global KeepClient (deprecated helper)."""
    client = Keep.global_client_object()
    return client.get(locator, **kwargs)
def put(data, **kwargs):
    """Store data via the shared global KeepClient (deprecated helper)."""
    client = Keep.global_client_object()
    return client.put(data, **kwargs)
178 class KeepBlockCache(object):
179 def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
180 self.cache_max = cache_max
182 self._cache_lock = threading.Lock()
183 self._max_slots = max_slots
184 self._disk_cache = disk_cache
185 self._disk_cache_dir = disk_cache_dir
187 if self._disk_cache and self._disk_cache_dir is None:
188 self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
189 os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
191 if self._max_slots == 0:
193 # default max slots to half of maximum file handles
194 # NOFILE typically defaults to 1024 on Linux so this
196 self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2
199 self._max_slots = 512
201 if self.cache_max == 0:
203 fs = os.statvfs(self._disk_cache_dir)
204 avail = (fs.f_bavail * fs.f_bsize) / 4
205 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
207 # 10% of total disk size
208 # 25% of available space
210 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
213 self.cache_max = (256 * 1024 * 1024)
215 self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
218 self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
222 class CacheSlot(object):
223 __slots__ = ("locator", "ready", "content")
225 def __init__(self, locator):
226 self.locator = locator
227 self.ready = threading.Event()
234 def set(self, value):
239 if self.content is None:
242 return len(self.content)
248 '''Cap the cache size to self.cache_max'''
249 with self._cache_lock:
250 # Select all slots except those where ready.is_set() and content is
251 # None (that means there was an error reading the block).
252 self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
253 sm = sum([slot.size() for slot in self._cache])
254 while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
255 for i in range(len(self._cache)-1, -1, -1):
256 # start from the back, find a slot that is a candidate to evict
257 if self._cache[i].ready.is_set():
258 sz = self._cache[i].size()
260 # If evict returns false it means the
261 # underlying disk cache couldn't lock the file
262 # for deletion because another process was using
263 # it. Don't count it as reducing the amount
264 # of data in the cache, find something else to
266 if self._cache[i].evict():
269 # either way we forget about it. either the
270 # other process will delete it, or if we need
271 # it again and it is still there, we'll find
276 def _get(self, locator):
277 # Test if the locator is already in the cache
278 for i in range(0, len(self._cache)):
279 if self._cache[i].locator == locator:
282 # move it to the front
284 self._cache.insert(0, n)
287 # see if it exists on disk
288 n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
290 self._cache.insert(0, n)
def get(self, locator):
    """Look up a cache slot for locator while holding the cache lock; None if absent."""
    self._cache_lock.acquire()
    try:
        return self._get(locator)
    finally:
        self._cache_lock.release()
298 def reserve_cache(self, locator):
299 '''Reserve a cache slot for the specified locator,
300 or return the existing slot.'''
301 with self._cache_lock:
302 n = self._get(locator)
306 # Add a new cache slot for the locator
308 n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
310 n = KeepBlockCache.CacheSlot(locator)
311 self._cache.insert(0, n)
314 class Counter(object):
315 def __init__(self, v=0):
316 self._lk = threading.Lock()
328 class KeepClient(object):
330 # Default Keep server connection timeout: 2 seconds
331 # Default Keep server read timeout: 256 seconds
332 # Default Keep server bandwidth minimum: 32768 bytes per second
333 # Default Keep proxy connection timeout: 20 seconds
334 # Default Keep proxy read timeout: 256 seconds
335 # Default Keep proxy bandwidth minimum: 32768 bytes per second
336 DEFAULT_TIMEOUT = (2, 256, 32768)
337 DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
340 class KeepService(object):
341 """Make requests to a single Keep service, and track results.
343 A KeepService is intended to last long enough to perform one
344 transaction (GET or PUT) against one Keep service. This can
345 involve calling either get() or put() multiple times in order
346 to retry after transient failures. However, calling both get()
347 and put() on a single instance -- or using the same instance
348 to access two different Keep services -- will not produce
355 arvados.errors.HttpError,
358 def __init__(self, root, user_agent_pool=queue.LifoQueue(),
360 download_counter=None,
364 self._user_agent_pool = user_agent_pool
365 self._result = {'error': None}
369 self.get_headers = {'Accept': 'application/octet-stream'}
370 self.get_headers.update(headers)
371 self.put_headers = headers
372 self.upload_counter = upload_counter
373 self.download_counter = download_counter
374 self.insecure = insecure
377 """Is it worth attempting a request?"""
381 """Did the request succeed or encounter permanent failure?"""
382 return self._result['error'] == False or not self._usable
384 def last_result(self):
387 def _get_user_agent(self):
389 return self._user_agent_pool.get(block=False)
393 def _put_user_agent(self, ua):
396 self._user_agent_pool.put(ua, block=False)
400 def _socket_open(self, *args, **kwargs):
401 if len(args) + len(kwargs) == 2:
402 return self._socket_open_pycurl_7_21_5(*args, **kwargs)
404 return self._socket_open_pycurl_7_19_3(*args, **kwargs)
406 def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
407 return self._socket_open_pycurl_7_21_5(
409 address=collections.namedtuple(
410 'Address', ['family', 'socktype', 'protocol', 'addr'],
411 )(family, socktype, protocol, address))
413 def _socket_open_pycurl_7_21_5(self, purpose, address):
414 """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
415 s = socket.socket(address.family, address.socktype, address.protocol)
416 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
417 # Will throw invalid protocol error on mac. This test prevents that.
418 if hasattr(socket, 'TCP_KEEPIDLE'):
419 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
420 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
424 def get(self, locator, method="GET", timeout=None):
425 # locator is a KeepLocator object.
426 url = self.root + str(locator)
427 _logger.debug("Request: %s %s", method, url)
428 curl = self._get_user_agent()
431 with timer.Timer() as t:
433 response_body = BytesIO()
434 curl.setopt(pycurl.NOSIGNAL, 1)
435 curl.setopt(pycurl.OPENSOCKETFUNCTION,
436 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
437 curl.setopt(pycurl.URL, url.encode('utf-8'))
438 curl.setopt(pycurl.HTTPHEADER, [
439 '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
440 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
441 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
443 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
444 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
446 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
448 curl.setopt(pycurl.NOBODY, True)
449 self._setcurltimeouts(curl, timeout, method=="HEAD")
453 except Exception as e:
454 raise arvados.errors.HttpError(0, str(e))
460 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
461 'body': response_body.getvalue(),
462 'headers': self._headers,
466 ok = retry.check_http_response_success(self._result['status_code'])
468 self._result['error'] = arvados.errors.HttpError(
469 self._result['status_code'],
470 self._headers.get('x-status-line', 'Error'))
471 except self.HTTP_ERRORS as e:
475 self._usable = ok != False
476 if self._result.get('status_code', None):
477 # The client worked well enough to get an HTTP status
478 # code, so presumably any problems are just on the
479 # server side and it's OK to reuse the client.
480 self._put_user_agent(curl)
482 # Don't return this client to the pool, in case it's
486 _logger.debug("Request fail: GET %s => %s: %s",
487 url, type(self._result['error']), str(self._result['error']))
490 _logger.info("HEAD %s: %s bytes",
491 self._result['status_code'],
492 self._result.get('content-length'))
493 if self._result['headers'].get('x-keep-locator'):
494 # This is a response to a remote block copy request, return
495 # the local copy block locator.
496 return self._result['headers'].get('x-keep-locator')
499 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
500 self._result['status_code'],
501 len(self._result['body']),
503 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
505 if self.download_counter:
506 self.download_counter.add(len(self._result['body']))
507 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
508 if resp_md5 != locator.md5sum:
509 _logger.warning("Checksum fail: md5(%s) = %s",
511 self._result['error'] = arvados.errors.HttpError(
514 return self._result['body']
516 def put(self, hash_s, body, timeout=None, headers={}):
517 put_headers = copy.copy(self.put_headers)
518 put_headers.update(headers)
519 url = self.root + hash_s
520 _logger.debug("Request: PUT %s", url)
521 curl = self._get_user_agent()
524 with timer.Timer() as t:
526 body_reader = BytesIO(body)
527 response_body = BytesIO()
528 curl.setopt(pycurl.NOSIGNAL, 1)
529 curl.setopt(pycurl.OPENSOCKETFUNCTION,
530 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
531 curl.setopt(pycurl.URL, url.encode('utf-8'))
532 # Using UPLOAD tells cURL to wait for a "go ahead" from the
533 # Keep server (in the form of a HTTP/1.1 "100 Continue"
534 # response) instead of sending the request body immediately.
535 # This allows the server to reject the request if the request
536 # is invalid or the server is read-only, without waiting for
537 # the client to send the entire block.
538 curl.setopt(pycurl.UPLOAD, True)
539 curl.setopt(pycurl.INFILESIZE, len(body))
540 curl.setopt(pycurl.READFUNCTION, body_reader.read)
541 curl.setopt(pycurl.HTTPHEADER, [
542 '{}: {}'.format(k,v) for k,v in put_headers.items()])
543 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
544 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
546 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
547 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
549 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
550 self._setcurltimeouts(curl, timeout)
553 except Exception as e:
554 raise arvados.errors.HttpError(0, str(e))
560 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
561 'body': response_body.getvalue().decode('utf-8'),
562 'headers': self._headers,
565 ok = retry.check_http_response_success(self._result['status_code'])
567 self._result['error'] = arvados.errors.HttpError(
568 self._result['status_code'],
569 self._headers.get('x-status-line', 'Error'))
570 except self.HTTP_ERRORS as e:
574 self._usable = ok != False # still usable if ok is True or None
575 if self._result.get('status_code', None):
576 # Client is functional. See comment in get().
577 self._put_user_agent(curl)
581 _logger.debug("Request fail: PUT %s => %s: %s",
582 url, type(self._result['error']), str(self._result['error']))
584 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
585 self._result['status_code'],
588 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
589 if self.upload_counter:
590 self.upload_counter.add(len(body))
593 def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
596 elif isinstance(timeouts, tuple):
597 if len(timeouts) == 2:
598 conn_t, xfer_t = timeouts
599 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
601 conn_t, xfer_t, bandwidth_bps = timeouts
603 conn_t, xfer_t = (timeouts, timeouts)
604 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
605 curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
606 if not ignore_bandwidth:
607 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
608 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
610 def _headerfunction(self, header_line):
611 if isinstance(header_line, bytes):
612 header_line = header_line.decode('iso-8859-1')
613 if ':' in header_line:
614 name, value = header_line.split(':', 1)
615 name = name.strip().lower()
616 value = value.strip()
618 name = self._lastheadername
619 value = self._headers[name] + ' ' + header_line.strip()
620 elif header_line.startswith('HTTP/'):
621 name = 'x-status-line'
624 _logger.error("Unexpected header line: %s", header_line)
626 self._lastheadername = name
627 self._headers[name] = value
628 # Returning None implies all bytes were written
631 class KeepWriterQueue(queue.Queue):
632 def __init__(self, copies, classes=[]):
633 queue.Queue.__init__(self) # Old-style superclass
634 self.wanted_copies = copies
635 self.wanted_storage_classes = classes
636 self.successful_copies = 0
637 self.confirmed_storage_classes = {}
639 self.storage_classes_tracking = True
640 self.queue_data_lock = threading.RLock()
641 self.pending_tries = max(copies, len(classes))
642 self.pending_tries_notification = threading.Condition()
644 def write_success(self, response, replicas_nr, classes_confirmed):
645 with self.queue_data_lock:
646 self.successful_copies += replicas_nr
647 if classes_confirmed is None:
648 self.storage_classes_tracking = False
649 elif self.storage_classes_tracking:
650 for st_class, st_copies in classes_confirmed.items():
652 self.confirmed_storage_classes[st_class] += st_copies
654 self.confirmed_storage_classes[st_class] = st_copies
655 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
656 self.response = response
657 with self.pending_tries_notification:
658 self.pending_tries_notification.notify_all()
def write_fail(self, ks):
    """Record a failed write attempt and wake one writer thread to retry."""
    cond = self.pending_tries_notification
    with cond:
        self.pending_tries += 1
        cond.notify()
def pending_copies(self):
    """Return how many successful copies are still needed, under the data lock."""
    self.queue_data_lock.acquire()
    try:
        return self.wanted_copies - self.successful_copies
    finally:
        self.queue_data_lock.release()
669 def satisfied_classes(self):
670 with self.queue_data_lock:
671 if not self.storage_classes_tracking:
672 # Notifies disabled storage classes expectation to
675 return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
677 def pending_classes(self):
678 with self.queue_data_lock:
679 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
681 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
682 for st_class, st_copies in self.confirmed_storage_classes.items():
683 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
684 unsatisfied_classes.remove(st_class)
685 return unsatisfied_classes
687 def get_next_task(self):
688 with self.pending_tries_notification:
690 if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
691 # This notify_all() is unnecessary --
692 # write_success() already called notify_all()
693 # when pending<1 became true, so it's not
694 # possible for any other thread to be in
695 # wait() now -- but it's cheap insurance
696 # against deadlock so we do it anyway:
697 self.pending_tries_notification.notify_all()
698 # Drain the queue and then raise Queue.Empty
702 elif self.pending_tries > 0:
703 service, service_root = self.get_nowait()
704 if service.finished():
707 self.pending_tries -= 1
708 return service, service_root
710 self.pending_tries_notification.notify_all()
713 self.pending_tries_notification.wait()
716 class KeepWriterThreadPool(object):
717 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
718 self.total_task_nr = 0
719 if (not max_service_replicas) or (max_service_replicas >= copies):
722 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
723 _logger.debug("Pool max threads is %d", num_threads)
725 self.queue = KeepClient.KeepWriterQueue(copies, classes)
727 for _ in range(num_threads):
728 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
729 self.workers.append(w)
def add_task(self, ks, service_root):
    """Enqueue one (service, service_root) write attempt for the worker threads."""
    task = (ks, service_root)
    self.queue.put(task)
    self.total_task_nr += 1
736 return self.queue.successful_copies, self.queue.satisfied_classes()
740 for worker in self.workers:
742 # Wait for finished work
746 return self.queue.response
749 class KeepWriterThread(threading.Thread):
class TaskFailed(RuntimeError):
    """Signals that one write attempt to a single Keep service failed."""
752 def __init__(self, queue, data, data_hash, timeout=None):
753 super(KeepClient.KeepWriterThread, self).__init__()
754 self.timeout = timeout
757 self.data_hash = data_hash
763 service, service_root = self.queue.get_next_task()
767 locator, copies, classes = self.do_task(service, service_root)
768 except Exception as e:
769 if not isinstance(e, self.TaskFailed):
770 _logger.exception("Exception in KeepWriterThread")
771 self.queue.write_fail(service)
773 self.queue.write_success(locator, copies, classes)
775 self.queue.task_done()
777 def do_task(self, service, service_root):
778 classes = self.queue.pending_classes()
782 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
783 success = bool(service.put(self.data_hash,
785 timeout=self.timeout,
787 result = service.last_result()
790 if result.get('status_code'):
791 _logger.debug("Request fail: PUT %s => %s %s",
793 result.get('status_code'),
795 raise self.TaskFailed()
797 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
798 str(threading.current_thread()),
803 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
804 except (KeyError, ValueError):
807 classes_confirmed = {}
809 scch = result['headers']['x-keep-storage-classes-confirmed']
810 for confirmation in scch.replace(' ', '').split(','):
811 if '=' in confirmation:
812 stored_class, stored_copies = confirmation.split('=')[:2]
813 classes_confirmed[stored_class] = int(stored_copies)
814 except (KeyError, ValueError):
815 # Storage classes confirmed header missing or corrupt
816 classes_confirmed = None
818 return result['body'].strip(), replicas_stored, classes_confirmed
821 def __init__(self, api_client=None, proxy=None,
822 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
823 api_token=None, local_store=None, block_cache=None,
824 num_retries=0, session=None):
825 """Initialize a new KeepClient.
829 The API client to use to find Keep services. If not
830 provided, KeepClient will build one from available Arvados
834 If specified, this KeepClient will send requests to this Keep
835 proxy. Otherwise, KeepClient will fall back to the setting of the
836 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
837 If you do not want KeepClient to use a proxy, pass in an empty
841 The initial timeout (in seconds) for HTTP requests to Keep
842 non-proxy servers. A tuple of three floats is interpreted as
843 (connection_timeout, read_timeout, minimum_bandwidth). A connection
844 will be aborted if the average traffic rate falls below
845 minimum_bandwidth bytes per second over an interval of read_timeout
846 seconds. Because timeouts are often a result of transient server
847 load, the actual connection timeout will be increased by a factor
848 of two on each retry.
849 Default: (2, 256, 32768).
852 The initial timeout (in seconds) for HTTP requests to
853 Keep proxies. A tuple of three floats is interpreted as
854 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
855 described above for adjusting connection timeouts on retry also
857 Default: (20, 256, 32768).
860 If you're not using an API client, but only talking
861 directly to a Keep proxy, this parameter specifies an API token
862 to authenticate Keep requests. It is an error to specify both
863 api_client and api_token. If you specify neither, KeepClient
864 will use one available from the Arvados configuration.
867 If specified, this KeepClient will bypass Keep
868 services, and save data to the named directory. If unspecified,
869 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
870 environment variable. If you want to ensure KeepClient does not
871 use local storage, pass in an empty string. This is primarily
872 intended to mock a server for testing.
875 The default number of times to retry failed requests.
876 This will be used as the default num_retries value when get() and
877 put() are called. Default 0.
879 self.lock = threading.Lock()
881 if config.get('ARVADOS_KEEP_SERVICES'):
882 proxy = config.get('ARVADOS_KEEP_SERVICES')
884 proxy = config.get('ARVADOS_KEEP_PROXY')
885 if api_token is None:
886 if api_client is None:
887 api_token = config.get('ARVADOS_API_TOKEN')
889 api_token = api_client.api_token
890 elif api_client is not None:
892 "can't build KeepClient with both API client and token")
893 if local_store is None:
894 local_store = os.environ.get('KEEP_LOCAL_STORE')
896 if api_client is None:
897 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
899 self.insecure = api_client.insecure
901 self.block_cache = block_cache if block_cache else KeepBlockCache()
902 self.timeout = timeout
903 self.proxy_timeout = proxy_timeout
904 self._user_agent_pool = queue.LifoQueue()
905 self.upload_counter = Counter()
906 self.download_counter = Counter()
907 self.put_counter = Counter()
908 self.get_counter = Counter()
909 self.hits_counter = Counter()
910 self.misses_counter = Counter()
911 self._storage_classes_unsupported_warning = False
912 self._default_classes = []
915 self.local_store = local_store
916 self.head = self.local_store_head
917 self.get = self.local_store_get
918 self.put = self.local_store_put
920 self.num_retries = num_retries
921 self.max_replicas_per_service = None
923 proxy_uris = proxy.split()
924 for i in range(len(proxy_uris)):
925 if not proxy_uris[i].endswith('/'):
928 url = urllib.parse.urlparse(proxy_uris[i])
929 if not (url.scheme and url.netloc):
930 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
931 self.api_token = api_token
932 self._gateway_services = {}
933 self._keep_services = [{
934 'uuid': "00000-bi6l4-%015d" % idx,
935 'service_type': 'proxy',
936 '_service_root': uri,
937 } for idx, uri in enumerate(proxy_uris)]
938 self._writable_services = self._keep_services
939 self.using_proxy = True
940 self._static_services_list = True
942 # It's important to avoid instantiating an API client
943 # unless we actually need one, for testing's sake.
944 if api_client is None:
945 api_client = arvados.api('v1')
946 self.api_client = api_client
947 self.api_token = api_client.api_token
948 self._gateway_services = {}
949 self._keep_services = None
950 self._writable_services = None
951 self.using_proxy = None
952 self._static_services_list = False
954 self._default_classes = [
955 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
957 # We're talking to an old cluster
960 def current_timeout(self, attempt_number):
961 """Return the appropriate timeout to use for this client.
963 The proxy timeout setting if the backend service is currently a proxy,
964 the regular timeout setting otherwise. The `attempt_number` indicates
965 how many times the operation has been tried already (starting from 0
966 for the first try), and scales the connection timeout portion of the
967 return value accordingly.
970 # TODO(twp): the timeout should be a property of a
971 # KeepService, not a KeepClient. See #4488.
972 t = self.proxy_timeout if self.using_proxy else self.timeout
974 return (t[0] * (1 << attempt_number), t[1])
976 return (t[0] * (1 << attempt_number), t[1], t[2])
977 def _any_nondisk_services(self, service_list):
978 return any(ks.get('service_type', 'disk') != 'disk'
979 for ks in service_list)
981 def build_services_list(self, force_rebuild=False):
982 if (self._static_services_list or
983 (self._keep_services and not force_rebuild)):
987 keep_services = self.api_client.keep_services().accessible()
988 except Exception: # API server predates Keep services.
989 keep_services = self.api_client.keep_disks().list()
991 # Gateway services are only used when specified by UUID,
992 # so there's nothing to gain by filtering them by
994 self._gateway_services = {ks['uuid']: ks for ks in
995 keep_services.execute()['items']}
996 if not self._gateway_services:
997 raise arvados.errors.NoKeepServersError()
999 # Precompute the base URI for each service.
1000 for r in self._gateway_services.values():
1001 host = r['service_host']
1002 if not host.startswith('[') and host.find(':') >= 0:
1003 # IPv6 URIs must be formatted like http://[::1]:80/...
1004 host = '[' + host + ']'
1005 r['_service_root'] = "{}://{}:{:d}/".format(
1006 'https' if r['service_ssl_flag'] else 'http',
1010 _logger.debug(str(self._gateway_services))
1011 self._keep_services = [
1012 ks for ks in self._gateway_services.values()
1013 if not ks.get('service_type', '').startswith('gateway:')]
1014 self._writable_services = [ks for ks in self._keep_services
1015 if not ks.get('read_only')]
1017 # For disk type services, max_replicas_per_service is 1
1018 # It is unknown (unlimited) for other service types.
1019 if self._any_nondisk_services(self._writable_services):
1020 self.max_replicas_per_service = None
1022 self.max_replicas_per_service = 1
1024 def _service_weight(self, data_hash, service_uuid):
1025 """Compute the weight of a Keep service endpoint for a data
1026 block with a known hash.
1028 The weight is md5(h + u) where u is the last 15 characters of
1029 the service endpoint's UUID.
1031 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1033 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1034 """Return an array of Keep service endpoints, in the order in
1035 which they should be probed when reading or writing data with
1036 the given hash+hints.
1038 self.build_services_list(force_rebuild)
1041 # Use the services indicated by the given +K@... remote
1042 # service hints, if any are present and can be resolved to a
1044 for hint in locator.hints:
1045 if hint.startswith('K@'):
1047 sorted_roots.append(
1048 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1049 elif len(hint) == 29:
1050 svc = self._gateway_services.get(hint[2:])
1052 sorted_roots.append(svc['_service_root'])
1054 # Sort the available local services by weight (heaviest first)
1055 # for this locator, and return their service_roots (base URIs)
1057 use_services = self._keep_services
1059 use_services = self._writable_services
1060 self.using_proxy = self._any_nondisk_services(use_services)
1061 sorted_roots.extend([
1062 svc['_service_root'] for svc in sorted(
1065 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1066 _logger.debug("{}: {}".format(locator, sorted_roots))
1069 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1070 # roots_map is a dictionary, mapping Keep service root strings
1071 # to KeepService objects. Poll for Keep services, and add any
1072 # new ones to roots_map. Return the current list of local
1074 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1075 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1076 for root in local_roots:
1077 if root not in roots_map:
1078 roots_map[root] = self.KeepService(
1079 root, self._user_agent_pool,
1080 upload_counter=self.upload_counter,
1081 download_counter=self.download_counter,
1083 insecure=self.insecure)
1087 def _check_loop_result(result):
1088 # KeepClient RetryLoops should save results as a 2-tuple: the
1089 # actual result of the request, and the number of servers available
1090 # to receive the request this round.
1091 # This method returns True if there's a real result, False if
1092 # there are no more servers available, otherwise None.
1093 if isinstance(result, Exception):
1095 result, tried_server_count = result
1096 if (result is not None) and (result is not False):
1098 elif tried_server_count < 1:
1099 _logger.info("No more Keep services to try; giving up")
def get_from_cache(self, loc_s):
    """Fetch a block only if it is in the cache, otherwise return None."""
    locator = KeepLocator(loc_s)
    # Cache slots are keyed on the block's md5 digest.
    slot = self.block_cache.get(locator.md5sum)
    # Only a slot whose content has fully arrived (its 'ready' event
    # is set) can satisfy the request without going to the network.
    if slot is not None and slot.ready.is_set():
1113 def refresh_signature(self, loc):
1114 """Ask Keep to get the remote block and return its local signature"""
1115 now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1116 return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
def head(self, loc_s, **kwargs):
    """Look up a Keep block via a HEAD request.

    Delegates to _get_or_head with method="HEAD"; all other keyword
    arguments are passed through unchanged.
    """
    return self._get_or_head(loc_s, method="HEAD", **kwargs)
def get(self, loc_s, **kwargs):
    """Fetch a Keep block via a GET request.

    Delegates to _get_or_head with method="GET"; all other keyword
    arguments are passed through unchanged.
    """
    return self._get_or_head(loc_s, method="GET", **kwargs)
def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
    """Get data from Keep.

    This method fetches one or more blocks of data from Keep.  It
    sends a request each Keep service registered with the API
    server (or the proxy provided when this client was
    instantiated), then each service named in location hints, in
    sequence.  As soon as one service provides the data, it's
    returned.

    Arguments:
    * loc_s: A string of one or more comma-separated locators to fetch.
      This method returns the concatenation of these blocks.
    * num_retries: The number of times to retry GET requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff.  Note that, in each loop, the method may try
      to fetch data from every available Keep service, along with any
      that are named in location hints in the locator.  The default value
      is set when the KeepClient is initialized.
    """
        # Multiple comma-separated locators: fetch each block in turn
        # and return the concatenation.
        return ''.join(self.get(x) for x in loc_s.split(','))

    self.get_counter.add(1)

    # Reuse the caller's request ID (or the API client's) so all
    # retries for this fetch share one ID in server logs; otherwise
    # mint a fresh one.
    request_id = (request_id or
                  (hasattr(self, 'api_client') and self.api_client.request_id) or
                  arvados.util.new_request_id())
    headers['X-Request-Id'] = request_id

        locator = KeepLocator(loc_s)
            # Reserve a cache slot; 'first' tells us whether we are
            # the first requester for this block.
            slot, first = self.block_cache.reserve_cache(locator.md5sum)
                # this is request for a prefetch, if it is
                # already in flight, return immediately.
                # clear 'slot' to prevent finally block from
                # calling slot.set()
                self.hits_counter.add(1)
                    raise arvados.errors.KeepReadError(
                        "failed to read {}".format(loc_s))
            self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               self._gateway_services.get(hint[2:])
        # Map root URLs to their KeepService objects.
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter,
                                   insecure=self.insecure)
            for root in hint_roots

        # See #3147 for a discussion of the loop implementation.  Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
        for tries_left in loop:
                # Rebuild the service list after the first pass so we
                # pick up any topology changes between retries.
                sorted_roots = self.map_new_services(
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False,
            except Exception as error:
                loop.save_result(error)

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
            # Record this round's outcome for _check_loop_result.
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        if slot is not None:
            self.block_cache.cap_cache()

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            # Every service answered, and all said "not found"
            # (or 403/410): report NotFoundError, not a generic failure.
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
    """Save data in Keep.

    This method will get a list of Keep services from the API server, and
    send the data to each one simultaneously in a new thread.  Once the
    uploads are finished, if enough copies are saved, this method returns
    the most recent HTTP response body.  If requests fail to upload
    enough copies, this method raises KeepWriteError.

    Arguments:
    * data: The string of data to upload.
    * copies: The number of copies that the user requires be saved.
    * num_retries: The number of times to retry PUT requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff.  The default value is set when the
      KeepClient is initialized.
    * classes: An optional list of storage class names where copies should
      be written.
    """

    classes = classes or self._default_classes

    # Keep stores bytes; encode text input before hashing.
    if not isinstance(data, bytes):
        data = data.encode()

    self.put_counter.add(1)

    # The locator is the md5 digest of the content plus its size.
    data_hash = hashlib.md5(data).hexdigest()
    loc_s = data_hash + '+' + str(len(data))
    locator = KeepLocator(loc_s)

    # Reuse or mint a request ID so all retries share one ID in logs.
    request_id = (request_id or
                  (hasattr(self, 'api_client') and self.api_client.request_id) or
                  arvados.util.new_request_id())
        'X-Request-Id': request_id,
        'X-Keep-Desired-Replicas': str(copies),

    loop = retry.RetryLoop(num_retries, self._check_loop_result,
    for tries_left in loop:
            # Rebuild the writable-service list after the first pass.
            sorted_roots = self.map_new_services(
                force_rebuild=(tries_left < num_retries),
        except Exception as error:
            loop.save_result(error)

        # Only the storage classes not yet satisfied still need writing.
        pending_classes = []
        if done_classes is not None:
            pending_classes = list(set(classes) - set(done_classes))
        writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                      data_hash=data_hash,
                                                      copies=copies - done_copies,
                                                      max_service_replicas=self.max_replicas_per_service,
                                                      timeout=self.current_timeout(num_retries - tries_left),
                                                      classes=pending_classes)
        for service_root, ks in [(root, roots_map[root])
                                 for root in sorted_roots]:
            writer_pool.add_task(ks, service_root)
        pool_copies, pool_classes = writer_pool.done()
        done_copies += pool_copies
        if (done_classes is not None) and (pool_classes is not None):
            done_classes += pool_classes
            # Success requires both enough copies and every requested
            # storage class to be satisfied.
                (done_copies >= copies and set(done_classes) == set(classes),
                 writer_pool.total_task_nr))
        # Old keepstore contacted without storage classes support:
        # success is determined only by successful copies.

        # Disable storage classes tracking from this point forward.
            if not self._storage_classes_unsupported_warning:
                self._storage_classes_unsupported_warning = True
                _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                (done_copies >= copies, writer_pool.total_task_nr))

        return writer_pool.response()
    raise arvados.errors.KeepWriteError(
        "[{}] failed to write {}: no Keep services available ({})".format(
            request_id, data_hash, loop.last_result()))
    service_errors = ((key, roots_map[key].last_result()['error'])
                      for key in sorted_roots
                      if roots_map[key].last_result()['error'])
    raise arvados.errors.KeepWriteError(
        "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
            request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1365 def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1366 """A stub for put().
1368 This method is used in place of the real put() method when
1369 using local storage (see constructor's local_store argument).
1371 copies and num_retries arguments are ignored: they are here
1372 only for the sake of offering the same call signature as
1375 Data stored this way can be retrieved via local_store_get().
1377 md5 = hashlib.md5(data).hexdigest()
1378 locator = '%s+%d' % (md5, len(data))
1379 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1381 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1382 os.path.join(self.local_store, md5))
def local_store_get(self, loc_s, num_retries=None):
    """Companion to local_store_put().

    Reads back a block previously stored in the local_store
    directory; num_retries is ignored (same-signature stub).
    """
        locator = KeepLocator(loc_s)
        # An unparseable locator is reported as "not found".
        raise arvados.errors.NotFoundError(
            "Invalid data locator: '%s'" % loc_s)
    # The well-known empty block needs no file on disk.
    if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
    # Blocks are stored under their md5 digest inside local_store.
    with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1397 def local_store_head(self, loc_s, num_retries=None):
1398 """Companion to local_store_put()."""
1400 locator = KeepLocator(loc_s)
1402 raise arvados.errors.NotFoundError(
1403 "Invalid data locator: '%s'" % loc_s)
1404 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1406 if os.path.exists(os.path.join(self.local_store, locator.md5sum)):