# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import absolute_import
from __future__ import division
from future import standard_library
from future.utils import native_str
standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import range
from builtins import object

import collections
import copy
import datetime
import hashlib
import logging
import math
import os
import pycurl
import queue
import re
import resource
import socket
import ssl
import sys
import threading
import urllib.parse

from . import timer

if sys.version_info >= (3, 0):
    from io import BytesIO
else:
    from cStringIO import StringIO as BytesIO
import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
import arvados.diskcache

_logger = logging.getLogger('arvados.keep')
global_client_object = None

# Monkey-patch TCP constants when they are not available (macOS).  Values
# sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".format(
                    name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)
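
    # Illustrative sketch (not in the original source): the generated
    # properties validate assignments, so a malformed value raises
    # ValueError.
    #
    #     loc = KeepLocator('d41d8cd98f00b204e9800998ecf8427e+0')
    #     loc.md5sum = 'not hex'   # raises ValueError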

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".format(
                    value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except IndexError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
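

# Illustrative example (not from the original source): a full Keep locator is
# an MD5 hash plus a size and optional hints, e.g. for the 3-byte block "foo":
#
#     loc = KeepLocator('acbd18db4cc2f85cedef654fccc4a4d8+3'
#                       '+A0123456789abcdef0123456789abcdef01234567@565f2603')
#     loc.md5sum               # 'acbd18db4cc2f85cedef654fccc4a4d8'
#     loc.size                 # 3
#     loc.stripped()           # 'acbd18db4cc2f85cedef654fccc4a4d8+3'
#     loc.permission_expired() # True once the @-timestamp has passed
#
# The signature and expiry shown here are made-up placeholder values.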
145 """Simple interface to a global KeepClient object.
147 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
148 own API client. The global KeepClient will build an API client from the
149 current Arvados configuration, which may not match the one you built.

    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)
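

# Hedged usage sketch (not part of the original source; assumes a configured
# Arvados environment).  New code should prefer a private KeepClient over the
# deprecated global interface above:
#
#     import arvados
#     kc = arvados.keep.KeepClient(api_client=arvados.api('v1'))
#     loc = kc.put(b'foo')   # e.g. 'acbd18db...+3+A...@...'
#     kc.get(loc)            # b'foo'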


class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            if self._disk_cache:
                # Default the max slots to half of the maximum number of
                # file handles.
                self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] // 2
            else:
                # RAM cache slots
                self._max_slots = 1024

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize) // 2
                # Use half the available space, or max_slots * 64 MiB,
                # whichever is smaller.
                self.cache_max = min(avail, (self._max_slots * 64 * 1024 * 1024))
            else:
                # Default RAM cache size
                self.cache_max = (256 * 1024 * 1024)

        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        if self._disk_cache:
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            self.cap_cache()
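
    # Worked example of the defaults above (illustrative, not from the
    # original source): with the common Linux limit of 1024 open file
    # handles, the disk cache gets 1024 // 2 == 512 slots, and cache_max
    # becomes min(half the free disk space, 512 * 64 MiB == 32 GiB), but
    # never less than 64 MiB.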

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            return True

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
                for i in range(len(self._cache)-1, -1, -1):
                    # Start from the back and find a slot that is a candidate
                    # to evict.
                    if self._cache[i].ready.is_set():
                        sz = self._cache[i].size()

                        # If evict() returns False it means the
                        # underlying disk cache couldn't lock the file
                        # for deletion because another process was using
                        # it.  Don't count it as reducing the amount
                        # of data in the cache; find something else to
                        # throw out.
                        if self._cache[i].evict():
                            sm -= sz

                        # Either way we forget about it: either the
                        # other process will delete it, or if we need
                        # it again and it is still there, we'll find
                        # it on disk.
                        del self._cache[i]
                        break

    def _get(self, locator):
        # Test if the locator is already in the cache.
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # Move it to the front.
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        if self._disk_cache:
            # See if it exists on disk.
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator.
                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True


class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
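
    # Worked example (illustrative): with DEFAULT_TIMEOUT = (2, 256, 32768),
    # a connection attempt is abandoned after 2 seconds, and an established
    # transfer is aborted if it averages less than 32768 bytes/sec over a
    # 256-second window (see KeepService._setcurltimeouts below).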

    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service.  This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures.  However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure
370 """Is it worth attempting a request?"""
374 """Did the request succeed or encounter permanent failure?"""
375 return self._result['error'] == False or not self._usable
377 def last_result(self):
380 def _get_user_agent(self):
382 return self._user_agent_pool.get(block=False)
386 def _put_user_agent(self, ua):
389 self._user_agent_pool.put(ua, block=False)

        def _socket_open(self, *args, **kwargs):
            if len(args) + len(kwargs) == 2:
                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
            else:
                return self._socket_open_pycurl_7_19_3(*args, **kwargs)

        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
            return self._socket_open_pycurl_7_21_5(
                purpose=None,
                address=collections.namedtuple(
                    'Address', ['family', 'socktype', 'protocol', 'addr'],
                )(family, socktype, protocol, address))

        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Setting TCP_KEEPIDLE would throw an invalid protocol error on
            # macOS; this test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            self._socket = s
            return s

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k, v) for k, v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout, method == "HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
483 _logger.info("HEAD %s: %s bytes",
484 self._result['status_code'],
485 self._result.get('content-length'))
486 if self._result['headers'].get('x-keep-locator'):
487 # This is a response to a remote block copy request, return
488 # the local copy block locator.
489 return self._result['headers'].get('x-keep-locator')
492 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
493 self._result['status_code'],
494 len(self._result['body']),
496 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
498 if self.download_counter:
499 self.download_counter.add(len(self._result['body']))
500 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
501 if resp_md5 != locator.md5sum:
502 _logger.warning("Checksum fail: md5(%s) = %s",
504 self._result['error'] = arvados.errors.HttpError(
507 return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k, v) for k, v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue().decode('utf-8'),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional.  See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            if not ignore_bandwidth:
                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
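
        # Hedged mapping sketch (not from the original source): the 3-tuple
        # (2, 256, 32768) becomes
        #     curl.setopt(pycurl.CONNECTTIMEOUT_MS, 2000)
        #     curl.setopt(pycurl.LOW_SPEED_TIME, 256)     # seconds
        #     curl.setopt(pycurl.LOW_SPEED_LIMIT, 32768)  # bytes/sec
        # i.e. cURL aborts a transfer whose throughput stays below the
        # bandwidth floor for the entire read-timeout window.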

        def _headerfunction(self, header_line):
            if isinstance(header_line, bytes):
                header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._lastheadername and header_line.startswith((' ', '\t')):
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written.
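
        # Illustrative trace (not from the original source): for a response
        #     HTTP/1.1 200 OK
        #     Content-Length: 3
        # self._headers ends up as
        #     {'x-status-line': 'HTTP/1.1 200 OK', 'content-length': '3'},
        # and an obsolete folded continuation line (one starting with
        # whitespace) is appended to the previously seen header's value.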

    class KeepWriterQueue(queue.Queue):
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self)  # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Tell the caller that storage class tracking is
                    # disabled by returning None instead of a list.
                    return None
                return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes
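
        # Worked example (illustrative): with wanted_copies=2,
        # wanted_storage_classes=['default', 'archive'] and
        # confirmed_storage_classes={'default': 2, 'archive': 1},
        # pending_classes() returns ['archive'], because only one
        # 'archive' copy has been confirmed so far.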

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty.
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()

    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies, classes)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response

    class KeepWriterThread(threading.Thread):
        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                       self.data,
                                       timeout=self.timeout,
                                       headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt.
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth).  A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds.  Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth).  The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient.  See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
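
    # Worked example (illustrative): if self.timeout is (2, 256, 32768),
    # current_timeout(0) returns (2, 256, 32768) and current_timeout(2)
    # returns (8, 256, 32768); only the connection timeout doubles with
    # each retry.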

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
                (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1.
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
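
    # Hedged sketch of the resulting rendezvous hashing (mirrors
    # _service_weight; 'uuids' and 'probe_order' are illustrative names):
    #
    #     def probe_order(data_hash, uuids):
    #         return sorted(uuids, reverse=True, key=lambda u: hashlib.md5(
    #             (data_hash + u[-15:]).encode()).hexdigest())
    #
    # Every client derives the same weights, so all clients probe the same
    # services in the same order for a given block.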

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # as a list.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc_s):
        """Fetch a block only if it is in the cache, otherwise return None."""
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature"""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                slot, first = self.block_cache.reserve_cache(locator.md5sum)
                if not first:
                    if prefetch:
                        # This is a request for a prefetch; if it is
                        # already in flight, return immediately.
                        # Clear 'slot' to prevent the finally block from
                        # calling slot.set().
                        slot = None
                        return None
                    self.hits_counter.add(1)
                    blob = slot.get()
                    if blob is None:
                        raise arvados.errors.KeepReadError(
                            "failed to read {}".format(loc_s))
                    return blob

            self.misses_counter.add(1)

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                   hint.startswith('K@') and
                                   len(hint) == 29 and
                                   self._gateway_services.get(hint[2:])
                                   )])
            # Map root URLs to their KeepService objects.
            roots_map = {
                root: self.KeepService(root, self._user_agent_pool,
                                       upload_counter=self.upload_counter,
                                       download_counter=self.download_counter,
                                       headers=headers,
                                       insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation.  Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            sorted_roots = []
            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
            for tries_left in loop:
                try:
                    sorted_roots = self.map_new_services(
                        roots_map, locator,
                        force_rebuild=(tries_left < num_retries),
                        need_writable=False,
                        headers=headers)
                except Exception as error:
                    loop.save_result(error)
                    continue

                # Query KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                    if blob is not None:
                        break
                loop.save_result((blob, len(services_to_try)))

            # Always cache the result, then return it if we succeeded.
            if loop.success():
                return blob
        finally:
            if slot is not None:
                slot.set(blob)
                self.block_cache.cap_cache()

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        """

        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done_copies,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left),
                                                          classes=pending_classes)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                     writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
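
    # Hedged usage sketch (not from the original source; assumes a reachable
    # cluster and valid credentials):
    #
    #     kc = KeepClient(api_client=arvados.api('v1'))
    #     loc = kc.put(b'foo', copies=2, classes=['default'])
    #     assert kc.get(loc) == b'foo'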

    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        The copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return b''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()

    def local_store_head(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return True
        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
            return True