1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
36 if sys.version_info >= (3, 0):
37 from io import BytesIO
39 from cStringIO import StringIO as BytesIO
42 import arvados.config as config
44 import arvados.retry as retry
46 import arvados.diskcache
47 from arvados._pycurlhelper import PyCurlHelper
49 _logger = logging.getLogger('arvados.keep')
50 global_client_object = None
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    # Table-driven: add each keepalive constant only if the socket module
    # on this platform does not already define it.
    for _tcp_const, _tcp_value in (
            ('TCP_KEEPALIVE', 0x010),
            ('TCP_KEEPINTVL', 0x101),
            ('TCP_KEEPCNT', 0x102)):
        if not hasattr(socket, _tcp_const):
            setattr(socket, _tcp_const, _tcp_value)
64 class KeepLocator(object):
65 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
66 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
68 def __init__(self, locator_str):
71 self._perm_expiry = None
72 pieces = iter(locator_str.split('+'))
73 self.md5sum = next(pieces)
75 self.size = int(next(pieces))
79 if self.HINT_RE.match(hint) is None:
80 raise ValueError("invalid hint format: {}".format(hint))
81 elif hint.startswith('A'):
82 self.parse_permission_hint(hint)
84 self.hints.append(hint)
89 for s in [self.md5sum, self.size,
90 self.permission_hint()] + self.hints
94 if self.size is not None:
95 return "%s+%i" % (self.md5sum, self.size)
99 def _make_hex_prop(name, length):
100 # Build and return a new property with the given name that
101 # must be a hex string of the given length.
102 data_name = '_{}'.format(name)
104 return getattr(self, data_name)
105 def setter(self, hex_str):
106 if not arvados.util.is_hex(hex_str, length):
107 raise ValueError("{} is not a {}-digit hex string: {!r}".
108 format(name, length, hex_str))
109 setattr(self, data_name, hex_str)
110 return property(getter, setter)
112 md5sum = _make_hex_prop('md5sum', 32)
113 perm_sig = _make_hex_prop('perm_sig', 40)
116 def perm_expiry(self):
117 return self._perm_expiry
120 def perm_expiry(self, value):
121 if not arvados.util.is_hex(value, 1, 8):
123 "permission timestamp must be a hex Unix timestamp: {}".
125 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
127 def permission_hint(self):
128 data = [self.perm_sig, self.perm_expiry]
131 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
132 return "A{}@{:08x}".format(*data)
134 def parse_permission_hint(self, s):
136 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
138 raise ValueError("bad permission hint {}".format(s))
140 def permission_expired(self, as_of_dt=None):
141 if self.perm_expiry is None:
143 elif as_of_dt is None:
144 as_of_dt = datetime.datetime.now()
145 return self.perm_expiry <= as_of_dt
149 """Simple interface to a global KeepClient object.
151 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
152 own API client. The global KeepClient will build an API client from the
153 current Arvados configuration, which may not match the one you built.
158 def global_client_object(cls):
159 global global_client_object
160 # Previously, KeepClient would change its behavior at runtime based
161 # on these configuration settings. We simulate that behavior here
162 # by checking the values and returning a new KeepClient if any of
164 key = (config.get('ARVADOS_API_HOST'),
165 config.get('ARVADOS_API_TOKEN'),
166 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
167 config.get('ARVADOS_KEEP_PROXY'),
168 os.environ.get('KEEP_LOCAL_STORE'))
169 if (global_client_object is None) or (cls._last_key != key):
170 global_client_object = KeepClient()
172 return global_client_object
175 def get(locator, **kwargs):
176 return Keep.global_client_object().get(locator, **kwargs)
179 def put(data, **kwargs):
180 return Keep.global_client_object().put(data, **kwargs)
182 class KeepBlockCache(object):
183 def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
184 self.cache_max = cache_max
185 self._cache = collections.OrderedDict()
186 self._cache_lock = threading.Lock()
187 self._max_slots = max_slots
188 self._disk_cache = disk_cache
189 self._disk_cache_dir = disk_cache_dir
190 self._cache_updating = threading.Condition(self._cache_lock)
192 if self._disk_cache and self._disk_cache_dir is None:
193 self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
194 os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
196 if self._max_slots == 0:
198 # Each block uses two file descriptors, one used to
199 # open it initially and hold the flock(), and a second
200 # hidden one used by mmap().
202 # Set max slots to 1/8 of maximum file handles. This
203 # means we'll use at most 1/4 of total file handles.
205 # NOFILE typically defaults to 1024 on Linux so this
206 # is 128 slots (256 file handles), which means we can
207 # cache up to 8 GiB of 64 MiB blocks. This leaves
208 # 768 file handles for sockets and other stuff.
210 # When we want the ability to have more cache (e.g. in
211 # arv-mount) we'll increase rlimit before calling
213 self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
216 self._max_slots = 512
218 if self.cache_max == 0:
220 fs = os.statvfs(self._disk_cache_dir)
221 # Calculation of available space incorporates existing cache usage
222 existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
223 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
224 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
226 # 10% of total disk size
227 # 25% of available space
229 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
232 self.cache_max = (256 * 1024 * 1024)
234 self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
237 self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
241 class CacheSlot(object):
242 __slots__ = ("locator", "ready", "content")
244 def __init__(self, locator):
245 self.locator = locator
246 self.ready = threading.Event()
253 def set(self, value):
258 if self.content is None:
261 return len(self.content)
268 return (self.content is None)
270 def _resize_cache(self, cache_max, max_slots):
271 # Try and make sure the contents of the cache do not exceed
272 # the supplied maximums.
275 for slot in self._cache.values():
278 if sm <= cache_max and len(self._cache) <= max_slots:
281 _evict_candidates = collections.deque(self._cache.values())
282 while len(_evict_candidates) > 0 and (sm > cache_max or len(self._cache) > max_slots):
283 slot = _evict_candidates.popleft()
284 if not slot.ready.is_set():
287 if slot.content is None:
289 del self._cache[slot.locator]
294 # If evict returns false it means the
295 # underlying disk cache couldn't lock the file
296 # for deletion because another process was using
297 # it. Don't count it as reducing the amount
298 # of data in the cache, find something else to
303 # check to make sure the underlying data is gone
305 # either way we forget about it. either the
306 # other process will delete it, or if we need
307 # it again and it is still there, we'll find
309 del self._cache[slot.locator]
313 '''Cap the cache size to self.cache_max'''
314 with self._cache_updating:
315 self._resize_cache(self.cache_max, self._max_slots)
316 self._cache_updating.notify_all()
318 def _get(self, locator):
319 # Test if the locator is already in the cache
320 if locator in self._cache:
321 n = self._cache[locator]
322 if n.ready.is_set() and n.content is None:
323 del self._cache[n.locator]
325 self._cache.move_to_end(locator)
328 # see if it exists on disk
329 n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
331 self._cache[n.locator] = n
def get(self, locator):
    """Thread-safe cache lookup: return the slot for *locator*, or None.

    Delegates to self._get() while holding the cache lock.
    """
    self._cache_lock.acquire()
    try:
        return self._get(locator)
    finally:
        self._cache_lock.release()
339 def reserve_cache(self, locator):
340 '''Reserve a cache slot for the specified locator,
341 or return the existing slot.'''
342 with self._cache_updating:
343 n = self._get(locator)
347 # Add a new cache slot for the locator
348 self._resize_cache(self.cache_max, self._max_slots-1)
349 while len(self._cache) >= self._max_slots:
350 # If there isn't a slot available, need to wait
351 # for something to happen that releases one of the
352 # cache slots. Idle for 200 ms or woken up by
354 self._cache_updating.wait(timeout=0.2)
355 self._resize_cache(self.cache_max, self._max_slots-1)
358 n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
360 n = KeepBlockCache.CacheSlot(locator)
361 self._cache[n.locator] = n
364 def set(self, slot, blob):
369 if e.errno == errno.ENOMEM:
370 # Reduce max slots to current - 4, cap cache and retry
371 with self._cache_lock:
372 self._max_slots = max(4, len(self._cache) - 4)
373 elif e.errno == errno.ENOSPC:
374 # Reduce disk max space to current - 256 MiB, cap cache and retry
375 with self._cache_lock:
376 sm = sum([st.size() for st in self._cache.values()])
377 self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
378 elif e.errno == errno.ENODEV:
379 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
380 except Exception as e:
383 # Check if we should evict things from the cache. Either
384 # because we added a new thing or there was an error and
385 # we possibly adjusted the limits down, so we might need
386 # to push something out.
390 # Only gets here if there was an error the first time. The
391 # exception handler adjusts limits downward in some cases
392 # to free up resources, which would make the operation
395 except Exception as e:
396 # It failed again. Give up.
398 raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
402 class Counter(object):
403 def __init__(self, v=0):
404 self._lk = threading.Lock()
416 class KeepClient(object):
417 DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
418 DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
420 class KeepService(PyCurlHelper):
421 """Make requests to a single Keep service, and track results.
423 A KeepService is intended to last long enough to perform one
424 transaction (GET or PUT) against one Keep service. This can
425 involve calling either get() or put() multiple times in order
426 to retry after transient failures. However, calling both get()
427 and put() on a single instance -- or using the same instance
428 to access two different Keep services -- will not produce
435 arvados.errors.HttpError,
438 def __init__(self, root, user_agent_pool=queue.LifoQueue(),
440 download_counter=None,
443 super(KeepClient.KeepService, self).__init__()
445 self._user_agent_pool = user_agent_pool
446 self._result = {'error': None}
450 self.get_headers = {'Accept': 'application/octet-stream'}
451 self.get_headers.update(headers)
452 self.put_headers = headers
453 self.upload_counter = upload_counter
454 self.download_counter = download_counter
455 self.insecure = insecure
458 """Is it worth attempting a request?"""
462 """Did the request succeed or encounter permanent failure?"""
463 return self._result['error'] == False or not self._usable
465 def last_result(self):
468 def _get_user_agent(self):
470 return self._user_agent_pool.get(block=False)
474 def _put_user_agent(self, ua):
477 self._user_agent_pool.put(ua, block=False)
481 def get(self, locator, method="GET", timeout=None):
482 # locator is a KeepLocator object.
483 url = self.root + str(locator)
484 _logger.debug("Request: %s %s", method, url)
485 curl = self._get_user_agent()
488 with timer.Timer() as t:
490 response_body = BytesIO()
491 curl.setopt(pycurl.NOSIGNAL, 1)
492 curl.setopt(pycurl.OPENSOCKETFUNCTION,
493 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
494 curl.setopt(pycurl.URL, url.encode('utf-8'))
495 curl.setopt(pycurl.HTTPHEADER, [
496 '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
497 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
498 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
500 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
501 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
503 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
505 curl.setopt(pycurl.NOBODY, True)
507 curl.setopt(pycurl.HTTPGET, True)
508 self._setcurltimeouts(curl, timeout, method=="HEAD")
512 except Exception as e:
513 raise arvados.errors.HttpError(0, str(e))
519 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
520 'body': response_body.getvalue(),
521 'headers': self._headers,
525 ok = retry.check_http_response_success(self._result['status_code'])
527 self._result['error'] = arvados.errors.HttpError(
528 self._result['status_code'],
529 self._headers.get('x-status-line', 'Error'))
530 except self.HTTP_ERRORS as e:
534 self._usable = ok != False
535 if self._result.get('status_code', None):
536 # The client worked well enough to get an HTTP status
537 # code, so presumably any problems are just on the
538 # server side and it's OK to reuse the client.
539 self._put_user_agent(curl)
541 # Don't return this client to the pool, in case it's
545 _logger.debug("Request fail: GET %s => %s: %s",
546 url, type(self._result['error']), str(self._result['error']))
549 _logger.info("HEAD %s: %s bytes",
550 self._result['status_code'],
551 self._result.get('content-length'))
552 if self._result['headers'].get('x-keep-locator'):
553 # This is a response to a remote block copy request, return
554 # the local copy block locator.
555 return self._result['headers'].get('x-keep-locator')
558 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
559 self._result['status_code'],
560 len(self._result['body']),
562 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
564 if self.download_counter:
565 self.download_counter.add(len(self._result['body']))
566 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
567 if resp_md5 != locator.md5sum:
568 _logger.warning("Checksum fail: md5(%s) = %s",
570 self._result['error'] = arvados.errors.HttpError(
573 return self._result['body']
575 def put(self, hash_s, body, timeout=None, headers={}):
576 put_headers = copy.copy(self.put_headers)
577 put_headers.update(headers)
578 url = self.root + hash_s
579 _logger.debug("Request: PUT %s", url)
580 curl = self._get_user_agent()
583 with timer.Timer() as t:
585 body_reader = BytesIO(body)
586 response_body = BytesIO()
587 curl.setopt(pycurl.NOSIGNAL, 1)
588 curl.setopt(pycurl.OPENSOCKETFUNCTION,
589 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
590 curl.setopt(pycurl.URL, url.encode('utf-8'))
591 # Using UPLOAD tells cURL to wait for a "go ahead" from the
592 # Keep server (in the form of a HTTP/1.1 "100 Continue"
593 # response) instead of sending the request body immediately.
594 # This allows the server to reject the request if the request
595 # is invalid or the server is read-only, without waiting for
596 # the client to send the entire block.
597 curl.setopt(pycurl.UPLOAD, True)
598 curl.setopt(pycurl.INFILESIZE, len(body))
599 curl.setopt(pycurl.READFUNCTION, body_reader.read)
600 curl.setopt(pycurl.HTTPHEADER, [
601 '{}: {}'.format(k,v) for k,v in put_headers.items()])
602 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
603 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
605 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
606 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
608 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
609 self._setcurltimeouts(curl, timeout)
612 except Exception as e:
613 raise arvados.errors.HttpError(0, str(e))
619 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
620 'body': response_body.getvalue().decode('utf-8'),
621 'headers': self._headers,
624 ok = retry.check_http_response_success(self._result['status_code'])
626 self._result['error'] = arvados.errors.HttpError(
627 self._result['status_code'],
628 self._headers.get('x-status-line', 'Error'))
629 except self.HTTP_ERRORS as e:
633 self._usable = ok != False # still usable if ok is True or None
634 if self._result.get('status_code', None):
635 # Client is functional. See comment in get().
636 self._put_user_agent(curl)
640 _logger.debug("Request fail: PUT %s => %s: %s",
641 url, type(self._result['error']), str(self._result['error']))
643 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
644 self._result['status_code'],
647 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
648 if self.upload_counter:
649 self.upload_counter.add(len(body))
653 class KeepWriterQueue(queue.Queue):
654 def __init__(self, copies, classes=[]):
655 queue.Queue.__init__(self) # Old-style superclass
656 self.wanted_copies = copies
657 self.wanted_storage_classes = classes
658 self.successful_copies = 0
659 self.confirmed_storage_classes = {}
661 self.storage_classes_tracking = True
662 self.queue_data_lock = threading.RLock()
663 self.pending_tries = max(copies, len(classes))
664 self.pending_tries_notification = threading.Condition()
666 def write_success(self, response, replicas_nr, classes_confirmed):
667 with self.queue_data_lock:
668 self.successful_copies += replicas_nr
669 if classes_confirmed is None:
670 self.storage_classes_tracking = False
671 elif self.storage_classes_tracking:
672 for st_class, st_copies in classes_confirmed.items():
674 self.confirmed_storage_classes[st_class] += st_copies
676 self.confirmed_storage_classes[st_class] = st_copies
677 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
678 self.response = response
679 with self.pending_tries_notification:
680 self.pending_tries_notification.notify_all()
def write_fail(self, ks):
    """Record a failed write attempt: grant one extra pending try and
    wake a single worker waiting on the condition.
    """
    notification = self.pending_tries_notification
    with notification:
        self.pending_tries += 1
        notification.notify()
def pending_copies(self):
    """Return how many replicas are still needed to reach the wanted copy count."""
    with self.queue_data_lock:
        remaining = self.wanted_copies - self.successful_copies
    return remaining
691 def satisfied_classes(self):
692 with self.queue_data_lock:
693 if not self.storage_classes_tracking:
694 # Notifies disabled storage classes expectation to
697 return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
699 def pending_classes(self):
700 with self.queue_data_lock:
701 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
703 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
704 for st_class, st_copies in self.confirmed_storage_classes.items():
705 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
706 unsatisfied_classes.remove(st_class)
707 return unsatisfied_classes
709 def get_next_task(self):
710 with self.pending_tries_notification:
712 if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
713 # This notify_all() is unnecessary --
714 # write_success() already called notify_all()
715 # when pending<1 became true, so it's not
716 # possible for any other thread to be in
717 # wait() now -- but it's cheap insurance
718 # against deadlock so we do it anyway:
719 self.pending_tries_notification.notify_all()
720 # Drain the queue and then raise Queue.Empty
724 elif self.pending_tries > 0:
725 service, service_root = self.get_nowait()
726 if service.finished():
729 self.pending_tries -= 1
730 return service, service_root
732 self.pending_tries_notification.notify_all()
735 self.pending_tries_notification.wait()
738 class KeepWriterThreadPool(object):
739 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
740 self.total_task_nr = 0
741 if (not max_service_replicas) or (max_service_replicas >= copies):
744 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
745 _logger.debug("Pool max threads is %d", num_threads)
747 self.queue = KeepClient.KeepWriterQueue(copies, classes)
749 for _ in range(num_threads):
750 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
751 self.workers.append(w)
def add_task(self, ks, service_root):
    """Enqueue one (service, service_root) work item for the writer threads
    and bump the total task counter.
    """
    task = (ks, service_root)
    self.queue.put(task)
    self.total_task_nr += 1
758 return self.queue.successful_copies, self.queue.satisfied_classes()
762 for worker in self.workers:
764 # Wait for finished work
768 return self.queue.response
771 class KeepWriterThread(threading.Thread):
772 class TaskFailed(RuntimeError): pass
774 def __init__(self, queue, data, data_hash, timeout=None):
775 super(KeepClient.KeepWriterThread, self).__init__()
776 self.timeout = timeout
779 self.data_hash = data_hash
785 service, service_root = self.queue.get_next_task()
789 locator, copies, classes = self.do_task(service, service_root)
790 except Exception as e:
791 if not isinstance(e, self.TaskFailed):
792 _logger.exception("Exception in KeepWriterThread")
793 self.queue.write_fail(service)
795 self.queue.write_success(locator, copies, classes)
797 self.queue.task_done()
799 def do_task(self, service, service_root):
800 classes = self.queue.pending_classes()
804 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
805 success = bool(service.put(self.data_hash,
807 timeout=self.timeout,
809 result = service.last_result()
812 if result.get('status_code'):
813 _logger.debug("Request fail: PUT %s => %s %s",
815 result.get('status_code'),
817 raise self.TaskFailed()
819 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
820 str(threading.current_thread()),
825 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
826 except (KeyError, ValueError):
829 classes_confirmed = {}
831 scch = result['headers']['x-keep-storage-classes-confirmed']
832 for confirmation in scch.replace(' ', '').split(','):
833 if '=' in confirmation:
834 stored_class, stored_copies = confirmation.split('=')[:2]
835 classes_confirmed[stored_class] = int(stored_copies)
836 except (KeyError, ValueError):
837 # Storage classes confirmed header missing or corrupt
838 classes_confirmed = None
840 return result['body'].strip(), replicas_stored, classes_confirmed
843 def __init__(self, api_client=None, proxy=None,
844 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
845 api_token=None, local_store=None, block_cache=None,
846 num_retries=10, session=None, num_prefetch_threads=None):
847 """Initialize a new KeepClient.
851 The API client to use to find Keep services. If not
852 provided, KeepClient will build one from available Arvados
856 If specified, this KeepClient will send requests to this Keep
857 proxy. Otherwise, KeepClient will fall back to the setting of the
858 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
859 If you do not want KeepClient to use a proxy, pass in an empty
863 The initial timeout (in seconds) for HTTP requests to Keep
864 non-proxy servers. A tuple of three floats is interpreted as
865 (connection_timeout, read_timeout, minimum_bandwidth). A connection
866 will be aborted if the average traffic rate falls below
867 minimum_bandwidth bytes per second over an interval of read_timeout
868 seconds. Because timeouts are often a result of transient server
869 load, the actual connection timeout will be increased by a factor
870 of two on each retry.
871 Default: (2, 256, 32768).
874 The initial timeout (in seconds) for HTTP requests to
875 Keep proxies. A tuple of three floats is interpreted as
876 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
877 described above for adjusting connection timeouts on retry also
879 Default: (20, 256, 32768).
882 If you're not using an API client, but only talking
883 directly to a Keep proxy, this parameter specifies an API token
884 to authenticate Keep requests. It is an error to specify both
885 api_client and api_token. If you specify neither, KeepClient
886 will use one available from the Arvados configuration.
889 If specified, this KeepClient will bypass Keep
890 services, and save data to the named directory. If unspecified,
891 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
892 environment variable. If you want to ensure KeepClient does not
893 use local storage, pass in an empty string. This is primarily
894 intended to mock a server for testing.
897 The default number of times to retry failed requests.
898 This will be used as the default num_retries value when get() and
899 put() are called. Default 10.
901 self.lock = threading.Lock()
903 if config.get('ARVADOS_KEEP_SERVICES'):
904 proxy = config.get('ARVADOS_KEEP_SERVICES')
906 proxy = config.get('ARVADOS_KEEP_PROXY')
907 if api_token is None:
908 if api_client is None:
909 api_token = config.get('ARVADOS_API_TOKEN')
911 api_token = api_client.api_token
912 elif api_client is not None:
914 "can't build KeepClient with both API client and token")
915 if local_store is None:
916 local_store = os.environ.get('KEEP_LOCAL_STORE')
918 if api_client is None:
919 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
921 self.insecure = api_client.insecure
923 self.block_cache = block_cache if block_cache else KeepBlockCache()
924 self.timeout = timeout
925 self.proxy_timeout = proxy_timeout
926 self._user_agent_pool = queue.LifoQueue()
927 self.upload_counter = Counter()
928 self.download_counter = Counter()
929 self.put_counter = Counter()
930 self.get_counter = Counter()
931 self.hits_counter = Counter()
932 self.misses_counter = Counter()
933 self._storage_classes_unsupported_warning = False
934 self._default_classes = []
935 if num_prefetch_threads is not None:
936 self.num_prefetch_threads = num_prefetch_threads
938 self.num_prefetch_threads = 2
939 self._prefetch_queue = None
940 self._prefetch_threads = None
943 self.local_store = local_store
944 self.head = self.local_store_head
945 self.get = self.local_store_get
946 self.put = self.local_store_put
948 self.num_retries = num_retries
949 self.max_replicas_per_service = None
951 proxy_uris = proxy.split()
952 for i in range(len(proxy_uris)):
953 if not proxy_uris[i].endswith('/'):
956 url = urllib.parse.urlparse(proxy_uris[i])
957 if not (url.scheme and url.netloc):
958 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
959 self.api_token = api_token
960 self._gateway_services = {}
961 self._keep_services = [{
962 'uuid': "00000-bi6l4-%015d" % idx,
963 'service_type': 'proxy',
964 '_service_root': uri,
965 } for idx, uri in enumerate(proxy_uris)]
966 self._writable_services = self._keep_services
967 self.using_proxy = True
968 self._static_services_list = True
970 # It's important to avoid instantiating an API client
971 # unless we actually need one, for testing's sake.
972 if api_client is None:
973 api_client = arvados.api('v1')
974 self.api_client = api_client
975 self.api_token = api_client.api_token
976 self._gateway_services = {}
977 self._keep_services = None
978 self._writable_services = None
979 self.using_proxy = None
980 self._static_services_list = False
982 self._default_classes = [
983 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
985 # We're talking to an old cluster
988 def current_timeout(self, attempt_number):
989 """Return the appropriate timeout to use for this client.
991 The proxy timeout setting if the backend service is currently a proxy,
992 the regular timeout setting otherwise. The `attempt_number` indicates
993 how many times the operation has been tried already (starting from 0
994 for the first try), and scales the connection timeout portion of the
995 return value accordingly.
998 # TODO(twp): the timeout should be a property of a
999 # KeepService, not a KeepClient. See #4488.
1000 t = self.proxy_timeout if self.using_proxy else self.timeout
1002 return (t[0] * (1 << attempt_number), t[1])
1004 return (t[0] * (1 << attempt_number), t[1], t[2])
1005 def _any_nondisk_services(self, service_list):
1006 return any(ks.get('service_type', 'disk') != 'disk'
1007 for ks in service_list)
1009 def build_services_list(self, force_rebuild=False):
1010 if (self._static_services_list or
1011 (self._keep_services and not force_rebuild)):
1015 keep_services = self.api_client.keep_services().accessible()
1016 except Exception: # API server predates Keep services.
1017 keep_services = self.api_client.keep_disks().list()
1019 # Gateway services are only used when specified by UUID,
1020 # so there's nothing to gain by filtering them by
1022 self._gateway_services = {ks['uuid']: ks for ks in
1023 keep_services.execute()['items']}
1024 if not self._gateway_services:
1025 raise arvados.errors.NoKeepServersError()
1027 # Precompute the base URI for each service.
1028 for r in self._gateway_services.values():
1029 host = r['service_host']
1030 if not host.startswith('[') and host.find(':') >= 0:
1031 # IPv6 URIs must be formatted like http://[::1]:80/...
1032 host = '[' + host + ']'
1033 r['_service_root'] = "{}://{}:{:d}/".format(
1034 'https' if r['service_ssl_flag'] else 'http',
1038 _logger.debug(str(self._gateway_services))
1039 self._keep_services = [
1040 ks for ks in self._gateway_services.values()
1041 if not ks.get('service_type', '').startswith('gateway:')]
1042 self._writable_services = [ks for ks in self._keep_services
1043 if not ks.get('read_only')]
1045 # For disk type services, max_replicas_per_service is 1
1046 # It is unknown (unlimited) for other service types.
1047 if self._any_nondisk_services(self._writable_services):
1048 self.max_replicas_per_service = None
1050 self.max_replicas_per_service = 1
1052 def _service_weight(self, data_hash, service_uuid):
1053 """Compute the weight of a Keep service endpoint for a data
1054 block with a known hash.
1056 The weight is md5(h + u) where u is the last 15 characters of
1057 the service endpoint's UUID.
1059 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1061 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1062 """Return an array of Keep service endpoints, in the order in
1063 which they should be probed when reading or writing data with
1064 the given hash+hints.
1066 self.build_services_list(force_rebuild)
1069 # Use the services indicated by the given +K@... remote
1070 # service hints, if any are present and can be resolved to a
1072 for hint in locator.hints:
1073 if hint.startswith('K@'):
1075 sorted_roots.append(
1076 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1077 elif len(hint) == 29:
1078 svc = self._gateway_services.get(hint[2:])
1080 sorted_roots.append(svc['_service_root'])
1082 # Sort the available local services by weight (heaviest first)
1083 # for this locator, and return their service_roots (base URIs)
1085 use_services = self._keep_services
1087 use_services = self._writable_services
1088 self.using_proxy = self._any_nondisk_services(use_services)
1089 sorted_roots.extend([
1090 svc['_service_root'] for svc in sorted(
1093 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1094 _logger.debug("{}: {}".format(locator, sorted_roots))
1097 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1098 # roots_map is a dictionary, mapping Keep service root strings
1099 # to KeepService objects. Poll for Keep services, and add any
1100 # new ones to roots_map. Return the current list of local
1102 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1103 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1104 for root in local_roots:
1105 if root not in roots_map:
1106 roots_map[root] = self.KeepService(
1107 root, self._user_agent_pool,
1108 upload_counter=self.upload_counter,
1109 download_counter=self.download_counter,
1111 insecure=self.insecure)
# Success predicate handed to retry.RetryLoop (see uses below in
# _get_or_head and put). NOTE(review): the decorator (presumably
# @staticmethod — there is no self parameter) and the return
# statements for each branch sit on elided lines; confirm against
# the full source.
1115 def _check_loop_result(result):
1116 # KeepClient RetryLoops should save results as a 2-tuple: the
1117 # actual result of the request, and the number of servers available
1118 # to receive the request this round.
1119 # This method returns True if there's a real result, False if
1120 # there are no more servers available, otherwise None.
1121 if isinstance(result, Exception):
1123 result, tried_server_count = result
1124 if (result is not None) and (result is not False):
1126 elif tried_server_count < 1:
1127 _logger.info("No more Keep services to try; giving up")
# Cache-only lookup: never touches the network. The slot must exist
# AND its ready event must be set (fetch finished) for a hit.
# NOTE(review): the return lines for both outcomes are elided here.
1132 def get_from_cache(self, loc_s):
1133 """Fetch a block only if it is in the cache, otherwise return None."""
1134 locator = KeepLocator(loc_s)
1135 slot = self.block_cache.get(locator.md5sum)
1136 if slot is not None and slot.ready.is_set():
1141 def refresh_signature(self, loc):
1142 """Ask Keep to get the remote block and return its local signature"""
# The X-Keep-Signature header wants an ISO-8601 UTC timestamp with a
# literal trailing 'Z'.
# NOTE(review): datetime.utcnow() is deprecated as of Python 3.12;
# datetime.datetime.now(datetime.timezone.utc) is the modern form,
# but changing it would alter the string (adds "+00:00") — leave as is
# unless the header format is adjusted too.
1143 now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1144 return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
# HEAD variant of _get_or_head(): verifies existence / refreshes
# signatures without transferring block data. NOTE(review): lines
# just above are elided — a decorator may be missing from this view.
1147 def head(self, loc_s, **kwargs):
1148 return self._get_or_head(loc_s, method="HEAD", **kwargs)
# GET variant of _get_or_head(): fetches block contents. See
# _get_or_head for the argument documentation. NOTE(review): lines
# just above are elided — a decorator may be missing from this view.
1151 def get(self, loc_s, **kwargs):
1152 return self._get_or_head(loc_s, method="GET", **kwargs)
# Shared implementation behind get() and head(). NOTE(review): this
# numbered listing is heavily elided (try/finally structure, cache
# return paths, and several conditionals are on missing lines); the
# comments below describe only what the visible lines establish.
1154 def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1155 """Get data from Keep.
1157 This method fetches one or more blocks of data from Keep. It
1158 sends a request each Keep service registered with the API
1159 server (or the proxy provided when this client was
1160 instantiated), then each service named in location hints, in
1161 sequence. As soon as one service provides the data, it's
1165 * loc_s: A string of one or more comma-separated locators to fetch.
1166 This method returns the concatenation of these blocks.
1167 * num_retries: The number of times to retry GET requests to
1168 *each* Keep server if it returns temporary failures, with
1169 exponential backoff. Note that, in each loop, the method may try
1170 to fetch data from every available Keep service, along with any
1171 that are named in location hints in the locator. The default value
1172 is set when the KeepClient is initialized.
# Multiple comma-separated locators: recurse per locator and
# concatenate the results.
1175 return ''.join(self.get(x) for x in loc_s.split(','))
1177 self.get_counter.add(1)
# Prefer an explicit request_id, then the API client's, then a
# freshly generated one, so every log/error line is correlatable.
1179 request_id = (request_id or
1180 (hasattr(self, 'api_client') and self.api_client.request_id) or
1181 arvados.util.new_request_id())
1184 headers['X-Request-Id'] = request_id
1189 locator = KeepLocator(loc_s)
# Reserve a cache slot keyed on the md5sum; `first` distinguishes a
# brand-new slot from one another thread is already filling.
1192 slot, first = self.block_cache.reserve_cache(locator.md5sum)
1194 # Fresh and empty "first time it is used" slot
1197 # this is request for a prefetch to fill in
1198 # the cache, don't need to wait for the
1199 # result, so if it is already in flight return
1200 # immediately. Clear 'slot' to prevent
1201 # finally block from calling slot.set()
1206 if blob is not None:
1207 self.hits_counter.add(1)
1210 # If blob is None, this means either
1212 # (a) another thread was fetching this block and
1213 # failed with an error or
1215 # (b) cache thrashing caused the slot to be
1216 # evicted (content set to None) by another thread
1217 # between the call to reserve_cache() and get().
1219 # We'll handle these cases by reserving a new slot
1220 # and then doing a full GET request.
1223 self.misses_counter.add(1)
1225 # If the locator has hints specifying a prefix (indicating a
1226 # remote keepproxy) or the UUID of a local gateway service,
1227 # read data from the indicated service(s) instead of the usual
1228 # list of local disk services.
# Short (7-char) K@ hints name a remote cluster's keepproxy; note
# this path uses http:// while weighted_service_roots uses https://.
1229 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1230 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1231 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1232 for hint in locator.hints if (
1233 hint.startswith('K@') and
1235 self._gateway_services.get(hint[2:])
1237 # Map root URLs to their KeepService objects.
1239 root: self.KeepService(root, self._user_agent_pool,
1240 upload_counter=self.upload_counter,
1241 download_counter=self.download_counter,
1243 insecure=self.insecure)
1244 for root in hint_roots
1247 # See #3147 for a discussion of the loop implementation. Highlights:
1248 # * Refresh the list of Keep services after each failure, in case
1249 # it's being updated.
1250 # * Retry until we succeed, we're out of retries, or every available
1251 # service has returned permanent failure.
1254 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1256 for tries_left in loop:
# After the first failure, force a rebuild of the services list in
# case the cluster topology changed.
1258 sorted_roots = self.map_new_services(
1260 force_rebuild=(tries_left < num_retries),
1261 need_writable=False,
1263 except Exception as error:
1264 loop.save_result(error)
1267 # Query KeepService objects that haven't returned
1268 # permanent failure, in our specified shuffle order.
1269 services_to_try = [roots_map[root]
1270 for root in sorted_roots
1271 if roots_map[root].usable()]
1272 for keep_service in services_to_try:
1273 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1274 if blob is not None:
# Record (result, remaining-server count) so _check_loop_result
# can distinguish "got data" from "ran out of servers".
1276 loop.save_result((blob, len(services_to_try)))
1278 # Always cache the result, then return it if we succeeded.
1282 if slot is not None:
1283 self.block_cache.set(slot, blob)
1285 # Q: Including 403 is necessary for the Keep tests to continue
1286 # passing, but maybe they should expect KeepReadError instead?
# Classify per-service failures: 403/404/410 all count as "not
# found" for the purpose of raising NotFoundError below.
1287 not_founds = sum(1 for key in sorted_roots
1288 if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1289 service_errors = ((key, roots_map[key].last_result()['error'])
1290 for key in sorted_roots)
1292 raise arvados.errors.KeepReadError(
1293 "[{}] failed to read {}: no Keep services available ({})".format(
1294 request_id, loc_s, loop.last_result()))
1295 elif not_founds == len(sorted_roots):
1296 raise arvados.errors.NotFoundError(
1297 "[{}] {} not found".format(request_id, loc_s), service_errors)
1299 raise arvados.errors.KeepReadError(
1300 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
# NOTE(review): this numbered listing is elided — the headers dict
# opener, the try/except structure around map_new_services, and the
# loop.save_result call sites are on missing lines. Comments below
# cover only what the visible lines establish.
1303 def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1304 """Save data in Keep.
1306 This method will get a list of Keep services from the API server, and
1307 send the data to each one simultaneously in a new thread. Once the
1308 uploads are finished, if enough copies are saved, this method returns
1309 the most recent HTTP response body. If requests fail to upload
1310 enough copies, this method raises KeepWriteError.
1313 * data: The string of data to upload.
1314 * copies: The number of copies that the user requires be saved.
1316 * num_retries: The number of times to retry PUT requests to
1317 *each* Keep server if it returns temporary failures, with
1318 exponential backoff. The default value is set when the
1319 KeepClient is initialized.
1320 * classes: An optional list of storage class names where copies should
1324 classes = classes or self._default_classes
# Keep stores bytes; encode str input (default UTF-8) before hashing.
1326 if not isinstance(data, bytes):
1327 data = data.encode()
1329 self.put_counter.add(1)
# The locator is content-addressed: md5 hex digest plus byte length.
1331 data_hash = hashlib.md5(data).hexdigest()
1332 loc_s = data_hash + '+' + str(len(data))
1335 locator = KeepLocator(loc_s)
# Same request-id precedence as _get_or_head: explicit, API
# client's, else freshly generated.
1337 request_id = (request_id or
1338 (hasattr(self, 'api_client') and self.api_client.request_id) or
1339 arvados.util.new_request_id())
1341 'X-Request-Id': request_id,
1342 'X-Keep-Desired-Replicas': str(copies),
1345 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1349 for tries_left in loop:
1351 sorted_roots = self.map_new_services(
1353 force_rebuild=(tries_left < num_retries),
1356 except Exception as error:
1357 loop.save_result(error)
# done_classes is None once we learn the cluster doesn't support
# storage classes (see the warning branch below); otherwise track
# which requested classes still lack a confirmed copy.
1360 pending_classes = []
1361 if done_classes is not None:
1362 pending_classes = list(set(classes) - set(done_classes))
# Only ask the pool for the copies/classes still outstanding from
# previous rounds.
1363 writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1364 data_hash=data_hash,
1365 copies=copies - done_copies,
1366 max_service_replicas=self.max_replicas_per_service,
1367 timeout=self.current_timeout(num_retries - tries_left),
1368 classes=pending_classes)
1369 for service_root, ks in [(root, roots_map[root])
1370 for root in sorted_roots]:
1373 writer_pool.add_task(ks, service_root)
1375 pool_copies, pool_classes = writer_pool.done()
1376 done_copies += pool_copies
1377 if (done_classes is not None) and (pool_classes is not None):
1378 done_classes += pool_classes
# Success requires both enough copies and every requested storage
# class confirmed.
1380 (done_copies >= copies and set(done_classes) == set(classes),
1381 writer_pool.total_task_nr))
1383 # Old keepstore contacted without storage classes support:
1384 # success is determined only by successful copies.
1386 # Disable storage classes tracking from this point forward.
# Warn only once per client instance.
1387 if not self._storage_classes_unsupported_warning:
1388 self._storage_classes_unsupported_warning = True
1389 _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1392 (done_copies >= copies, writer_pool.total_task_nr))
1395 return writer_pool.response()
1397 raise arvados.errors.KeepWriteError(
1398 "[{}] failed to write {}: no Keep services available ({})".format(
1399 request_id, data_hash, loop.last_result()))
1401 service_errors = ((key, roots_map[key].last_result()['error'])
1402 for key in sorted_roots
1403 if roots_map[key].last_result()['error'])
1404 raise arvados.errors.KeepWriteError(
1405 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1406 request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
# Worker loop body run by each prefetch thread: pull locators off
# self._prefetch_queue and fetch them via self.get(..., prefetch=True)
# so results land in the block cache. NOTE(review): the surrounding
# while/try structure and the sentinel-exit check are on elided lines.
1408 def _block_prefetch_worker(self):
1409 """The background downloader thread."""
1412 b = self._prefetch_queue.get()
1415 self.get(b, prefetch=True)
# Log and keep going — a failed prefetch must not kill the worker.
1417 _logger.exception("Exception doing block prefetch")
# Lazily create the prefetch queue and daemon worker threads, once.
# Lines 1420/1422 show a double-checked None test — the elided line
# between them presumably acquires a lock; the thread .start() call
# is also elided. Confirm against the full source.
1419 def _start_prefetch_threads(self):
1420 if self._prefetch_threads is None:
1422 if self._prefetch_threads is not None:
1424 self._prefetch_queue = queue.Queue()
1425 self._prefetch_threads = []
1426 for i in range(0, self.num_prefetch_threads):
1427 thread = threading.Thread(target=self._block_prefetch_worker)
1428 self._prefetch_threads.append(thread)
# Daemon threads so an exiting interpreter is not held up by
# idle prefetch workers.
1429 thread.daemon = True
# Queue a background download of `locator` into the block cache.
# Cheap no-op when the block is already cached.
1432 def block_prefetch(self, locator):
1434 This relies on the fact that KeepClient implements a block cache,
1435 so repeated requests for the same block will not result in repeated
1436 downloads (unless the block is evicted from the cache.) This method
# Skip the queue entirely on a cache hit.
1440 if self.block_cache.get(locator) is not None:
1443 self._start_prefetch_threads()
1444 self._prefetch_queue.put(locator)
# Shut down the prefetch workers: enqueue one None sentinel per
# thread to wake it, then (on the elided line between 1451 and 1453,
# presumably t.join()) wait for each before dropping the references.
1446 def stop_prefetch_threads(self):
1448 if self._prefetch_threads is not None:
1449 for t in self._prefetch_threads:
1450 self._prefetch_queue.put(None)
1451 for t in self._prefetch_threads:
1453 self._prefetch_threads = None
1454 self._prefetch_queue = None
# NOTE(review): mutable default argument `classes=[]` is a Python
# anti-pattern; it appears unused in the visible body, but prefer
# `classes=None` if this signature is ever touched. The f.write(data)
# and return-locator lines are elided from this listing.
1456 def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1457 """A stub for put().
1459 This method is used in place of the real put() method when
1460 using local storage (see constructor's local_store argument).
1462 copies and num_retries arguments are ignored: they are here
1463 only for the sake of offering the same call signature as
1466 Data stored this way can be retrieved via local_store_get().
1468 md5 = hashlib.md5(data).hexdigest()
1469 locator = '%s+%d' % (md5, len(data))
# Write to a .tmp file first, then rename over the final name, so a
# reader never sees a partially written block.
1470 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1472 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1473 os.path.join(self.local_store, md5))
# Read a block back from the local_store directory by md5sum.
# NOTE(review): the try/except around KeepLocator parsing, the
# empty-block early return, and the final f.read() return are on
# elided lines in this listing.
1476 def local_store_get(self, loc_s, num_retries=None):
1477 """Companion to local_store_put()."""
1479 locator = KeepLocator(loc_s)
1481 raise arvados.errors.NotFoundError(
1482 "Invalid data locator: '%s'" % loc_s)
# The well-known empty-block locator is handled without touching disk.
1483 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1485 with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
# Existence check counterpart of local_store_get(): same locator
# validation, but only tests whether the block file exists.
# NOTE(review): the try/except and the return lines are elided here.
1488 def local_store_head(self, loc_s, num_retries=None):
1489 """Companion to local_store_put()."""
1491 locator = KeepLocator(loc_s)
1493 raise arvados.errors.NotFoundError(
1494 "Invalid data locator: '%s'" % loc_s)
# Empty-block locator succeeds without a filesystem check.
1495 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1497 if os.path.exists(os.path.join(self.local_store, locator.md5sum)):