# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
import collections
import copy
import datetime
import errno
import hashlib
import logging
import math
import os
import pycurl
import queue
import re
import resource
import socket
import ssl
import sys
import threading
import urllib.parse

from io import BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
import arvados.diskcache
import arvados.timer as timer
from arvados._pycurlhelper import PyCurlHelper

_logger = logging.getLogger('arvados.keep')
global_client_object = None

# Monkey patch TCP constants when not available (macOS). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102
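
# Illustrative sketch (not part of this module): constants like these are
# consumed by TCP keepalive socket options set on new connections, roughly:
#
#   s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#   s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
#   s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, 75)  # idle seconds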

class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except IndexError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
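
# Illustrative sketch (not part of this module): how KeepLocator decomposes a
# signed locator. The hash, signature, and timestamp below are made-up values.
#
#   loc = KeepLocator(
#       'acbd18db4cc2f85cedef654fccc4a4d8+3'
#       '+A27117dcd73e07b5c3ed6e3ef29272a3f699f3b70@6578c969')
#   loc.md5sum                # 'acbd18db4cc2f85cedef654fccc4a4d8'
#   loc.size                  # 3
#   loc.permission_hint()     # rebuilds the 'A<sig>@<hex timestamp>' hint
#   loc.permission_expired()  # compares the expiry against the current time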
135 """Simple interface to a global KeepClient object.
137 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
138 own API client. The global KeepClient will build an API client from the
139 current Arvados configuration, which may not match the one you built.

    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings. We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)
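
# Since Keep is deprecated, new code should construct its own client instead.
# An illustrative sketch:
#
#   api = arvados.api('v1')
#   keep_client = KeepClient(api_client=api)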

class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        self._cache = collections.OrderedDict()
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir
        self._cache_updating = threading.Condition(self._cache_lock)

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            if self._disk_cache:
                # Each block uses two file descriptors, one used to
                # open it initially and hold the flock(), and a second
                # hidden one used by mmap().
                #
                # Set max slots to 1/8 of maximum file handles. This
                # means we'll use at most 1/4 of total file handles.
                #
                # NOFILE typically defaults to 1024 on Linux so this
                # is 128 slots (256 file handles), which means we can
                # cache up to 8 GiB of 64 MiB blocks. This leaves
                # 768 file handles for sockets and other stuff.
                #
                # When we want the ability to have more cache (e.g. in
                # arv-mount) we'll increase rlimit before calling
                # this.
                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # pick the smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        self.cache_total = 0
        if self._disk_cache:
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            for slot in self._cache.values():
                self.cache_total += slot.size()
            self.cap_cache()
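
    # Illustrative usage sketch (not part of this module): wiring a
    # disk-backed block cache into a client. The cache directory below is a
    # made-up example path.
    #
    #   cache = KeepBlockCache(disk_cache=True,
    #                          disk_cache_dir='/tmp/example-keep-cache')
    #   kc = KeepClient(api_client=arvados.api('v1'), block_cache=cache)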

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            if self.content is not None:
                return False
            self.content = value
            self.ready.set()
            return True

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            self.content = None
            return True

    def _resize_cache(self, cache_max, max_slots):
        # Try and make sure the contents of the cache do not exceed
        # the supplied maximums.

        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
            return

        _evict_candidates = collections.deque(self._cache.values())
        while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
            slot = _evict_candidates.popleft()
            if not slot.ready.is_set():
                continue

            sz = slot.size()
            if slot.evict():
                self.cache_total -= sz
                del self._cache[slot.locator]

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_updating:
            self._resize_cache(self.cache_max, self._max_slots)
            self._cache_updating.notify_all()

    def _get(self, locator):
        # Test if the locator is already in the cache
        if locator in self._cache:
            n = self._cache[locator]
            if n.ready.is_set() and n.content is None:
                del self._cache[n.locator]
                return None
            self._cache.move_to_end(locator)
            return n
        if self._disk_cache:
            # see if it exists on disk
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache[n.locator] = n
                self.cache_total += n.size()
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_updating:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                self._resize_cache(self.cache_max, self._max_slots-1)
                while len(self._cache) >= self._max_slots:
                    # If there isn't a slot available, need to wait
                    # for something to happen that releases one of the
                    # cache slots. Idle for 200 ms or woken up by
                    # another thread
                    self._cache_updating.wait(timeout=0.2)
                    self._resize_cache(self.cache_max, self._max_slots-1)

                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache[n.locator] = n
                return n, True
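
    # Illustrative sketch (not part of this module): the reserve/fill protocol
    # used by readers. reserve_cache() returns (slot, first); the first caller
    # fills the slot while later callers block in slot.get() until it is set.
    #
    #   slot, first = cache.reserve_cache(locator_md5)
    #   if first:
    #       cache.set(slot, fetch_block(locator_md5))  # hypothetical fetch
    #   data = slot.get()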

    def set(self, slot, blob):
        try:
            if slot.set(blob):
                self.cache_total += slot.size()
            return
        except OSError as e:
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum(st.size() for st in self._cache.values())
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception as e:
            pass
        finally:
            # Check if we should evict things from the cache. Either
            # because we added a new thing or there was an error and
            # we possibly adjusted the limits down, so we might need
            # to push something out.
            self.cap_cache()

        try:
            # Only gets here if there was an error the first time. The
            # exception handler adjusts limits downward in some cases
            # to free up resources, which would make the operation
            # succeed.
            if slot.set(blob):
                self.cache_total += slot.size()
        except Exception as e:
            # It failed again. Give up.
            slot.set(None)
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
        finally:
            # Signal that we are done with the cache slot one way or another.
            self.cap_cache()

class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val

class KeepClient(object):
    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT

    class KeepService(PyCurlHelper):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            super(KeepClient.KeepService, self).__init__()
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure
426 """Is it worth attempting a request?"""
430 """Did the request succeed or encounter permanent failure?"""
431 return self._result['error'] == False or not self._usable
433 def last_result(self):

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                pass

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    else:
                        curl.setopt(pycurl.HTTPGET, True)
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result.get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request, return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

    class KeepWriterQueue(queue.Queue):
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Tell the outer loop that storage class tracking is
                    # disabled.
                    return None
                return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty()
                    else:
                        self.pending_tries_notification.wait()

    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies, classes)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response

    class KeepWriterThread(threading.Thread):
        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                       self.data,
                                       timeout=self.timeout,
                                       headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed
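
        # Illustrative sketch (not part of this module): a made-up example of
        # the confirmation header parsed above.
        #
        #   'x-keep-storage-classes-confirmed: default=2, archive=1'
        #   => {'default': 2, 'archive': 1}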

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=10, session=None, num_prefetch_threads=None):
815 """Initialize a new KeepClient.
819 The API client to use to find Keep services. If not
820 provided, KeepClient will build one from available Arvados
824 If specified, this KeepClient will send requests to this Keep
825 proxy. Otherwise, KeepClient will fall back to the setting of the
826 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
827 If you want to KeepClient does not use a proxy, pass in an empty
831 The initial timeout (in seconds) for HTTP requests to Keep
832 non-proxy servers. A tuple of three floats is interpreted as
833 (connection_timeout, read_timeout, minimum_bandwidth). A connection
834 will be aborted if the average traffic rate falls below
835 minimum_bandwidth bytes per second over an interval of read_timeout
836 seconds. Because timeouts are often a result of transient server
837 load, the actual connection timeout will be increased by a factor
838 of two on each retry.
839 Default: (2, 256, 32768).
842 The initial timeout (in seconds) for HTTP requests to
843 Keep proxies. A tuple of three floats is interpreted as
844 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
845 described above for adjusting connection timeouts on retry also
847 Default: (20, 256, 32768).
850 If you're not using an API client, but only talking
851 directly to a Keep proxy, this parameter specifies an API token
852 to authenticate Keep requests. It is an error to specify both
853 api_client and api_token. If you specify neither, KeepClient
854 will use one available from the Arvados configuration.
857 If specified, this KeepClient will bypass Keep
858 services, and save data to the named directory. If unspecified,
859 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
860 environment variable. If you want to ensure KeepClient does not
861 use local storage, pass in an empty string. This is primarily
862 intended to mock a server for testing.
865 The default number of times to retry failed requests.
866 This will be used as the default num_retries value when get() and
867 put() are called. Default 10.
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []
        if num_prefetch_threads is not None:
            self.num_prefetch_threads = num_prefetch_threads
        else:
            self.num_prefetch_threads = 2
        self._prefetch_queue = None
        self._prefetch_threads = None

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass
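
    # Illustrative usage sketch (not part of this module): a simple put/get
    # round trip against a configured cluster.
    #
    #   api = arvados.api('v1')
    #   kc = KeepClient(api_client=api)
    #   loc = kc.put(b'hello world', copies=2)
    #   assert kc.get(loc) == b'hello world'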

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise. The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
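
    # For example, with the default timeout of (2, 256, 32768), the connection
    # timeout component doubles on each attempt:
    #
    #   kc.current_timeout(0)  # (2, 256, 32768)
    #   kc.current_timeout(2)  # (8, 256, 32768)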

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
                (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
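
    # Illustrative sketch (not part of this module): this is rendezvous
    # (highest-random-weight) hashing. Every client computes the same
    # per-service weights from the block hash, so probing services in
    # descending weight order yields the same order everywhere.
    #
    #   def probe_order(data_hash, service_uuids):
    #       return sorted(
    #           service_uuids,
    #           reverse=True,
    #           key=lambda u: hashlib.md5((data_hash + u[-15:]).encode()).hexdigest())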

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # as a list.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc_s):
        """Fetch a block only if it is in the cache, otherwise return None."""
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature"""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                while slot is None:
                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
                    if first:
                        # Fresh and empty "first time it is used" slot
                        break
                    if prefetch:
                        # this is a request for a prefetch to fill in
                        # the cache, don't need to wait for the
                        # result, so if it is already in flight return
                        # immediately. Clear 'slot' to prevent
                        # finally block from calling slot.set()
                        if slot.ready.is_set():
                            slot.get()
                        slot = None
                        return None

                    blob = slot.get()
                    if blob is not None:
                        self.hits_counter.add(1)
                        return blob

                    # If blob is None, this means either
                    #
                    # (a) another thread was fetching this block and
                    # failed with an error or
                    #
                    # (b) cache thrashing caused the slot to be
                    # evicted (content set to None) by another thread
                    # between the call to reserve_cache() and get().
                    #
                    # We'll handle these cases by reserving a new slot
                    # and then doing a full GET request.
                    slot = None

            self.misses_counter.add(1)

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                   hint.startswith('K@') and
                                   len(hint) == 29 and
                                   self._gateway_services.get(hint[2:])
                               )])
            # Map root URLs to their KeepService objects.
            roots_map = {
                root: self.KeepService(root, self._user_agent_pool,
                                       upload_counter=self.upload_counter,
                                       download_counter=self.download_counter,
                                       headers=headers,
                                       insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation. Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            sorted_roots = []
            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
            for tries_left in loop:
                try:
                    sorted_roots = self.map_new_services(
                        roots_map, locator,
                        force_rebuild=(tries_left < num_retries),
                        need_writable=False,
                        headers=headers)
                except Exception as error:
                    loop.save_result(error)
                    continue

                # Query KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                    if blob is not None:
                        break
                loop.save_result((blob, len(services_to_try)))

            # Always cache the result, then return it if we succeeded.
            if loop.success():
                return blob
        finally:
            if slot is not None:
                self.block_cache.set(slot, blob)

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        """
        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done_copies,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left),
                                                          classes=pending_classes)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                     writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self.get(b, prefetch=True)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    def _start_prefetch_threads(self):
        if self._prefetch_threads is None:
            with self.lock:
                if self._prefetch_threads is not None:
                    return
                self._prefetch_queue = queue.Queue()
                self._prefetch_threads = []
                for i in range(0, self.num_prefetch_threads):
                    thread = threading.Thread(target=self._block_prefetch_worker)
                    self._prefetch_threads.append(thread)
                    thread.daemon = True
                    thread.start()
1404 This relies on the fact that KeepClient implements a block cache,
1405 so repeated requests for the same block will not result in repeated
1406 downloads (unless the block is evicted from the cache.) This method
1410 if self.block_cache.get(locator) is not None:
1413 self._start_prefetch_threads()
1414 self._prefetch_queue.put(locator)

    def stop_prefetch_threads(self):
        with self.lock:
            if self._prefetch_threads is not None:
                for t in self._prefetch_threads:
                    self._prefetch_queue.put(None)
                for t in self._prefetch_threads:
                    t.join()
            self._prefetch_threads = None
            self._prefetch_queue = None
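
    # Illustrative usage sketch (not part of this module): prefetch upcoming
    # blocks while streaming, then stop the worker threads when finished.
    #
    #   for loc in upcoming_block_locators:  # hypothetical iterable
    #       kc.block_prefetch(loc)
    #   ...
    #   kc.stop_prefetch_threads()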

    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return b''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()

    def local_store_head(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return True
        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
            return True
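
# Illustrative usage sketch (not part of this module): local_store mode keeps
# blocks as plain files in an existing directory, which is handy for tests.
#
#   kc = KeepClient(local_store='/tmp/example-keep-local')  # made-up dir
#   loc = kc.put(b'foo')   # routed to local_store_put()
#   kc.get(loc)            # b'foo', via local_store_get()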