# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
from future import standard_library
from future.utils import native_str
standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import range
from builtins import object

import collections
import copy
import datetime
import errno
import hashlib
import logging
import math
import os
import pycurl
import queue
import re
import resource
import socket
import ssl
import sys
import threading
import urllib.parse

if sys.version_info >= (3, 0):
    from io import BytesIO
else:
    from cStringIO import StringIO as BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
import arvados.diskcache
from arvados._pycurlhelper import PyCurlHelper
from . import timer

_logger = logging.getLogger('arvados.keep')
global_client_object = None

# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102

class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
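
# A minimal usage sketch (illustrative, not part of the original module):
# parse a signed locator, inspect its parts, and round-trip it back to a
# string.  The signature and timestamp below are made-up values.
#
#     loc = KeepLocator(
#         'acbd18db4cc2f85cedef654fccc4a4d8+3'
#         '+A0123456789abcdef0123456789abcdef01234567@565f8437')
#     loc.md5sum                # 'acbd18db4cc2f85cedef654fccc4a4d8'
#     loc.size                  # 3
#     loc.permission_expired()  # True once the @565f8437 timestamp passes
#     str(loc)                  # reassembles the full signed locator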
149 """Simple interface to a global KeepClient object.
151 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
152 own API client. The global KeepClient will build an API client from the
153 current Arvados configuration, which may not match the one you built.
158 def global_client_object(cls):
159 global global_client_object
160 # Previously, KeepClient would change its behavior at runtime based
161 # on these configuration settings. We simulate that behavior here
162 # by checking the values and returning a new KeepClient if any of
164 key = (config.get('ARVADOS_API_HOST'),
165 config.get('ARVADOS_API_TOKEN'),
166 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
167 config.get('ARVADOS_KEEP_PROXY'),
168 os.environ.get('KEEP_LOCAL_STORE'))
169 if (global_client_object is None) or (cls._last_key != key):
170 global_client_object = KeepClient()
172 return global_client_object
175 def get(locator, **kwargs):
176 return Keep.global_client_object().get(locator, **kwargs)
179 def put(data, **kwargs):
180 return Keep.global_client_object().put(data, **kwargs)
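
# Deprecated-path sketch (illustrative only): the two calls below store the
# same data, but the second form is preferred because it pins the client to
# an API client you control.
#
#     locator = Keep.put(b'foo')                 # deprecated global client
#
#     kc = KeepClient(api_client=arvados.api('v1'))
#     locator = kc.put(b'foo')                   # preferred
#     data = kc.get(locator)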

class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        self._cache = collections.OrderedDict()
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir
        self._cache_updating = threading.Condition(self._cache_lock)

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            if self._disk_cache:
                # Each block uses two file descriptors, one used to
                # open it initially and hold the flock(), and a second
                # hidden one used by mmap().
                #
                # Set max slots to 1/8 of maximum file handles.  This
                # means we'll use at most 1/4 of total file handles.
                #
                # NOFILE typically defaults to 1024 on Linux so this
                # is 128 slots (256 file handles), which means we can
                # cache up to 8 GiB of 64 MiB blocks.  This leaves
                # 768 file handles for sockets and other stuff.
                #
                # When we want the ability to have more cache (e.g. in
                # arv-mount) we'll increase rlimit before calling
                # this.
                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # pick smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        self.cache_total = 0
        if self._disk_cache:
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            for slot in self._cache.values():
                self.cache_total += slot.size()
            self.cap_cache()
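
    # Construction sketch (illustrative only): a disk-backed cache sized
    # automatically by the rules above, shared with a KeepClient.
    #
    #     cache = KeepBlockCache(disk_cache=True)   # ~/.cache/arvados/keep
    #     kc = KeepClient(block_cache=cache)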

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            if self.content is not None:
                return False
            self.content = value
            self.ready.set()
            return True

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            self.content = None
            return True

    def _resize_cache(self, cache_max, max_slots):
        # Try and make sure the contents of the cache do not exceed
        # the supplied maximums.

        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
            return

        _evict_candidates = collections.deque(self._cache.values())
        while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
            slot = _evict_candidates.popleft()
            if not slot.ready.is_set():
                continue

            sz = slot.size()
            if slot.evict():
                self.cache_total -= sz
                del self._cache[slot.locator]

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_updating:
            self._resize_cache(self.cache_max, self._max_slots)
            self._cache_updating.notify_all()

    def _get(self, locator):
        # Test if the locator is already in the cache
        if locator in self._cache:
            n = self._cache[locator]
            if n.ready.is_set() and n.content is None:
                del self._cache[n.locator]
                return None
            self._cache.move_to_end(locator)
            return n
        if self._disk_cache:
            # see if it exists on disk
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache[n.locator] = n
                self.cache_total += n.size()
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_updating:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                self._resize_cache(self.cache_max, self._max_slots-1)
                while len(self._cache) >= self._max_slots:
                    # If there isn't a slot available, need to wait
                    # for something to happen that releases one of the
                    # cache slots.  Idle for 200 ms or woken up by
                    # another thread
                    self._cache_updating.wait(timeout=0.2)
                    self._resize_cache(self.cache_max, self._max_slots-1)

                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache[n.locator] = n
                return n, True

    def set(self, slot, blob):
        try:
            if slot.set(blob):
                self.cache_total += slot.size()
            return
        except OSError as e:
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum(st.size() for st in self._cache.values())
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception as e:
            pass
        finally:
            # Check if we should evict things from the cache.  Either
            # because we added a new thing or there was an error and
            # we possibly adjusted the limits down, so we might need
            # to push something out.
            self.cap_cache()

        try:
            # Only gets here if there was an error the first time. The
            # exception handler adjusts limits downward in some cases
            # to free up resources, which would make the operation
            # succeed.
            if slot.set(blob):
                self.cache_total += slot.size()
        except Exception as e:
            # It failed again.  Give up.
            slot.set(None)
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
        finally:
            # Check if we should evict things from the cache.
            self.cap_cache()
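
    # Typical flow (illustrative sketch): a reader reserves a slot, fills it,
    # and later readers of the same block wait on the slot instead of
    # re-fetching.  `block_data` is a hypothetical bytes value.
    #
    #     slot, first = cache.reserve_cache(locator_md5)
    #     if first:
    #         cache.set(slot, block_data)   # the fetch happened here
    #     else:
    #         block_data = slot.get()       # blocks until the slot is ready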

class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val

class KeepClient(object):
    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT

    class KeepService(PyCurlHelper):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible results.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            super(KeepClient.KeepService, self).__init__()
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure
440 """Is it worth attempting a request?"""
444 """Did the request succeed or encounter permanent failure?"""
445 return self._result['error'] == False or not self._usable
447 def last_result(self):
450 def _get_user_agent(self):
452 return self._user_agent_pool.get(block=False)
456 def _put_user_agent(self, ua):
459 self._user_agent_pool.put(ua, block=False)

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    else:
                        curl.setopt(pycurl.HTTPGET, True)
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result.get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request, return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

    class KeepWriterQueue(queue.Queue):
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        if st_class in self.confirmed_storage_classes:
                            self.confirmed_storage_classes[st_class] += st_copies
                        else:
                            self.confirmed_storage_classes[st_class] = st_copies
                self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Notifies disabled storage classes expectation to
                    # the outer loop.
                    return None
                return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()

    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies, classes)
            # Start workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response

    class KeepWriterThread(threading.Thread):
        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                       self.data,
                                       timeout=self.timeout,
                                       headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed
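
    # Header-format sketch (illustrative): do_task() above parses a
    # confirmation header shaped like the hypothetical response below,
    #
    #     X-Keep-Storage-Classes-Confirmed: default=2, archive=1
    #
    # which yields classes_confirmed == {'default': 2, 'archive': 1}.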

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=10, session=None, num_prefetch_threads=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 10.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []
        if num_prefetch_threads is not None:
            self.num_prefetch_threads = num_prefetch_threads
        else:
            self.num_prefetch_threads = 2
        self._prefetch_queue = None
        self._prefetch_threads = None

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
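
    # Scaling sketch (illustrative): with the default non-proxy timeout of
    # (2, 256, 32768), current_timeout(0) -> (2, 256, 32768),
    # current_timeout(1) -> (4, 256, 32768), current_timeout(2) ->
    # (8, 256, 32768): only the connection timeout doubles per attempt.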

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # as a list.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots
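
    # Probe-order sketch (illustrative): the rendezvous weighting above can
    # be reproduced standalone.  The service UUIDs here are made up.
    #
    #     import hashlib
    #     block = 'acbd18db4cc2f85cedef654fccc4a4d8'
    #     uuids = ['zzzzz-bi6l4-000000000000001', 'zzzzz-bi6l4-000000000000002']
    #     order = sorted(
    #         uuids, reverse=True,
    #         key=lambda u: hashlib.md5((block + u[-15:]).encode()).hexdigest())
    #
    # Every client computes the same order for the same block, so reads tend
    # to probe first the server that stored the block first.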

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc_s):
        """Fetch a block only if is in the cache, otherwise return None."""
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature"""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
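
    # Usage sketch (illustrative): re-sign a stale locator before reading.
    # `old_locator` is a hypothetical expired signed locator string.
    #
    #     fresh_locator = kc.refresh_signature(old_locator)
    #     data = kc.get(fresh_locator)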

    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                while slot is None:
                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
                    if first:
                        # Fresh and empty "first time it is used" slot
                        break
                    if prefetch:
                        # this is request for a prefetch to fill in
                        # the cache, don't need to wait for the
                        # result, so if it is already in flight return
                        # immediately.  Clear 'slot' to prevent
                        # finally block from calling slot.set()
                        if slot.ready.is_set():
                            slot.get()
                        slot = None
                        return None

                    blob = slot.get()
                    if blob is not None:
                        self.hits_counter.add(1)
                        return blob

                    # If blob is None, this means either
                    #
                    # (a) another thread was fetching this block and
                    # failed with an error or
                    #
                    # (b) cache thrashing caused the slot to be
                    # evicted (content set to None) by another thread
                    # between the call to reserve_cache() and get().
                    #
                    # We'll handle these cases by reserving a new slot
                    # and then doing a full GET request.
                    slot = None

            self.misses_counter.add(1)

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                       hint.startswith('K@') and
                                       len(hint) == 29 and
                                       self._gateway_services.get(hint[2:])
                                       )])
            # Map root URLs to their KeepService objects.
            roots_map = {
                root: self.KeepService(root, self._user_agent_pool,
                                       upload_counter=self.upload_counter,
                                       download_counter=self.download_counter,
                                       headers=headers,
                                       insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation.  Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            sorted_roots = []
            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
            for tries_left in loop:
                try:
                    sorted_roots = self.map_new_services(
                        roots_map, locator,
                        force_rebuild=(tries_left < num_retries),
                        need_writable=False,
                        headers=headers)
                except Exception as error:
                    loop.save_result(error)
                    continue

                # Query KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                    if blob is not None:
                        break
                loop.save_result((blob, len(services_to_try)))

            # Always cache the result, then return it if we succeeded.
            if loop.success():
                return blob
        finally:
            if slot is not None:
                self.block_cache.set(slot, blob)

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        """

        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done_copies,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left),
                                                          classes=pending_classes)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                     writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
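
    # Write sketch (illustrative): store two copies, requesting a
    # hypothetical "archive" storage class; returns a signed locator.
    #
    #     locator = kc.put(b'foo', copies=2, classes=['archive'])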

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self.get(b, prefetch=True)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    def _start_prefetch_threads(self):
        if self._prefetch_threads is None:
            with self.lock:
                if self._prefetch_threads is not None:
                    return
                self._prefetch_queue = queue.Queue()
                self._prefetch_threads = []
                for i in range(0, self.num_prefetch_threads):
                    thread = threading.Thread(target=self._block_prefetch_worker)
                    self._prefetch_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def block_prefetch(self, locator):
        """
        This relies on the fact that KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.
        """

        if self.block_cache.get(locator) is not None:
            return

        self._start_prefetch_threads()
        self._prefetch_queue.put(locator)

    def stop_prefetch_threads(self):
        with self.lock:
            if self._prefetch_threads is not None:
                for t in self._prefetch_threads:
                    self._prefetch_queue.put(None)
                for t in self._prefetch_threads:
                    t.join()
            self._prefetch_threads = None
            self._prefetch_queue = None
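
    # Prefetch sketch (illustrative): warm the cache in the background while
    # processing earlier blocks, then shut the workers down.
    # `upcoming_locators` is a hypothetical iterable of locator strings.
    #
    #     for loc in upcoming_locators:
    #         kc.block_prefetch(loc)
    #     ...
    #     kc.stop_prefetch_threads()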

    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return b''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()

    def local_store_head(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return True
        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
            return True
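
# Local-store sketch (illustrative): with KEEP_LOCAL_STORE set (or the
# local_store constructor argument), put()/get() read and write plain files
# in that directory instead of contacting Keep -- useful for tests.
#
#     kc = KeepClient(local_store='/tmp/keep-mock')   # hypothetical path
#     loc = kc.put(b'foo')     # writes /tmp/keep-mock/<md5>
#     kc.get(loc)              # reads it back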