1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
26 from io import BytesIO
29 import arvados.config as config
31 import arvados.retry as retry
33 import arvados.diskcache
34 from arvados._pycurlhelper import PyCurlHelper
37 _logger = logging.getLogger('arvados.keep')
38 global_client_object = None
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    for _const, _default in (('TCP_KEEPALIVE', 0x010),
                             ('TCP_KEEPINTVL', 0x101),
                             ('TCP_KEEPCNT', 0x102)):
        # Only fill in a constant if this Python build doesn't define it.
        if not hasattr(socket, _const):
            setattr(socket, _const, _default)
    del _const, _default
50 class KeepLocator(object):
51 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
52 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
54 def __init__(self, locator_str):
57 self._perm_expiry = None
58 pieces = iter(locator_str.split('+'))
59 self.md5sum = next(pieces)
61 self.size = int(next(pieces))
65 if self.HINT_RE.match(hint) is None:
66 raise ValueError("invalid hint format: {}".format(hint))
67 elif hint.startswith('A'):
68 self.parse_permission_hint(hint)
70 self.hints.append(hint)
75 for s in [self.md5sum, self.size,
76 self.permission_hint()] + self.hints
80 if self.size is not None:
81 return "%s+%i" % (self.md5sum, self.size)
85 def _make_hex_prop(name, length):
86 # Build and return a new property with the given name that
87 # must be a hex string of the given length.
88 data_name = '_{}'.format(name)
90 return getattr(self, data_name)
91 def setter(self, hex_str):
92 if not arvados.util.is_hex(hex_str, length):
93 raise ValueError("{} is not a {}-digit hex string: {!r}".
94 format(name, length, hex_str))
95 setattr(self, data_name, hex_str)
96 return property(getter, setter)
98 md5sum = _make_hex_prop('md5sum', 32)
99 perm_sig = _make_hex_prop('perm_sig', 40)
102 def perm_expiry(self):
103 return self._perm_expiry
106 def perm_expiry(self, value):
107 if not arvados.util.is_hex(value, 1, 8):
109 "permission timestamp must be a hex Unix timestamp: {}".
111 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
113 def permission_hint(self):
114 data = [self.perm_sig, self.perm_expiry]
117 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
118 return "A{}@{:08x}".format(*data)
120 def parse_permission_hint(self, s):
122 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
124 raise ValueError("bad permission hint {}".format(s))
126 def permission_expired(self, as_of_dt=None):
127 if self.perm_expiry is None:
129 elif as_of_dt is None:
130 as_of_dt = datetime.datetime.now()
131 return self.perm_expiry <= as_of_dt
135 """Simple interface to a global KeepClient object.
137 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
138 own API client. The global KeepClient will build an API client from the
139 current Arvados configuration, which may not match the one you built.
144 def global_client_object(cls):
145 global global_client_object
146 # Previously, KeepClient would change its behavior at runtime based
147 # on these configuration settings. We simulate that behavior here
148 # by checking the values and returning a new KeepClient if any of
150 key = (config.get('ARVADOS_API_HOST'),
151 config.get('ARVADOS_API_TOKEN'),
152 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
153 config.get('ARVADOS_KEEP_PROXY'),
154 os.environ.get('KEEP_LOCAL_STORE'))
155 if (global_client_object is None) or (cls._last_key != key):
156 global_client_object = KeepClient()
158 return global_client_object
161 def get(locator, **kwargs):
162 return Keep.global_client_object().get(locator, **kwargs)
165 def put(data, **kwargs):
166 return Keep.global_client_object().put(data, **kwargs)
168 class KeepBlockCache(object):
169 def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
170 self.cache_max = cache_max
171 self._cache = collections.OrderedDict()
172 self._cache_lock = threading.Lock()
173 self._max_slots = max_slots
174 self._disk_cache = disk_cache
175 self._disk_cache_dir = disk_cache_dir
176 self._cache_updating = threading.Condition(self._cache_lock)
178 if self._disk_cache and self._disk_cache_dir is None:
179 cache_path = arvados.util._BaseDirectories('CACHE').storage_path() / 'keep'
180 cache_path.mkdir(mode=0o700, exist_ok=True)
181 self._disk_cache_dir = str(cache_path)
183 if self._max_slots == 0:
185 # Each block uses two file descriptors, one used to
186 # open it initially and hold the flock(), and a second
187 # hidden one used by mmap().
189 # Set max slots to 1/8 of maximum file handles. This
190 # means we'll use at most 1/4 of total file handles.
192 # NOFILE typically defaults to 1024 on Linux so this
193 # is 128 slots (256 file handles), which means we can
194 # cache up to 8 GiB of 64 MiB blocks. This leaves
195 # 768 file handles for sockets and other stuff.
197 # When we want the ability to have more cache (e.g. in
198 # arv-mount) we'll increase rlimit before calling
200 self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
203 self._max_slots = 512
205 if self.cache_max == 0:
207 fs = os.statvfs(self._disk_cache_dir)
208 # Calculation of available space incorporates existing cache usage
209 existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
210 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
211 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
213 # 10% of total disk size
214 # 25% of available space
216 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
219 self.cache_max = (256 * 1024 * 1024)
221 self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
225 self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
226 for slot in self._cache.values():
227 self.cache_total += slot.size()
230 class CacheSlot(object):
231 __slots__ = ("locator", "ready", "content")
233 def __init__(self, locator):
234 self.locator = locator
235 self.ready = threading.Event()
242 def set(self, value):
243 if self.content is not None:
250 if self.content is None:
253 return len(self.content)
259 def _resize_cache(self, cache_max, max_slots):
260 # Try and make sure the contents of the cache do not exceed
261 # the supplied maximums.
263 if self.cache_total <= cache_max and len(self._cache) <= max_slots:
266 _evict_candidates = collections.deque(self._cache.values())
267 while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
268 slot = _evict_candidates.popleft()
269 if not slot.ready.is_set():
274 self.cache_total -= sz
275 del self._cache[slot.locator]
279 '''Cap the cache size to self.cache_max'''
280 with self._cache_updating:
281 self._resize_cache(self.cache_max, self._max_slots)
282 self._cache_updating.notify_all()
284 def _get(self, locator):
285 # Test if the locator is already in the cache
286 if locator in self._cache:
287 n = self._cache[locator]
288 if n.ready.is_set() and n.content is None:
289 del self._cache[n.locator]
291 self._cache.move_to_end(locator)
294 # see if it exists on disk
295 n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
297 self._cache[n.locator] = n
298 self.cache_total += n.size()
302 def get(self, locator):
303 with self._cache_lock:
304 return self._get(locator)
306 def reserve_cache(self, locator):
307 '''Reserve a cache slot for the specified locator,
308 or return the existing slot.'''
309 with self._cache_updating:
310 n = self._get(locator)
314 # Add a new cache slot for the locator
315 self._resize_cache(self.cache_max, self._max_slots-1)
316 while len(self._cache) >= self._max_slots:
317 # If there isn't a slot available, need to wait
318 # for something to happen that releases one of the
319 # cache slots. Idle for 200 ms or woken up by
321 self._cache_updating.wait(timeout=0.2)
322 self._resize_cache(self.cache_max, self._max_slots-1)
325 n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
327 n = KeepBlockCache.CacheSlot(locator)
328 self._cache[n.locator] = n
331 def set(self, slot, blob):
334 self.cache_total += slot.size()
337 if e.errno == errno.ENOMEM:
338 # Reduce max slots to current - 4, cap cache and retry
339 with self._cache_lock:
340 self._max_slots = max(4, len(self._cache) - 4)
341 elif e.errno == errno.ENOSPC:
342 # Reduce disk max space to current - 256 MiB, cap cache and retry
343 with self._cache_lock:
344 sm = sum(st.size() for st in self._cache.values())
345 self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
346 elif e.errno == errno.ENODEV:
347 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
348 except Exception as e:
351 # Check if we should evict things from the cache. Either
352 # because we added a new thing or there was an error and
353 # we possibly adjusted the limits down, so we might need
354 # to push something out.
358 # Only gets here if there was an error the first time. The
359 # exception handler adjusts limits downward in some cases
360 # to free up resources, which would make the operation
363 self.cache_total += slot.size()
364 except Exception as e:
365 # It failed again. Give up.
367 raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
371 class Counter(object):
372 def __init__(self, v=0):
373 self._lk = threading.Lock()
385 class KeepClient(object):
386 DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
387 DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
389 class KeepService(PyCurlHelper):
390 """Make requests to a single Keep service, and track results.
392 A KeepService is intended to last long enough to perform one
393 transaction (GET or PUT) against one Keep service. This can
394 involve calling either get() or put() multiple times in order
395 to retry after transient failures. However, calling both get()
396 and put() on a single instance -- or using the same instance
397 to access two different Keep services -- will not produce
404 arvados.errors.HttpError,
407 def __init__(self, root, user_agent_pool=queue.LifoQueue(),
409 download_counter=None,
412 super(KeepClient.KeepService, self).__init__()
414 self._user_agent_pool = user_agent_pool
415 self._result = {'error': None}
419 self.get_headers = {'Accept': 'application/octet-stream'}
420 self.get_headers.update(headers)
421 self.put_headers = headers
422 self.upload_counter = upload_counter
423 self.download_counter = download_counter
424 self.insecure = insecure
427 """Is it worth attempting a request?"""
431 """Did the request succeed or encounter permanent failure?"""
432 return self._result['error'] == False or not self._usable
434 def last_result(self):
437 def _get_user_agent(self):
439 return self._user_agent_pool.get(block=False)
443 def _put_user_agent(self, ua):
446 self._user_agent_pool.put(ua, block=False)
450 def get(self, locator, method="GET", timeout=None):
451 # locator is a KeepLocator object.
452 url = self.root + str(locator)
453 _logger.debug("Request: %s %s", method, url)
454 curl = self._get_user_agent()
457 with timer.Timer() as t:
459 response_body = BytesIO()
460 curl.setopt(pycurl.NOSIGNAL, 1)
461 curl.setopt(pycurl.OPENSOCKETFUNCTION,
462 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
463 curl.setopt(pycurl.URL, url.encode('utf-8'))
464 curl.setopt(pycurl.HTTPHEADER, [
465 '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
466 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
467 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
469 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
470 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
472 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
474 curl.setopt(pycurl.NOBODY, True)
476 curl.setopt(pycurl.HTTPGET, True)
477 self._setcurltimeouts(curl, timeout, method=="HEAD")
481 except Exception as e:
482 raise arvados.errors.HttpError(0, str(e))
488 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
489 'body': response_body.getvalue(),
490 'headers': self._headers,
494 ok = retry.check_http_response_success(self._result['status_code'])
496 self._result['error'] = arvados.errors.HttpError(
497 self._result['status_code'],
498 self._headers.get('x-status-line', 'Error'))
499 except self.HTTP_ERRORS as e:
503 self._usable = ok != False
504 if self._result.get('status_code', None):
505 # The client worked well enough to get an HTTP status
506 # code, so presumably any problems are just on the
507 # server side and it's OK to reuse the client.
508 self._put_user_agent(curl)
510 # Don't return this client to the pool, in case it's
514 _logger.debug("Request fail: GET %s => %s: %s",
515 url, type(self._result['error']), str(self._result['error']))
518 _logger.info("HEAD %s: %s bytes",
519 self._result['status_code'],
520 self._result.get('content-length'))
521 if self._result['headers'].get('x-keep-locator'):
522 # This is a response to a remote block copy request, return
523 # the local copy block locator.
524 return self._result['headers'].get('x-keep-locator')
527 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
528 self._result['status_code'],
529 len(self._result['body']),
531 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
533 if self.download_counter:
534 self.download_counter.add(len(self._result['body']))
535 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
536 if resp_md5 != locator.md5sum:
537 _logger.warning("Checksum fail: md5(%s) = %s",
539 self._result['error'] = arvados.errors.HttpError(
542 return self._result['body']
544 def put(self, hash_s, body, timeout=None, headers={}):
545 put_headers = copy.copy(self.put_headers)
546 put_headers.update(headers)
547 url = self.root + hash_s
548 _logger.debug("Request: PUT %s", url)
549 curl = self._get_user_agent()
552 with timer.Timer() as t:
554 body_reader = BytesIO(body)
555 response_body = BytesIO()
556 curl.setopt(pycurl.NOSIGNAL, 1)
557 curl.setopt(pycurl.OPENSOCKETFUNCTION,
558 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
559 curl.setopt(pycurl.URL, url.encode('utf-8'))
560 # Using UPLOAD tells cURL to wait for a "go ahead" from the
561 # Keep server (in the form of a HTTP/1.1 "100 Continue"
562 # response) instead of sending the request body immediately.
563 # This allows the server to reject the request if the request
564 # is invalid or the server is read-only, without waiting for
565 # the client to send the entire block.
566 curl.setopt(pycurl.UPLOAD, True)
567 curl.setopt(pycurl.INFILESIZE, len(body))
568 curl.setopt(pycurl.READFUNCTION, body_reader.read)
569 curl.setopt(pycurl.HTTPHEADER, [
570 '{}: {}'.format(k,v) for k,v in put_headers.items()])
571 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
572 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
574 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
575 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
577 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
578 self._setcurltimeouts(curl, timeout)
581 except Exception as e:
582 raise arvados.errors.HttpError(0, str(e))
588 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
589 'body': response_body.getvalue().decode('utf-8'),
590 'headers': self._headers,
593 ok = retry.check_http_response_success(self._result['status_code'])
595 self._result['error'] = arvados.errors.HttpError(
596 self._result['status_code'],
597 self._headers.get('x-status-line', 'Error'))
598 except self.HTTP_ERRORS as e:
602 self._usable = ok != False # still usable if ok is True or None
603 if self._result.get('status_code', None):
604 # Client is functional. See comment in get().
605 self._put_user_agent(curl)
609 _logger.debug("Request fail: PUT %s => %s: %s",
610 url, type(self._result['error']), str(self._result['error']))
612 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
613 self._result['status_code'],
616 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
617 if self.upload_counter:
618 self.upload_counter.add(len(body))
622 class KeepWriterQueue(queue.Queue):
623 def __init__(self, copies, classes=[]):
624 queue.Queue.__init__(self) # Old-style superclass
625 self.wanted_copies = copies
626 self.wanted_storage_classes = classes
627 self.successful_copies = 0
628 self.confirmed_storage_classes = {}
630 self.storage_classes_tracking = True
631 self.queue_data_lock = threading.RLock()
632 self.pending_tries = max(copies, len(classes))
633 self.pending_tries_notification = threading.Condition()
635 def write_success(self, response, replicas_nr, classes_confirmed):
636 with self.queue_data_lock:
637 self.successful_copies += replicas_nr
638 if classes_confirmed is None:
639 self.storage_classes_tracking = False
640 elif self.storage_classes_tracking:
641 for st_class, st_copies in classes_confirmed.items():
643 self.confirmed_storage_classes[st_class] += st_copies
645 self.confirmed_storage_classes[st_class] = st_copies
646 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
647 self.response = response
648 with self.pending_tries_notification:
649 self.pending_tries_notification.notify_all()
651 def write_fail(self, ks):
652 with self.pending_tries_notification:
653 self.pending_tries += 1
654 self.pending_tries_notification.notify()
656 def pending_copies(self):
657 with self.queue_data_lock:
658 return self.wanted_copies - self.successful_copies
660 def satisfied_classes(self):
661 with self.queue_data_lock:
662 if not self.storage_classes_tracking:
663 # Notifies disabled storage classes expectation to
666 return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
668 def pending_classes(self):
669 with self.queue_data_lock:
670 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
672 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
673 for st_class, st_copies in self.confirmed_storage_classes.items():
674 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
675 unsatisfied_classes.remove(st_class)
676 return unsatisfied_classes
678 def get_next_task(self):
679 with self.pending_tries_notification:
681 if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
682 # This notify_all() is unnecessary --
683 # write_success() already called notify_all()
684 # when pending<1 became true, so it's not
685 # possible for any other thread to be in
686 # wait() now -- but it's cheap insurance
687 # against deadlock so we do it anyway:
688 self.pending_tries_notification.notify_all()
689 # Drain the queue and then raise Queue.Empty
693 elif self.pending_tries > 0:
694 service, service_root = self.get_nowait()
695 if service.finished():
698 self.pending_tries -= 1
699 return service, service_root
701 self.pending_tries_notification.notify_all()
704 self.pending_tries_notification.wait()
707 class KeepWriterThreadPool(object):
708 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
709 self.total_task_nr = 0
710 if (not max_service_replicas) or (max_service_replicas >= copies):
713 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
714 _logger.debug("Pool max threads is %d", num_threads)
716 self.queue = KeepClient.KeepWriterQueue(copies, classes)
718 for _ in range(num_threads):
719 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
720 self.workers.append(w)
722 def add_task(self, ks, service_root):
723 self.queue.put((ks, service_root))
724 self.total_task_nr += 1
727 return self.queue.successful_copies, self.queue.satisfied_classes()
731 for worker in self.workers:
733 # Wait for finished work
737 return self.queue.response
740 class KeepWriterThread(threading.Thread):
741 class TaskFailed(RuntimeError): pass
743 def __init__(self, queue, data, data_hash, timeout=None):
744 super(KeepClient.KeepWriterThread, self).__init__()
745 self.timeout = timeout
748 self.data_hash = data_hash
754 service, service_root = self.queue.get_next_task()
758 locator, copies, classes = self.do_task(service, service_root)
759 except Exception as e:
760 if not isinstance(e, self.TaskFailed):
761 _logger.exception("Exception in KeepWriterThread")
762 self.queue.write_fail(service)
764 self.queue.write_success(locator, copies, classes)
766 self.queue.task_done()
768 def do_task(self, service, service_root):
769 classes = self.queue.pending_classes()
773 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
774 success = bool(service.put(self.data_hash,
776 timeout=self.timeout,
778 result = service.last_result()
781 if result.get('status_code'):
782 _logger.debug("Request fail: PUT %s => %s %s",
784 result.get('status_code'),
786 raise self.TaskFailed()
788 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
789 str(threading.current_thread()),
794 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
795 except (KeyError, ValueError):
798 classes_confirmed = {}
800 scch = result['headers']['x-keep-storage-classes-confirmed']
801 for confirmation in scch.replace(' ', '').split(','):
802 if '=' in confirmation:
803 stored_class, stored_copies = confirmation.split('=')[:2]
804 classes_confirmed[stored_class] = int(stored_copies)
805 except (KeyError, ValueError):
806 # Storage classes confirmed header missing or corrupt
807 classes_confirmed = None
809 return result['body'].strip(), replicas_stored, classes_confirmed
812 def __init__(self, api_client=None, proxy=None,
813 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
814 api_token=None, local_store=None, block_cache=None,
815 num_retries=10, session=None, num_prefetch_threads=None):
816 """Initialize a new KeepClient.
820 The API client to use to find Keep services. If not
821 provided, KeepClient will build one from available Arvados
825 If specified, this KeepClient will send requests to this Keep
826 proxy. Otherwise, KeepClient will fall back to the setting of the
827 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
828 If you want to KeepClient does not use a proxy, pass in an empty
832 The initial timeout (in seconds) for HTTP requests to Keep
833 non-proxy servers. A tuple of three floats is interpreted as
834 (connection_timeout, read_timeout, minimum_bandwidth). A connection
835 will be aborted if the average traffic rate falls below
836 minimum_bandwidth bytes per second over an interval of read_timeout
837 seconds. Because timeouts are often a result of transient server
838 load, the actual connection timeout will be increased by a factor
839 of two on each retry.
840 Default: (2, 256, 32768).
843 The initial timeout (in seconds) for HTTP requests to
844 Keep proxies. A tuple of three floats is interpreted as
845 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
846 described above for adjusting connection timeouts on retry also
848 Default: (20, 256, 32768).
851 If you're not using an API client, but only talking
852 directly to a Keep proxy, this parameter specifies an API token
853 to authenticate Keep requests. It is an error to specify both
854 api_client and api_token. If you specify neither, KeepClient
855 will use one available from the Arvados configuration.
858 If specified, this KeepClient will bypass Keep
859 services, and save data to the named directory. If unspecified,
860 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
861 environment variable. If you want to ensure KeepClient does not
862 use local storage, pass in an empty string. This is primarily
863 intended to mock a server for testing.
866 The default number of times to retry failed requests.
867 This will be used as the default num_retries value when get() and
868 put() are called. Default 10.
870 self.lock = threading.Lock()
872 if config.get('ARVADOS_KEEP_SERVICES'):
873 proxy = config.get('ARVADOS_KEEP_SERVICES')
875 proxy = config.get('ARVADOS_KEEP_PROXY')
876 if api_token is None:
877 if api_client is None:
878 api_token = config.get('ARVADOS_API_TOKEN')
880 api_token = api_client.api_token
881 elif api_client is not None:
883 "can't build KeepClient with both API client and token")
884 if local_store is None:
885 local_store = os.environ.get('KEEP_LOCAL_STORE')
887 if api_client is None:
888 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
890 self.insecure = api_client.insecure
892 self.block_cache = block_cache if block_cache else KeepBlockCache()
893 self.timeout = timeout
894 self.proxy_timeout = proxy_timeout
895 self._user_agent_pool = queue.LifoQueue()
896 self.upload_counter = Counter()
897 self.download_counter = Counter()
898 self.put_counter = Counter()
899 self.get_counter = Counter()
900 self.hits_counter = Counter()
901 self.misses_counter = Counter()
902 self._storage_classes_unsupported_warning = False
903 self._default_classes = []
904 if num_prefetch_threads is not None:
905 self.num_prefetch_threads = num_prefetch_threads
907 self.num_prefetch_threads = 2
908 self._prefetch_queue = None
909 self._prefetch_threads = None
912 self.local_store = local_store
913 self.head = self.local_store_head
914 self.get = self.local_store_get
915 self.put = self.local_store_put
917 self.num_retries = num_retries
918 self.max_replicas_per_service = None
920 proxy_uris = proxy.split()
921 for i in range(len(proxy_uris)):
922 if not proxy_uris[i].endswith('/'):
925 url = urllib.parse.urlparse(proxy_uris[i])
926 if not (url.scheme and url.netloc):
927 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
928 self.api_token = api_token
929 self._gateway_services = {}
930 self._keep_services = [{
931 'uuid': "00000-bi6l4-%015d" % idx,
932 'service_type': 'proxy',
933 '_service_root': uri,
934 } for idx, uri in enumerate(proxy_uris)]
935 self._writable_services = self._keep_services
936 self.using_proxy = True
937 self._static_services_list = True
939 # It's important to avoid instantiating an API client
940 # unless we actually need one, for testing's sake.
941 if api_client is None:
942 api_client = arvados.api('v1')
943 self.api_client = api_client
944 self.api_token = api_client.api_token
945 self._gateway_services = {}
946 self._keep_services = None
947 self._writable_services = None
948 self.using_proxy = None
949 self._static_services_list = False
951 self._default_classes = [
952 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
954 # We're talking to an old cluster
957 def current_timeout(self, attempt_number):
958 """Return the appropriate timeout to use for this client.
960 The proxy timeout setting if the backend service is currently a proxy,
961 the regular timeout setting otherwise. The `attempt_number` indicates
962 how many times the operation has been tried already (starting from 0
963 for the first try), and scales the connection timeout portion of the
964 return value accordingly.
967 # TODO(twp): the timeout should be a property of a
968 # KeepService, not a KeepClient. See #4488.
969 t = self.proxy_timeout if self.using_proxy else self.timeout
971 return (t[0] * (1 << attempt_number), t[1])
973 return (t[0] * (1 << attempt_number), t[1], t[2])
974 def _any_nondisk_services(self, service_list):
975 return any(ks.get('service_type', 'disk') != 'disk'
976 for ks in service_list)
978 def build_services_list(self, force_rebuild=False):
979 if (self._static_services_list or
980 (self._keep_services and not force_rebuild)):
984 keep_services = self.api_client.keep_services().accessible()
985 except Exception: # API server predates Keep services.
986 keep_services = self.api_client.keep_disks().list()
988 # Gateway services are only used when specified by UUID,
989 # so there's nothing to gain by filtering them by
991 self._gateway_services = {ks['uuid']: ks for ks in
992 keep_services.execute()['items']}
993 if not self._gateway_services:
994 raise arvados.errors.NoKeepServersError()
996 # Precompute the base URI for each service.
997 for r in self._gateway_services.values():
998 host = r['service_host']
999 if not host.startswith('[') and host.find(':') >= 0:
1000 # IPv6 URIs must be formatted like http://[::1]:80/...
1001 host = '[' + host + ']'
1002 r['_service_root'] = "{}://{}:{:d}/".format(
1003 'https' if r['service_ssl_flag'] else 'http',
1007 _logger.debug(str(self._gateway_services))
1008 self._keep_services = [
1009 ks for ks in self._gateway_services.values()
1010 if not ks.get('service_type', '').startswith('gateway:')]
1011 self._writable_services = [ks for ks in self._keep_services
1012 if not ks.get('read_only')]
1014 # For disk type services, max_replicas_per_service is 1
1015 # It is unknown (unlimited) for other service types.
1016 if self._any_nondisk_services(self._writable_services):
1017 self.max_replicas_per_service = None
1019 self.max_replicas_per_service = 1
1021 def _service_weight(self, data_hash, service_uuid):
1022 """Compute the weight of a Keep service endpoint for a data
1023 block with a known hash.
1025 The weight is md5(h + u) where u is the last 15 characters of
1026 the service endpoint's UUID.
1028 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1030 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1031 """Return an array of Keep service endpoints, in the order in
1032 which they should be probed when reading or writing data with
1033 the given hash+hints.
1035 self.build_services_list(force_rebuild)
1038 # Use the services indicated by the given +K@... remote
1039 # service hints, if any are present and can be resolved to a
1041 for hint in locator.hints:
1042 if hint.startswith('K@'):
1044 sorted_roots.append(
1045 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1046 elif len(hint) == 29:
1047 svc = self._gateway_services.get(hint[2:])
1049 sorted_roots.append(svc['_service_root'])
1051 # Sort the available local services by weight (heaviest first)
1052 # for this locator, and return their service_roots (base URIs)
1054 use_services = self._keep_services
1056 use_services = self._writable_services
1057 self.using_proxy = self._any_nondisk_services(use_services)
1058 sorted_roots.extend([
1059 svc['_service_root'] for svc in sorted(
1062 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1063 _logger.debug("{}: {}".format(locator, sorted_roots))
1066 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1067 # roots_map is a dictionary, mapping Keep service root strings
1068 # to KeepService objects. Poll for Keep services, and add any
1069 # new ones to roots_map. Return the current list of local
1071 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1072 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1073 for root in local_roots:
1074 if root not in roots_map:
1075 roots_map[root] = self.KeepService(
1076 root, self._user_agent_pool,
1077 upload_counter=self.upload_counter,
1078 download_counter=self.download_counter,
1080 insecure=self.insecure)
1084 def _check_loop_result(result):
1085 # KeepClient RetryLoops should save results as a 2-tuple: the
1086 # actual result of the request, and the number of servers available
1087 # to receive the request this round.
1088 # This method returns True if there's a real result, False if
1089 # there are no more servers available, otherwise None.
1090 if isinstance(result, Exception):
1092 result, tried_server_count = result
1093 if (result is not None) and (result is not False):
1095 elif tried_server_count < 1:
1096 _logger.info("No more Keep services to try; giving up")
1101 def get_from_cache(self, loc_s):
1102 """Fetch a block only if is in the cache, otherwise return None."""
1103 locator = KeepLocator(loc_s)
1104 slot = self.block_cache.get(locator.md5sum)
1105 if slot is not None and slot.ready.is_set():
1110 def refresh_signature(self, loc):
1111 """Ask Keep to get the remote block and return its local signature"""
1112 now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1113 return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1116 def head(self, loc_s, **kwargs):
1117 return self._get_or_head(loc_s, method="HEAD", **kwargs)
1120 def get(self, loc_s, **kwargs):
1121 return self._get_or_head(loc_s, method="GET", **kwargs)
    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's

        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        # NOTE(review): this copy of the method is missing several lines
        # (guards, try/except openers, dict openers, returns); each gap is
        # flagged below. Verify against the complete file before editing.

        # Multi-locator request: fetch each block and concatenate.
        # NOTE(review): presumably guarded by an elided "if ',' in loc_s:".
        return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        # Reuse the caller's / API client's request id when available so
        # every per-service request can be correlated in server logs.
        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        # NOTE(review): an elided "if headers is None: headers = {}"
        # presumably precedes this assignment.
        headers['X-Request-Id'] = request_id

        locator = KeepLocator(loc_s)
        # Reserve a cache slot; 'first' is True when this thread is the
        # one responsible for filling the slot.
        slot, first = self.block_cache.reserve_cache(locator.md5sum)
        # Fresh and empty "first time it is used" slot
        # this is request for a prefetch to fill in
        # the cache, don't need to wait for the
        # result, so if it is already in flight return
        # immediately. Clear 'slot' to prevent
        # finally block from calling slot.set()
        if slot.ready.is_set():
            # NOTE(review): branch body elided in this copy.
        if blob is not None:
            self.hits_counter.add(1)
            # NOTE(review): elided — presumably returns the cached blob.
        # If blob is None, this means either
        #
        # (a) another thread was fetching this block and
        # failed with an error or
        #
        # (b) cache thrashing caused the slot to be
        # evicted (content set to None) by another thread
        # between the call to reserve_cache() and get().
        #
        # We'll handle these cases by reserving a new slot
        # and then doing a full GET request.
        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               self._gateway_services.get(hint[2:])
        # Map root URLs to their KeepService objects.
        # NOTE(review): the "roots_map = {" opener appears elided here.
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter,
                                   insecure=self.insecure)
            for root in hint_roots

        # See #3147 for a discussion of the loop implementation. Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
        for tries_left in loop:
            # NOTE(review): an elided "try:" presumably opens here.
            # Refresh service list on retries in case it changed.
            sorted_roots = self.map_new_services(
                force_rebuild=(tries_left < num_retries),
                need_writable=False,
            except Exception as error:
                # Service discovery failed; record it and let the loop retry.
                loop.save_result(error)

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    # NOTE(review): elided — presumably "break" on success.
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        if slot is not None:
            self.block_cache.set(slot, blob)
        # NOTE(review): an elided success guard / return presumably follows.

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        # NOTE(review): an elided guard (presumably "if not roots_map:")
        # precedes this raise.
        raise arvados.errors.KeepReadError(
            "[{}] failed to read {}: no Keep services available ({})".format(
                request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            # Every service reported forbidden / not found / gone.
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        # NOTE(review): an elided "else:" presumably precedes this raise.
        raise arvados.errors.KeepReadError(
            "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
        """
        # NOTE(review): this copy of the method is missing several lines
        # (dict/call openers, accumulator initialization, guards); each gap
        # is flagged below. Verify against the complete file before editing.
        classes = classes or self._default_classes

        # Keep stores bytes; transparently encode str input.
        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        # NOTE(review): a "headers = {" opener appears elided before these
        # entries (and likely an X-Keep-Storage-Classes entry too).
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),

        # NOTE(review): initialization of done_copies / done_classes
        # (used below) appears elided before the retry loop.
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
        for tries_left in loop:
            # NOTE(review): an elided "try:" presumably opens here.
            sorted_roots = self.map_new_services(
                force_rebuild=(tries_left < num_retries),
            except Exception as error:
                loop.save_result(error)

            # Storage classes still unsatisfied from previous rounds.
            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done_copies,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left),
                                                          classes=pending_classes)
            # Fan the upload out to every writable service in shuffle order.
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                writer_pool.add_task(ks, service_root)
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                # NOTE(review): a "loop.save_result((" opener appears elided
                # before this tuple.
                (done_copies >= copies and set(done_classes) == set(classes),
                 writer_pool.total_task_nr))
            # Old keepstore contacted without storage classes support:
            # success is determined only by successful copies.
            #
            # Disable storage classes tracking from this point forward.
            if not self._storage_classes_unsupported_warning:
                self._storage_classes_unsupported_warning = True
                _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                # NOTE(review): a "loop.save_result((" opener appears elided
                # before this tuple.
                (done_copies >= copies, writer_pool.total_task_nr))

        # NOTE(review): an elided success guard (presumably
        # "if loop.success():") precedes this return.
        return writer_pool.response()
        # NOTE(review): an elided guard (presumably "if not roots_map:")
        # precedes this raise.
        raise arvados.errors.KeepWriteError(
            "[{}] failed to write {}: no Keep services available ({})".format(
                request_id, data_hash, loop.last_result()))
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots
                          if roots_map[key].last_result()['error'])
        raise arvados.errors.KeepWriteError(
            "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
    def _block_prefetch_worker(self):
        """The background downloader thread."""
        # NOTE(review): the surrounding "while True:" / "try:" openers and
        # the None-sentinel shutdown check appear elided in this copy.
        b = self._prefetch_queue.get()
        # Warm the block cache; prefetch=True returns immediately if the
        # block is already cached or being fetched.
        self.get(b, prefetch=True)
        # NOTE(review): an elided "except Exception:" presumably precedes
        # this logging call so a failed prefetch never kills the worker.
        _logger.exception("Exception doing block prefetch")
    def _start_prefetch_threads(self):
        # Lazily start the pool of daemon prefetch workers, at most once.
        if self._prefetch_threads is None:
            # NOTE(review): an elided "with self.lock:" (double-checked
            # locking) appears between these two tests — confirm.
            if self._prefetch_threads is not None:
                # Another thread won the race and already started the pool.
                # NOTE(review): elided — presumably "return" here.
            self._prefetch_queue = queue.Queue()
            self._prefetch_threads = []
            for i in range(0, self.num_prefetch_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                # Daemon threads so they never block interpreter shutdown.
                thread.daemon = True
                # NOTE(review): "thread.start()" appears elided here.
    def block_prefetch(self, locator):
        # NOTE(review): the docstring's opening line/quotes appear elided in
        # this copy; the lines below are its body text.
        This relies on the fact that KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.) This method
        # Already cached: nothing to prefetch.
        if self.block_cache.get(locator) is not None:
            # NOTE(review): elided — presumably "return" here.
        self._start_prefetch_threads()
        self._prefetch_queue.put(locator)
    def stop_prefetch_threads(self):
        # Shut down the prefetch worker pool, if one is running.
        # NOTE(review): an elided "with self.lock:" presumably wraps this.
        if self._prefetch_threads is not None:
            # One None sentinel per worker tells each loop to exit.
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                # NOTE(review): "t.join()" appears elided here.
        # Reset so a later block_prefetch() can start a fresh pool.
        self._prefetch_threads = None
        self._prefetch_queue = None
    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as

        Data stored this way can be retrieved via local_store_get().
        """
        # NOTE(review): classes=[] is a mutable default argument; harmless
        # while unused, but worth fixing to None in the complete file.
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        # Write to a temp file, then rename: the publish is atomic so
        # readers never see a partially written block.
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            # NOTE(review): "f.write(data)" appears elided here.
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        # NOTE(review): the final return of the locator string appears
        # elided in this copy.
    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        # NOTE(review): an elided "try:" / "except ValueError:" presumably
        # wraps the locator parse so the raise below only fires on bad input.
        locator = KeepLocator(loc_s)
        raise arvados.errors.NotFoundError(
            "Invalid data locator: '%s'" % loc_s)
        # The well-known empty block: serve it without touching disk.
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            # NOTE(review): elided — presumably "return b''" here.
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            # NOTE(review): elided — presumably "return f.read()" here.
1459 def local_store_head(self, loc_s, num_retries=None):
1460 """Companion to local_store_put()."""
1462 locator = KeepLocator(loc_s)
1464 raise arvados.errors.NotFoundError(
1465 "Invalid data locator: '%s'" % loc_s)
1466 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1468 if os.path.exists(os.path.join(self.local_store, locator.md5sum)):