1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
36 if sys.version_info >= (3, 0):
37 from io import BytesIO
39 from cStringIO import StringIO as BytesIO
42 import arvados.config as config
44 import arvados.retry as retry
46 import arvados.diskcache
47 from arvados._pycurlhelper import PyCurlHelper
49 _logger = logging.getLogger('arvados.keep')
50 global_client_object = None
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    for _tcp_name, _tcp_value in (
            ('TCP_KEEPALIVE', 0x010),
            ('TCP_KEEPINTVL', 0x101),
            ('TCP_KEEPCNT', 0x102)):
        # Only fill in constants the platform's socket module doesn't define.
        if not hasattr(socket, _tcp_name):
            setattr(socket, _tcp_name, _tcp_value)
# KeepLocator parses, validates, and re-serializes Keep block locators of the
# form "<md5sum>[+<size>][+<hint>...]" (e.g. "d41d...+0+Asig@hexts").
# NOTE(review): this excerpt elides several original lines (loop headers,
# try/except scaffolding, decorators, blank lines); only the visible fragment
# is documented here.
64 class KeepLocator(object):
# Zero point used to convert permission-hint expiry datetimes to Unix seconds.
65 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
# Valid hints: one capital letter followed by alphanumerics, '@', '_' or '-'.
66 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
68 def __init__(self, locator_str):
71 self._perm_expiry = None
# Split on '+': first piece is the md5 digest, optional second is the size.
72 pieces = iter(locator_str.split('+'))
73 self.md5sum = next(pieces)
# Size parse; the surrounding try/except for a size-less locator is elided.
75 self.size = int(next(pieces))
# Remaining pieces are hints (loop header elided); 'A' hints carry the
# permission signature, all others are kept verbatim in self.hints.
79 if self.HINT_RE.match(hint) is None:
80 raise ValueError("invalid hint format: {}".format(hint))
81 elif hint.startswith('A'):
82 self.parse_permission_hint(hint)
84 self.hints.append(hint)
# Fragment of __str__ (presumably): rejoin md5, size, permission hint and
# other hints with '+' — TODO confirm against the full source.
89 for s in [self.md5sum, self.size,
90 self.permission_hint()] + self.hints
# Fragment of a stripped-locator helper (presumably stripped()): md5+size
# without any hints — TODO confirm; the else branch is elided.
94 if self.size is not None:
95 return "%s+%i" % (self.md5sum, self.size)
# Factory for validated hex-string properties (md5sum, perm_sig below).
99 def _make_hex_prop(name, length):
100 # Build and return a new property with the given name that
101 # must be a hex string of the given length.
102 data_name = '_{}'.format(name)
# getter definition line is elided here; it returns the backing attribute.
104 return getattr(self, data_name)
105 def setter(self, hex_str):
106 if not arvados.util.is_hex(hex_str, length):
107 raise ValueError("{} is not a {}-digit hex string: {!r}".
108 format(name, length, hex_str))
109 setattr(self, data_name, hex_str)
110 return property(getter, setter)
112 md5sum = _make_hex_prop('md5sum', 32)
113 perm_sig = _make_hex_prop('perm_sig', 40)
# perm_expiry property getter (its @property decorator line is elided).
116 def perm_expiry(self):
117 return self._perm_expiry
# perm_expiry setter (its @perm_expiry.setter decorator line is elided):
# accepts a 1-8 digit hex Unix timestamp and stores it as a datetime.
120 def perm_expiry(self, value):
121 if not arvados.util.is_hex(value, 1, 8):
# The raise statement's first line is elided; this is its message argument.
123 "permission timestamp must be a hex Unix timestamp: {}".
125 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
# Serialize the stored signature + expiry back into an "Asig@hexts" hint.
127 def permission_hint(self):
128 data = [self.perm_sig, self.perm_expiry]
# Convert the expiry datetime back to integer Unix seconds for formatting.
131 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
132 return "A{}@{:08x}".format(*data)
# Parse an "Asig@hexts" hint; the try/except wrapper is elided.
134 def parse_permission_hint(self, s):
136 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
138 raise ValueError("bad permission hint {}".format(s))
# True if the signature expiry is at or before as_of_dt (default: now).
# NOTE(review): uses naive local now() while expiry is UTC-derived — verify.
140 def permission_expired(self, as_of_dt=None):
141 if self.perm_expiry is None:
143 elif as_of_dt is None:
144 as_of_dt = datetime.datetime.now()
145 return self.perm_expiry <= as_of_dt
149 """Simple interface to a global KeepClient object.
151 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
152 own API client. The global KeepClient will build an API client from the
153 current Arvados configuration, which may not match the one you built.
158 def global_client_object(cls):
159 global global_client_object
160 # Previously, KeepClient would change its behavior at runtime based
161 # on these configuration settings. We simulate that behavior here
162 # by checking the values and returning a new KeepClient if any of
164 key = (config.get('ARVADOS_API_HOST'),
165 config.get('ARVADOS_API_TOKEN'),
166 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
167 config.get('ARVADOS_KEEP_PROXY'),
168 os.environ.get('KEEP_LOCAL_STORE'))
169 if (global_client_object is None) or (cls._last_key != key):
170 global_client_object = KeepClient()
172 return global_client_object
175 def get(locator, **kwargs):
176 return Keep.global_client_object().get(locator, **kwargs)
179 def put(data, **kwargs):
180 return Keep.global_client_object().put(data, **kwargs)
# KeepBlockCache: in-memory (OrderedDict of CacheSlots) or on-disk LRU cache
# of Keep blocks, shared by KeepClient instances.
# NOTE(review): this excerpt elides several original lines (else branches and
# blank lines); only the visible fragment is documented here.
182 class KeepBlockCache(object):
183 def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
# cache_max/max_slots of 0 mean "choose a default below".
184 self.cache_max = cache_max
# OrderedDict gives LRU ordering via move_to_back() in _get().
185 self._cache = collections.OrderedDict()
186 self._cache_lock = threading.Lock()
187 self._max_slots = max_slots
188 self._disk_cache = disk_cache
189 self._disk_cache_dir = disk_cache_dir
# Condition shares the cache lock so waiters in reserve_cache() are woken
# by cap_cache()'s notify_all().
190 self._cache_updating = threading.Condition(self._cache_lock)
# Default disk cache location under the user's XDG-style cache dir.
192 if self._disk_cache and self._disk_cache_dir is None:
193 self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
194 os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
# Derive a slot-count default; the disk/non-disk branch split here is
# partially elided (line 197, presumably "if self._disk_cache:").
196 if self._max_slots == 0:
198 # Each block uses two file descriptors, one used to
199 # open it initially and hold the flock(), and a second
200 # hidden one used by mmap().
202 # Set max slots to 1/8 of maximum file handles. This
203 # means we'll use at most 1/4 of total file handles.
205 # NOFILE typically defaults to 1024 on Linux so this
206 # is 128 slots (256 file handles), which means we can
207 # cache up to 8 GiB of 64 MiB blocks. This leaves
208 # 768 file handles for sockets and other stuff.
210 # When we want the ability to have more cache (e.g. in
211 # arv-mount) we'll increase rlimit before calling
213 self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
# Fallback slot count — presumably the non-disk-cache else branch (elided).
216 self._max_slots = 512
# Derive a byte-size default; the disk branch guard (line 219) is elided.
218 if self.cache_max == 0:
220 fs = os.statvfs(self._disk_cache_dir)
221 # Calculation of available space incorporates existing cache usage
222 existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
223 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
224 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
# Take the smallest of: 10% of total disk, 25% of available space, and
# what the slot count can hold at 64 MiB per block.
226 # 10% of total disk size
227 # 25% of available space
229 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
# RAM-cache default — presumably the non-disk else branch (elided).
232 self.cache_max = (256 * 1024 * 1024)
# Never configure less than one full 64 MiB block of cache.
234 self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
# For disk cache, rebuild the slot map from files already on disk
# (surrounding guard elided).
237 self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
241 class CacheSlot(object):
242 __slots__ = ("locator", "ready", "content")
244 def __init__(self, locator):
245 self.locator = locator
246 self.ready = threading.Event()
253 def set(self, value):
258 if self.content is None:
261 return len(self.content)
268 return (self.content is None)
270 def _resize_cache(self, cache_max, max_slots):
271 # Try and make sure the contents of the cache do not exceed
272 # the supplied maximums.
274 _evict_candidates = collections.deque(self._cache.values())
275 sm = sum([slot.size() for slot in _evict_candidates])
276 while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
277 slot = _evict_candidates.popleft()
278 if not slot.ready.is_set():
281 if slot.content is None:
283 del self._cache[slot.locator]
288 # If evict returns false it means the
289 # underlying disk cache couldn't lock the file
290 # for deletion because another process was using
291 # it. Don't count it as reducing the amount
292 # of data in the cache, find something else to
297 # check to make sure the underlying data is gone
299 # either way we forget about it. either the
300 # other process will delete it, or if we need
301 # it again and it is still there, we'll find
303 del self._cache[slot.locator]
307 '''Cap the cache size to self.cache_max'''
308 with self._cache_updating:
309 self._resize_cache(self.cache_max, self._max_slots)
310 self._cache_updating.notify_all()
312 def _get(self, locator):
313 # Test if the locator is already in the cache
314 if locator in self._cache:
315 n = self._cache[locator]
316 self._cache.move_to_back(locator)
319 # see if it exists on disk
320 n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
322 self._cache[n.locator] = n
326 def get(self, locator):
327 with self._cache_lock:
328 return self._get(locator)
330 def reserve_cache(self, locator):
331 '''Reserve a cache slot for the specified locator,
332 or return the existing slot.'''
333 with self._cache_updating:
334 n = self._get(locator)
338 # Add a new cache slot for the locator
339 self._resize_cache(self.cache_max, self._max_slots-1)
340 while len(self._cache) >= self._max_slots:
341 # If there isn't a slot available, need to wait
342 # for something to happen that releases one of the
343 # cache slots. Idle for 200 ms or woken up by
345 self._cache_updating.wait(timeout=0.2)
346 self._resize_cache(self.cache_max, self._max_slots-1)
349 n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
351 n = KeepBlockCache.CacheSlot(locator)
352 self._cache[n.locator] = n
355 def set(self, slot, blob):
360 if e.errno == errno.ENOMEM:
361 # Reduce max slots to current - 4, cap cache and retry
362 with self._cache_lock:
363 self._max_slots = max(4, len(self._cache) - 4)
364 elif e.errno == errno.ENOSPC:
365 # Reduce disk max space to current - 256 MiB, cap cache and retry
366 with self._cache_lock:
367 sm = sum([st.size() for st in self._cache.values()])
368 self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
369 elif e.errno == errno.ENODEV:
370 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
371 except Exception as e:
374 # Check if we should evict things from the cache. Either
375 # because we added a new thing or there was an error and
376 # we possibly adjusted the limits down, so we might need
377 # to push something out.
381 # Only gets here if there was an error the first time. The
382 # exception handler adjusts limits downward in some cases
383 # to free up resources, which would make the operation
386 except Exception as e:
387 # It failed again. Give up.
389 raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
393 class Counter(object):
394 def __init__(self, v=0):
395 self._lk = threading.Lock()
407 class KeepClient(object):
408 DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
409 DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
411 class KeepService(PyCurlHelper):
412 """Make requests to a single Keep service, and track results.
414 A KeepService is intended to last long enough to perform one
415 transaction (GET or PUT) against one Keep service. This can
416 involve calling either get() or put() multiple times in order
417 to retry after transient failures. However, calling both get()
418 and put() on a single instance -- or using the same instance
419 to access two different Keep services -- will not produce
426 arvados.errors.HttpError,
429 def __init__(self, root, user_agent_pool=queue.LifoQueue(),
431 download_counter=None,
434 super(KeepClient.KeepService, self).__init__()
436 self._user_agent_pool = user_agent_pool
437 self._result = {'error': None}
441 self.get_headers = {'Accept': 'application/octet-stream'}
442 self.get_headers.update(headers)
443 self.put_headers = headers
444 self.upload_counter = upload_counter
445 self.download_counter = download_counter
446 self.insecure = insecure
449 """Is it worth attempting a request?"""
453 """Did the request succeed or encounter permanent failure?"""
454 return self._result['error'] == False or not self._usable
456 def last_result(self):
459 def _get_user_agent(self):
461 return self._user_agent_pool.get(block=False)
465 def _put_user_agent(self, ua):
468 self._user_agent_pool.put(ua, block=False)
472 def get(self, locator, method="GET", timeout=None):
473 # locator is a KeepLocator object.
474 url = self.root + str(locator)
475 _logger.debug("Request: %s %s", method, url)
476 curl = self._get_user_agent()
479 with timer.Timer() as t:
481 response_body = BytesIO()
482 curl.setopt(pycurl.NOSIGNAL, 1)
483 curl.setopt(pycurl.OPENSOCKETFUNCTION,
484 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
485 curl.setopt(pycurl.URL, url.encode('utf-8'))
486 curl.setopt(pycurl.HTTPHEADER, [
487 '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
488 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
489 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
491 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
492 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
494 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
496 curl.setopt(pycurl.NOBODY, True)
498 curl.setopt(pycurl.HTTPGET, True)
499 self._setcurltimeouts(curl, timeout, method=="HEAD")
503 except Exception as e:
504 raise arvados.errors.HttpError(0, str(e))
510 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
511 'body': response_body.getvalue(),
512 'headers': self._headers,
516 ok = retry.check_http_response_success(self._result['status_code'])
518 self._result['error'] = arvados.errors.HttpError(
519 self._result['status_code'],
520 self._headers.get('x-status-line', 'Error'))
521 except self.HTTP_ERRORS as e:
525 self._usable = ok != False
526 if self._result.get('status_code', None):
527 # The client worked well enough to get an HTTP status
528 # code, so presumably any problems are just on the
529 # server side and it's OK to reuse the client.
530 self._put_user_agent(curl)
532 # Don't return this client to the pool, in case it's
536 _logger.debug("Request fail: GET %s => %s: %s",
537 url, type(self._result['error']), str(self._result['error']))
540 _logger.info("HEAD %s: %s bytes",
541 self._result['status_code'],
542 self._result.get('content-length'))
543 if self._result['headers'].get('x-keep-locator'):
544 # This is a response to a remote block copy request, return
545 # the local copy block locator.
546 return self._result['headers'].get('x-keep-locator')
549 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
550 self._result['status_code'],
551 len(self._result['body']),
553 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
555 if self.download_counter:
556 self.download_counter.add(len(self._result['body']))
557 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
558 if resp_md5 != locator.md5sum:
559 _logger.warning("Checksum fail: md5(%s) = %s",
561 self._result['error'] = arvados.errors.HttpError(
564 return self._result['body']
566 def put(self, hash_s, body, timeout=None, headers={}):
567 put_headers = copy.copy(self.put_headers)
568 put_headers.update(headers)
569 url = self.root + hash_s
570 _logger.debug("Request: PUT %s", url)
571 curl = self._get_user_agent()
574 with timer.Timer() as t:
576 body_reader = BytesIO(body)
577 response_body = BytesIO()
578 curl.setopt(pycurl.NOSIGNAL, 1)
579 curl.setopt(pycurl.OPENSOCKETFUNCTION,
580 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
581 curl.setopt(pycurl.URL, url.encode('utf-8'))
582 # Using UPLOAD tells cURL to wait for a "go ahead" from the
583 # Keep server (in the form of a HTTP/1.1 "100 Continue"
584 # response) instead of sending the request body immediately.
585 # This allows the server to reject the request if the request
586 # is invalid or the server is read-only, without waiting for
587 # the client to send the entire block.
588 curl.setopt(pycurl.UPLOAD, True)
589 curl.setopt(pycurl.INFILESIZE, len(body))
590 curl.setopt(pycurl.READFUNCTION, body_reader.read)
591 curl.setopt(pycurl.HTTPHEADER, [
592 '{}: {}'.format(k,v) for k,v in put_headers.items()])
593 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
594 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
596 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
597 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
599 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
600 self._setcurltimeouts(curl, timeout)
603 except Exception as e:
604 raise arvados.errors.HttpError(0, str(e))
610 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
611 'body': response_body.getvalue().decode('utf-8'),
612 'headers': self._headers,
615 ok = retry.check_http_response_success(self._result['status_code'])
617 self._result['error'] = arvados.errors.HttpError(
618 self._result['status_code'],
619 self._headers.get('x-status-line', 'Error'))
620 except self.HTTP_ERRORS as e:
624 self._usable = ok != False # still usable if ok is True or None
625 if self._result.get('status_code', None):
626 # Client is functional. See comment in get().
627 self._put_user_agent(curl)
631 _logger.debug("Request fail: PUT %s => %s: %s",
632 url, type(self._result['error']), str(self._result['error']))
634 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
635 self._result['status_code'],
638 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
639 if self.upload_counter:
640 self.upload_counter.add(len(body))
644 class KeepWriterQueue(queue.Queue):
645 def __init__(self, copies, classes=[]):
646 queue.Queue.__init__(self) # Old-style superclass
647 self.wanted_copies = copies
648 self.wanted_storage_classes = classes
649 self.successful_copies = 0
650 self.confirmed_storage_classes = {}
652 self.storage_classes_tracking = True
653 self.queue_data_lock = threading.RLock()
654 self.pending_tries = max(copies, len(classes))
655 self.pending_tries_notification = threading.Condition()
657 def write_success(self, response, replicas_nr, classes_confirmed):
658 with self.queue_data_lock:
659 self.successful_copies += replicas_nr
660 if classes_confirmed is None:
661 self.storage_classes_tracking = False
662 elif self.storage_classes_tracking:
663 for st_class, st_copies in classes_confirmed.items():
665 self.confirmed_storage_classes[st_class] += st_copies
667 self.confirmed_storage_classes[st_class] = st_copies
668 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
669 self.response = response
670 with self.pending_tries_notification:
671 self.pending_tries_notification.notify_all()
673 def write_fail(self, ks):
674 with self.pending_tries_notification:
675 self.pending_tries += 1
676 self.pending_tries_notification.notify()
678 def pending_copies(self):
679 with self.queue_data_lock:
680 return self.wanted_copies - self.successful_copies
682 def satisfied_classes(self):
683 with self.queue_data_lock:
684 if not self.storage_classes_tracking:
685 # Notifies disabled storage classes expectation to
688 return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
690 def pending_classes(self):
691 with self.queue_data_lock:
692 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
694 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
695 for st_class, st_copies in self.confirmed_storage_classes.items():
696 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
697 unsatisfied_classes.remove(st_class)
698 return unsatisfied_classes
700 def get_next_task(self):
701 with self.pending_tries_notification:
703 if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
704 # This notify_all() is unnecessary --
705 # write_success() already called notify_all()
706 # when pending<1 became true, so it's not
707 # possible for any other thread to be in
708 # wait() now -- but it's cheap insurance
709 # against deadlock so we do it anyway:
710 self.pending_tries_notification.notify_all()
711 # Drain the queue and then raise Queue.Empty
715 elif self.pending_tries > 0:
716 service, service_root = self.get_nowait()
717 if service.finished():
720 self.pending_tries -= 1
721 return service, service_root
723 self.pending_tries_notification.notify_all()
726 self.pending_tries_notification.wait()
729 class KeepWriterThreadPool(object):
730 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
731 self.total_task_nr = 0
732 if (not max_service_replicas) or (max_service_replicas >= copies):
735 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
736 _logger.debug("Pool max threads is %d", num_threads)
738 self.queue = KeepClient.KeepWriterQueue(copies, classes)
740 for _ in range(num_threads):
741 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
742 self.workers.append(w)
744 def add_task(self, ks, service_root):
745 self.queue.put((ks, service_root))
746 self.total_task_nr += 1
749 return self.queue.successful_copies, self.queue.satisfied_classes()
753 for worker in self.workers:
755 # Wait for finished work
759 return self.queue.response
762 class KeepWriterThread(threading.Thread):
763 class TaskFailed(RuntimeError): pass
765 def __init__(self, queue, data, data_hash, timeout=None):
766 super(KeepClient.KeepWriterThread, self).__init__()
767 self.timeout = timeout
770 self.data_hash = data_hash
776 service, service_root = self.queue.get_next_task()
780 locator, copies, classes = self.do_task(service, service_root)
781 except Exception as e:
782 if not isinstance(e, self.TaskFailed):
783 _logger.exception("Exception in KeepWriterThread")
784 self.queue.write_fail(service)
786 self.queue.write_success(locator, copies, classes)
788 self.queue.task_done()
790 def do_task(self, service, service_root):
791 classes = self.queue.pending_classes()
795 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
796 success = bool(service.put(self.data_hash,
798 timeout=self.timeout,
800 result = service.last_result()
803 if result.get('status_code'):
804 _logger.debug("Request fail: PUT %s => %s %s",
806 result.get('status_code'),
808 raise self.TaskFailed()
810 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
811 str(threading.current_thread()),
816 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
817 except (KeyError, ValueError):
820 classes_confirmed = {}
822 scch = result['headers']['x-keep-storage-classes-confirmed']
823 for confirmation in scch.replace(' ', '').split(','):
824 if '=' in confirmation:
825 stored_class, stored_copies = confirmation.split('=')[:2]
826 classes_confirmed[stored_class] = int(stored_copies)
827 except (KeyError, ValueError):
828 # Storage classes confirmed header missing or corrupt
829 classes_confirmed = None
831 return result['body'].strip(), replicas_stored, classes_confirmed
834 def __init__(self, api_client=None, proxy=None,
835 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
836 api_token=None, local_store=None, block_cache=None,
837 num_retries=10, session=None, num_prefetch_threads=None):
838 """Initialize a new KeepClient.
842 The API client to use to find Keep services. If not
843 provided, KeepClient will build one from available Arvados
847 If specified, this KeepClient will send requests to this Keep
848 proxy. Otherwise, KeepClient will fall back to the setting of the
849 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
850 If you want to KeepClient does not use a proxy, pass in an empty
854 The initial timeout (in seconds) for HTTP requests to Keep
855 non-proxy servers. A tuple of three floats is interpreted as
856 (connection_timeout, read_timeout, minimum_bandwidth). A connection
857 will be aborted if the average traffic rate falls below
858 minimum_bandwidth bytes per second over an interval of read_timeout
859 seconds. Because timeouts are often a result of transient server
860 load, the actual connection timeout will be increased by a factor
861 of two on each retry.
862 Default: (2, 256, 32768).
865 The initial timeout (in seconds) for HTTP requests to
866 Keep proxies. A tuple of three floats is interpreted as
867 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
868 described above for adjusting connection timeouts on retry also
870 Default: (20, 256, 32768).
873 If you're not using an API client, but only talking
874 directly to a Keep proxy, this parameter specifies an API token
875 to authenticate Keep requests. It is an error to specify both
876 api_client and api_token. If you specify neither, KeepClient
877 will use one available from the Arvados configuration.
880 If specified, this KeepClient will bypass Keep
881 services, and save data to the named directory. If unspecified,
882 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
883 environment variable. If you want to ensure KeepClient does not
884 use local storage, pass in an empty string. This is primarily
885 intended to mock a server for testing.
888 The default number of times to retry failed requests.
889 This will be used as the default num_retries value when get() and
890 put() are called. Default 10.
892 self.lock = threading.Lock()
894 if config.get('ARVADOS_KEEP_SERVICES'):
895 proxy = config.get('ARVADOS_KEEP_SERVICES')
897 proxy = config.get('ARVADOS_KEEP_PROXY')
898 if api_token is None:
899 if api_client is None:
900 api_token = config.get('ARVADOS_API_TOKEN')
902 api_token = api_client.api_token
903 elif api_client is not None:
905 "can't build KeepClient with both API client and token")
906 if local_store is None:
907 local_store = os.environ.get('KEEP_LOCAL_STORE')
909 if api_client is None:
910 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
912 self.insecure = api_client.insecure
914 self.block_cache = block_cache if block_cache else KeepBlockCache()
915 self.timeout = timeout
916 self.proxy_timeout = proxy_timeout
917 self._user_agent_pool = queue.LifoQueue()
918 self.upload_counter = Counter()
919 self.download_counter = Counter()
920 self.put_counter = Counter()
921 self.get_counter = Counter()
922 self.hits_counter = Counter()
923 self.misses_counter = Counter()
924 self._storage_classes_unsupported_warning = False
925 self._default_classes = []
926 self.num_prefetch_threads = num_prefetch_threads or 2
927 self._prefetch_queue = None
928 self._prefetch_threads = None
931 self.local_store = local_store
932 self.head = self.local_store_head
933 self.get = self.local_store_get
934 self.put = self.local_store_put
936 self.num_retries = num_retries
937 self.max_replicas_per_service = None
939 proxy_uris = proxy.split()
940 for i in range(len(proxy_uris)):
941 if not proxy_uris[i].endswith('/'):
944 url = urllib.parse.urlparse(proxy_uris[i])
945 if not (url.scheme and url.netloc):
946 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
947 self.api_token = api_token
948 self._gateway_services = {}
949 self._keep_services = [{
950 'uuid': "00000-bi6l4-%015d" % idx,
951 'service_type': 'proxy',
952 '_service_root': uri,
953 } for idx, uri in enumerate(proxy_uris)]
954 self._writable_services = self._keep_services
955 self.using_proxy = True
956 self._static_services_list = True
958 # It's important to avoid instantiating an API client
959 # unless we actually need one, for testing's sake.
960 if api_client is None:
961 api_client = arvados.api('v1')
962 self.api_client = api_client
963 self.api_token = api_client.api_token
964 self._gateway_services = {}
965 self._keep_services = None
966 self._writable_services = None
967 self.using_proxy = None
968 self._static_services_list = False
970 self._default_classes = [
971 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
973 # We're talking to an old cluster
976 def current_timeout(self, attempt_number):
977 """Return the appropriate timeout to use for this client.
979 The proxy timeout setting if the backend service is currently a proxy,
980 the regular timeout setting otherwise. The `attempt_number` indicates
981 how many times the operation has been tried already (starting from 0
982 for the first try), and scales the connection timeout portion of the
983 return value accordingly.
986 # TODO(twp): the timeout should be a property of a
987 # KeepService, not a KeepClient. See #4488.
988 t = self.proxy_timeout if self.using_proxy else self.timeout
990 return (t[0] * (1 << attempt_number), t[1])
992 return (t[0] * (1 << attempt_number), t[1], t[2])
993 def _any_nondisk_services(self, service_list):
994 return any(ks.get('service_type', 'disk') != 'disk'
995 for ks in service_list)
997 def build_services_list(self, force_rebuild=False):
998 if (self._static_services_list or
999 (self._keep_services and not force_rebuild)):
1003 keep_services = self.api_client.keep_services().accessible()
1004 except Exception: # API server predates Keep services.
1005 keep_services = self.api_client.keep_disks().list()
1007 # Gateway services are only used when specified by UUID,
1008 # so there's nothing to gain by filtering them by
1010 self._gateway_services = {ks['uuid']: ks for ks in
1011 keep_services.execute()['items']}
1012 if not self._gateway_services:
1013 raise arvados.errors.NoKeepServersError()
1015 # Precompute the base URI for each service.
1016 for r in self._gateway_services.values():
1017 host = r['service_host']
1018 if not host.startswith('[') and host.find(':') >= 0:
1019 # IPv6 URIs must be formatted like http://[::1]:80/...
1020 host = '[' + host + ']'
1021 r['_service_root'] = "{}://{}:{:d}/".format(
1022 'https' if r['service_ssl_flag'] else 'http',
1026 _logger.debug(str(self._gateway_services))
1027 self._keep_services = [
1028 ks for ks in self._gateway_services.values()
1029 if not ks.get('service_type', '').startswith('gateway:')]
1030 self._writable_services = [ks for ks in self._keep_services
1031 if not ks.get('read_only')]
1033 # For disk type services, max_replicas_per_service is 1
1034 # It is unknown (unlimited) for other service types.
1035 if self._any_nondisk_services(self._writable_services):
1036 self.max_replicas_per_service = None
1038 self.max_replicas_per_service = 1
1040 def _service_weight(self, data_hash, service_uuid):
1041 """Compute the weight of a Keep service endpoint for a data
1042 block with a known hash.
1044 The weight is md5(h + u) where u is the last 15 characters of
1045 the service endpoint's UUID.
1047 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1049 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1050 """Return an array of Keep service endpoints, in the order in
1051 which they should be probed when reading or writing data with
1052 the given hash+hints.
1054 self.build_services_list(force_rebuild)
1057 # Use the services indicated by the given +K@... remote
1058 # service hints, if any are present and can be resolved to a
1060 for hint in locator.hints:
1061 if hint.startswith('K@'):
1063 sorted_roots.append(
1064 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1065 elif len(hint) == 29:
1066 svc = self._gateway_services.get(hint[2:])
1068 sorted_roots.append(svc['_service_root'])
1070 # Sort the available local services by weight (heaviest first)
1071 # for this locator, and return their service_roots (base URIs)
1073 use_services = self._keep_services
1075 use_services = self._writable_services
1076 self.using_proxy = self._any_nondisk_services(use_services)
1077 sorted_roots.extend([
1078 svc['_service_root'] for svc in sorted(
1081 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1082 _logger.debug("{}: {}".format(locator, sorted_roots))
1085 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1086 # roots_map is a dictionary, mapping Keep service root strings
1087 # to KeepService objects. Poll for Keep services, and add any
1088 # new ones to roots_map. Return the current list of local
1090 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1091 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1092 for root in local_roots:
1093 if root not in roots_map:
1094 roots_map[root] = self.KeepService(
1095 root, self._user_agent_pool,
1096 upload_counter=self.upload_counter,
1097 download_counter=self.download_counter,
1099 insecure=self.insecure)
# Success-checker callback handed to retry.RetryLoop by get/put below.
1103 def _check_loop_result(result):
1104 # KeepClient RetryLoops should save results as a 2-tuple: the
1105 # actual result of the request, and the number of servers available
1106 # to receive the request this round.
1107 # This method returns True if there's a real result, False if
1108 # there are no more servers available, otherwise None.
# An Exception saved via loop.save_result() means "keep retrying".
1109 if isinstance(result, Exception):
1111 result, tried_server_count = result
1112 if (result is not None) and (result is not False):
1114 elif tried_server_count < 1:
1115 _logger.info("No more Keep services to try; giving up")
# NOTE(review): the per-branch return statements are elided in this
# listing; per the comment above they should be None / True / False.
1120 def get_from_cache(self, loc_s):
1121 """Fetch a block only if it is in the cache; otherwise return None."""
1122 locator = KeepLocator(loc_s)
# Look up the cache slot by the locator's content hash only.
1123 slot = self.block_cache.get(locator.md5sum)
# A slot whose ready event is set holds a completed fetch result.
1124 if slot is not None and slot.ready.is_set():
# NOTE(review): the branch body (presumably returning the slot's
# contents) is elided in this listing — confirm in the full source.
def refresh_signature(self, loc):
    """Ask Keep for the block named by `loc` and return its locally-signed locator.

    Issues a HEAD request whose X-Keep-Signature header asks the
    server to respond with a fresh "local" signature valid as of now.
    """
    timestamp = datetime.datetime.utcnow().isoformat("T") + 'Z'
    signature_spec = 'local, {}'.format(timestamp)
    return self.head(loc, headers={'X-Keep-Signature': signature_spec})
def head(self, loc_s, **kwargs):
    """HEAD a Keep block: check existence without transferring its contents.

    Accepts the same keyword arguments as get(); delegates to
    _get_or_head() with method="HEAD".
    """
    return self._get_or_head(loc_s, method="HEAD", **kwargs)
def get(self, loc_s, **kwargs):
    """GET the Keep block(s) named by loc_s.

    See _get_or_head() for the supported keyword arguments and the
    full retry/caching behavior.
    """
    return self._get_or_head(loc_s, method="GET", **kwargs)
# NOTE(review): this listing is elided — many statements (cache-slot
# waits, the headers dict construction, try/finally bookkeeping, and
# several returns) are missing; the notes below describe only what is
# visible here.
1142 def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1143 """Get data from Keep.
1145 This method fetches one or more blocks of data from Keep. It
1146 sends a request each Keep service registered with the API
1147 server (or the proxy provided when this client was
1148 instantiated), then each service named in location hints, in
1149 sequence. As soon as one service provides the data, it's
1153 * loc_s: A string of one or more comma-separated locators to fetch.
1154 This method returns the concatenation of these blocks.
1155 * num_retries: The number of times to retry GET requests to
1156 *each* Keep server if it returns temporary failures, with
1157 exponential backoff. Note that, in each loop, the method may try
1158 to fetch data from every available Keep service, along with any
1159 that are named in location hints in the locator. The default value
1160 is set when the KeepClient is initialized.
# Multiple comma-separated locators: recurse per locator and join the
# resulting blocks (the guarding condition is elided in this listing).
1163 return ''.join(self.get(x) for x in loc_s.split(','))
1165 self.get_counter.add(1)
# Prefer an explicit request_id, then the API client's, else mint one.
1167 request_id = (request_id or
1168 (hasattr(self, 'api_client') and self.api_client.request_id) or
1169 arvados.util.new_request_id())
1172 headers['X-Request-Id'] = request_id
1177 locator = KeepLocator(loc_s)
# Reserve a cache slot; `first` distinguishes a fresh reservation from
# a slot some other thread is already filling.
1180 slot, first = self.block_cache.reserve_cache(locator.md5sum)
1182 # Fresh and empty "first time it is used" slot
1185 # this is request for a prefetch to fill in
1186 # the cache, don't need to wait for the
1187 # result, so if it is already in flight return
1188 # immediately. Clear 'slot' to prevent
1189 # finally block from calling slot.set()
1194 if blob is not None:
1195 self.hits_counter.add(1)
1198 # If blob is None, this means either
1200 # (a) another thread was fetching this block and
1201 # failed with an error or
1203 # (b) cache thrashing caused the slot to be
1204 # evicted (content set to None) by another thread
1205 # between the call to reserve_cache() and get().
1207 # We'll handle these cases by reserving a new slot
1208 # and then doing a full GET request.
1211 self.misses_counter.add(1)
1213 # If the locator has hints specifying a prefix (indicating a
1214 # remote keepproxy) or the UUID of a local gateway service,
1215 # read data from the indicated service(s) instead of the usual
1216 # list of local disk services.
1217 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1218 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1219 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1220 for hint in locator.hints if (
1221 hint.startswith('K@') and
1223 self._gateway_services.get(hint[2:])
1225 # Map root URLs to their KeepService objects.
1227 root: self.KeepService(root, self._user_agent_pool,
1228 upload_counter=self.upload_counter,
1229 download_counter=self.download_counter,
1231 insecure=self.insecure)
1232 for root in hint_roots
1235 # See #3147 for a discussion of the loop implementation. Highlights:
1236 # * Refresh the list of Keep services after each failure, in case
1237 # it's being updated.
1238 # * Retry until we succeed, we're out of retries, or every available
1239 # service has returned permanent failure.
1242 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1244 for tries_left in loop:
# Rebuild the service list on retry passes; the first pass
# (tries_left == num_retries) uses the cached list.
1246 sorted_roots = self.map_new_services(
1248 force_rebuild=(tries_left < num_retries),
1249 need_writable=False,
1251 except Exception as error:
1252 loop.save_result(error)
1255 # Query KeepService objects that haven't returned
1256 # permanent failure, in our specified shuffle order.
1257 services_to_try = [roots_map[root]
1258 for root in sorted_roots
1259 if roots_map[root].usable()]
1260 for keep_service in services_to_try:
1261 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1262 if blob is not None:
# Save both the data and how many services were still in play, for
# _check_loop_result's 2-tuple contract.
1264 loop.save_result((blob, len(services_to_try)))
1266 # Always cache the result, then return it if we succeeded.
1270 if slot is not None:
1271 self.block_cache.set(slot, blob)
1273 # Q: Including 403 is necessary for the Keep tests to continue
1274 # passing, but maybe they should expect KeepReadError instead?
1275 not_founds = sum(1 for key in sorted_roots
1276 if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1277 service_errors = ((key, roots_map[key].last_result()['error'])
1278 for key in sorted_roots)
# NOTE(review): presumably raised when no services were available at
# all — the guarding condition is elided in this listing.
1280 raise arvados.errors.KeepReadError(
1281 "[{}] failed to read {}: no Keep services available ({})".format(
1282 request_id, loc_s, loop.last_result()))
# Every service answered 403/404/410: surface NotFoundError.
1283 elif not_founds == len(sorted_roots):
1284 raise arvados.errors.NotFoundError(
1285 "[{}] {} not found".format(request_id, loc_s), service_errors)
1287 raise arvados.errors.KeepReadError(
1288 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
# NOTE(review): this listing is elided — the headers dict literal, the
# initialization of done_copies/done_classes, the loop.save_result
# call sites, and several conditions are missing; the notes below
# cover only the visible code.
1291 def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1292 """Save data in Keep.
1294 This method will get a list of Keep services from the API server, and
1295 send the data to each one simultaneously in a new thread. Once the
1296 uploads are finished, if enough copies are saved, this method returns
1297 the most recent HTTP response body. If requests fail to upload
1298 enough copies, this method raises KeepWriteError.
1301 * data: The string of data to upload.
1302 * copies: The number of copies that the user requires be saved.
1304 * num_retries: The number of times to retry PUT requests to
1305 *each* Keep server if it returns temporary failures, with
1306 exponential backoff. The default value is set when the
1307 KeepClient is initialized.
1308 * classes: An optional list of storage class names where copies should
1312 classes = classes or self._default_classes
# Keep blocks are bytes; transparently encode str input.
1314 if not isinstance(data, bytes):
1315 data = data.encode()
1317 self.put_counter.add(1)
# The locator is content-addressed: md5 of the data plus its length.
1319 data_hash = hashlib.md5(data).hexdigest()
1320 loc_s = data_hash + '+' + str(len(data))
1323 locator = KeepLocator(loc_s)
# Prefer an explicit request_id, then the API client's, else mint one.
1325 request_id = (request_id or
1326 (hasattr(self, 'api_client') and self.api_client.request_id) or
1327 arvados.util.new_request_id())
1329 'X-Request-Id': request_id,
1330 'X-Keep-Desired-Replicas': str(copies),
1333 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1337 for tries_left in loop:
1339 sorted_roots = self.map_new_services(
1341 force_rebuild=(tries_left < num_retries),
1344 except Exception as error:
1345 loop.save_result(error)
# Only ask the writer pool for storage classes not yet satisfied by
# earlier passes.
1348 pending_classes = []
1349 if done_classes is not None:
1350 pending_classes = list(set(classes) - set(done_classes))
1351 writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1352 data_hash=data_hash,
1353 copies=copies - done_copies,
1354 max_service_replicas=self.max_replicas_per_service,
1355 timeout=self.current_timeout(num_retries - tries_left),
1356 classes=pending_classes)
1357 for service_root, ks in [(root, roots_map[root])
1358 for root in sorted_roots]:
1361 writer_pool.add_task(ks, service_root)
1363 pool_copies, pool_classes = writer_pool.done()
1364 done_copies += pool_copies
1365 if (done_classes is not None) and (pool_classes is not None):
1366 done_classes += pool_classes
# Success requires both enough copies and every requested class.
1368 (done_copies >= copies and set(done_classes) == set(classes),
1369 writer_pool.total_task_nr))
1371 # Old keepstore contacted without storage classes support:
1372 # success is determined only by successful copies.
1374 # Disable storage classes tracking from this point forward.
# Warn only once per client about missing storage-class support.
1375 if not self._storage_classes_unsupported_warning:
1376 self._storage_classes_unsupported_warning = True
1377 _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1380 (done_copies >= copies, writer_pool.total_task_nr))
1383 return writer_pool.response()
# NOTE(review): presumably raised when no services were available at
# all — the guarding condition is elided in this listing.
1385 raise arvados.errors.KeepWriteError(
1386 "[{}] failed to write {}: no Keep services available ({})".format(
1387 request_id, data_hash, loop.last_result()))
# Collect per-service errors for the final KeepWriteError.
1389 service_errors = ((key, roots_map[key].last_result()['error'])
1390 for key in sorted_roots
1391 if roots_map[key].last_result()['error'])
1392 raise arvados.errors.KeepWriteError(
1393 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1394 request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1396 def _block_prefetch_worker(self):
1397 """The background downloader thread."""
# NOTE(review): the enclosing loop, the sentinel/termination check on
# the dequeued value, and the try statement are elided in this listing.
1400 b = self._prefetch_queue.get()
# prefetch=True asks get() not to block on an already-in-flight fetch
# (see the prefetch comments inside _get_or_head).
1403 self.get(b, prefetch=True)
# Keep the worker alive on any error; just log it with traceback.
1405 _logger.exception("Exception doing block prefetch")
# Start the pool of background prefetch threads (no-op when they are
# already running).
1407 def _start_prefetch_threads(self):
1408 if self._prefetch_threads is None:
# NOTE(review): this looks like a double-checked initialization — the
# lock acquisition between these two tests is elided in this listing;
# confirm against the full source.
1410 if self._prefetch_threads is not None:
1412 self._prefetch_queue = queue.Queue()
1413 self._prefetch_threads = []
1414 for i in range(0, self.num_prefetch_threads):
1415 thread = threading.Thread(target=self._block_prefetch_worker)
1416 self._prefetch_threads.append(thread)
# Daemon threads won't keep the interpreter alive at exit.
1417 thread.daemon = True
# NOTE(review): the thread.start() call is elided in this listing.
# Schedule a background download of `locator` so a later get() hits
# the block cache.
1420 def block_prefetch(self, locator):
# NOTE(review): the docstring's opening line is elided in this listing.
1422 This relies on the fact that KeepClient implements a block cache,
1423 so repeated requests for the same block will not result in repeated
1424 downloads (unless the block is evicted from the cache.) This method
# Nothing to do when the block is already cached.
1428 if self.block_cache.get(locator) is not None:
# Lazily spin up the worker pool, then hand the locator to it.
1431 self._start_prefetch_threads()
1432 self._prefetch_queue.put(locator)
# Shut down the prefetch worker pool, if it is running.
1434 def stop_prefetch_threads(self):
1436 if self._prefetch_threads is not None:
# One None per worker — presumably a shutdown sentinel consumed by
# _block_prefetch_worker (its dequeue check is elided in this listing).
1437 for t in self._prefetch_threads:
1438 self._prefetch_queue.put(None)
1439 for t in self._prefetch_threads:
# NOTE(review): the t.join() call is elided in this listing.
1441 self._prefetch_threads = None
1442 self._prefetch_queue = None
# NOTE(review): mutable default argument `classes=[]` — harmless only
# while it is never mutated; prefer `classes=None`. Also, the
# f.write(data) call and the return of the locator are elided in this
# listing.
1444 def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1445 """A stub for put().
1447 This method is used in place of the real put() method when
1448 using local storage (see constructor's local_store argument).
1450 copies and num_retries arguments are ignored: they are here
1451 only for the sake of offering the same call signature as
1454 Data stored this way can be retrieved via local_store_get().
# Content-addressed filename and locator from the data's md5.
1456 md5 = hashlib.md5(data).hexdigest()
1457 locator = '%s+%d' % (md5, len(data))
# Write to a .tmp file, then rename so readers never see a partial file.
1458 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1460 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1461 os.path.join(self.local_store, md5))
1464 def local_store_get(self, loc_s, num_retries=None):
1465 """Companion to local_store_put()."""
# NOTE(review): presumably a try/except around the locator parse turns
# a parse failure into NotFoundError — the try/except lines are elided
# in this listing.
1467 locator = KeepLocator(loc_s)
1469 raise arvados.errors.NotFoundError(
1470 "Invalid data locator: '%s'" % loc_s)
# The well-known empty-block locator is answered without touching disk.
1471 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1473 with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
# NOTE(review): the empty-block return and the f.read() return are
# elided in this listing.
1476 def local_store_head(self, loc_s, num_retries=None):
1477 """Companion to local_store_put()."""
1479 locator = KeepLocator(loc_s)
1481 raise arvados.errors.NotFoundError(
1482 "Invalid data locator: '%s'" % loc_s)
1483 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1485 if os.path.exists(os.path.join(self.local_store, locator.md5sum)):