# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division

from future import standard_library
from future.utils import native_str
standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import range
from builtins import object

import copy
import datetime
import errno
import hashlib
import logging
import math
import os
import pycurl
import queue
import re
import resource
import socket
import ssl
import sys
import threading
import urllib.parse

if sys.version_info >= (3, 0):
    from io import BytesIO
else:
    from cStringIO import StringIO as BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
import arvados.diskcache
from arvados import timer
from arvados._pycurlhelper import PyCurlHelper

_logger = logging.getLogger('arvados.keep')
global_client_object = None

# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102
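
# A hedged illustration (not part of this module's API): constants like the
# ones patched above are normally fed to setsockopt() to tune TCP keepalive
# on an open socket. The probe values below are arbitrary examples.
def _example_enable_tcp_keepalive(sock):
    # Ask the kernel to send keepalive probes on an idle connection.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    if hasattr(socket, 'TCP_KEEPINTVL'):
        # Seconds between individual keepalive probes.
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
    if hasattr(socket, 'TCP_KEEPCNT'):
        # Number of unanswered probes before the connection is dropped.
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 9)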


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
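
# A hedged usage sketch: KeepLocator parses Keep's "hash+size+hints" locator
# strings. The md5 below is the well-known digest of the empty block.
def _example_keep_locator():
    loc = KeepLocator("d41d8cd98f00b204e9800998ecf8427e+0")
    assert loc.md5sum == "d41d8cd98f00b204e9800998ecf8427e"
    assert loc.size == 0
    # An unsigned locator carries no permission hint and never expires.
    assert loc.permission_expired() is False
    return str(loc)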
149 """Simple interface to a global KeepClient object.
151 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
152 own API client. The global KeepClient will build an API client from the
153 current Arvados configuration, which may not match the one you built.
158 def global_client_object(cls):
159 global global_client_object
160 # Previously, KeepClient would change its behavior at runtime based
161 # on these configuration settings. We simulate that behavior here
162 # by checking the values and returning a new KeepClient if any of
164 key = (config.get('ARVADOS_API_HOST'),
165 config.get('ARVADOS_API_TOKEN'),
166 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
167 config.get('ARVADOS_KEEP_PROXY'),
168 os.environ.get('KEEP_LOCAL_STORE'))
169 if (global_client_object is None) or (cls._last_key != key):
170 global_client_object = KeepClient()
172 return global_client_object
175 def get(locator, **kwargs):
176 return Keep.global_client_object().get(locator, **kwargs)
179 def put(data, **kwargs):
180 return Keep.global_client_object().put(data, **kwargs)
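
# A hedged sketch of the migration the deprecation notice above asks for:
# build a KeepClient around an explicitly constructed API client instead of
# relying on the global singleton. Assumes a reachable Arvados cluster.
def _example_keep_client_instead_of_global(locator):
    data = Keep.get(locator)              # deprecated: implicit global client
    keep = KeepClient(api_client=arvados.api('v1'))
    return data == keep.get(locator)      # preferred: explicit client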


class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir
        self._cache_updating = threading.Condition(self._cache_lock)

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            if self._disk_cache:
                # Each block uses two file descriptors, one used to
                # open it initially and hold the flock(), and a second
                # hidden one used by mmap().
                #
                # Set max slots to 1/8 of maximum file handles. This
                # means we'll use at most 1/4 of total file handles.
                #
                # NOFILE typically defaults to 1024 on Linux so this
                # is 128 slots (256 file handles), which means we can
                # cache up to 8 GiB of 64 MiB blocks. This leaves
                # 768 file handles for sockets and other stuff.
                #
                # When we want the ability to have more cache (e.g. in
                # arv-mount) we'll increase rlimit before calling
                # this method.
                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # pick the smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        if self._disk_cache:
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
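
    # Worked example of the sizing above (assumed numbers): with the default
    # NOFILE soft limit of 1024, _max_slots = 1024 // 8 = 128. On a 100 GiB
    # filesystem with 40 GiB free and an empty cache, avail = 40 GiB / 4 =
    # 10 GiB and maxdisk = 10 GiB, so the slot cap of 128 * 64 MiB = 8 GiB
    # wins and cache_max = min(10 GiB, 10 GiB, 8 GiB) = 8 GiB.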

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            self.content = None
            return True

        def gone(self):
            return (self.content is None)

    def _resize_cache(self, cache_max, max_slots):
        # Try to make sure the contents of the cache do not exceed
        # the supplied maximums.

        # Select all slots except those where ready.is_set() and content is
        # None (that means there was an error reading the block).
        self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
        sm = sum([slot.size() for slot in self._cache])
        while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
            for i in range(len(self._cache)-1, -1, -1):
                # start from the back, find a slot that is a candidate to evict
                if self._cache[i].ready.is_set():
                    sz = self._cache[i].size()

                    # If evict returns false it means the
                    # underlying disk cache couldn't lock the file
                    # for deletion because another process was using
                    # it. Don't count it as reducing the amount
                    # of data in the cache; find something else to
                    # evict.
                    if self._cache[i].evict():
                        sm -= sz

                    # check to make sure the underlying data is gone
                    if self._cache[i].gone():
                        # Either way we forget about it: either the
                        # other process will delete it, or if we need
                        # it again and it is still there, we'll find
                        # it on disk.
                        del self._cache[i]
                    break

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_updating:
            self._resize_cache(self.cache_max, self._max_slots)
            self._cache_updating.notify_all()

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        if self._disk_cache:
            # see if it exists on disk
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_updating:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                self._resize_cache(self.cache_max, self._max_slots-1)
                while len(self._cache) >= self._max_slots:
                    # If there isn't a slot available, we need to wait
                    # for something to happen that releases one of the
                    # cache slots. Idle for 200 ms or woken up by
                    # another thread
                    self._cache_updating.wait(timeout=0.2)
                    self._resize_cache(self.cache_max, self._max_slots-1)

                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True

    def set(self, slot, blob):
        try:
            slot.set(blob)
            return
        except OSError as e:
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum([st.size() for st in self._cache])
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception as e:
            pass
        finally:
            # Check if we should evict things from the cache. Either
            # because we added a new thing or there was an error and
            # we possibly adjusted the limits down, so we might need
            # to push something out.
            self.cap_cache()

        try:
            # Only gets here if there was an error the first time. The
            # exception handler adjusts limits downward in some cases
            # to free up resources, which would make the operation
            # succeed.
            slot.set(blob)
        except Exception as e:
            # It failed again. Give up.
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
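
# A hedged sketch of the cache protocol KeepClient uses below: reserve a
# slot, fill it (or record failure) exactly once, and let concurrent readers
# block on slot.ready until content arrives. 'fetch' stands in for a Keep GET.
def _example_block_cache_protocol(cache, locator, fetch):
    slot, first = cache.reserve_cache(locator)
    if not first:
        # Another thread already reserved this block; wait for its result.
        return slot.get()
    blob = None
    try:
        blob = fetch(locator)
    finally:
        # Storing None records the failure so waiting threads don't hang.
        cache.set(slot, blob)
    return blob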


class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):
    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT

    class KeepService(PyCurlHelper):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            super(KeepClient.KeepService, self).__init__()
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    else:
                        curl.setopt(pycurl.HTTPGET, True)
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # toxic.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result.get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request, return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True
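
    # A hedged sketch of the KeepService contract described in its docstring
    # (one instance, one transaction, possibly retried):
    #
    #     svc = KeepClient.KeepService(root, headers={'Authorization': '...'})
    #     while not svc.finished():
    #         blob = svc.get(locator, timeout=(2, 256, 32768))
    #     result = svc.last_result()
    #
    # Mixing get() and put() on one instance, or pointing one instance at two
    # different services, is unsupported.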

    class KeepWriterQueue(queue.Queue):
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        if st_class in self.confirmed_storage_classes:
                            self.confirmed_storage_classes[st_class] += st_copies
                        else:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Notifies disabled storage classes expectation to
                    # the outer loop.
                    return None
                return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty()
                    else:
                        self.pending_tries_notification.wait()
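
    # Worked example of the accounting above (assumed numbers): with
    # wanted_copies=2 and classes=['default'], pending_tries starts at
    # max(2, 1) == 2. If a single PUT succeeds with
    # x-keep-replicas-stored: 2 and x-keep-storage-classes-confirmed:
    # default=2, write_success() makes pending_copies() == 0 and
    # pending_classes() == [], so get_next_task() drains the queue and
    # raises queue.Empty, ending the worker threads.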

    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies, classes)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response

    class KeepWriterThread(threading.Thread):
        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                       self.data,
                                       timeout=self.timeout,
                                       headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=10, session=None, num_prefetch_threads=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services. If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy. Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests. It is an error to specify both
          api_client and api_token. If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory. If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable. If you want to ensure KeepClient does not
          use local storage, pass in an empty string. This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called. Default 10.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []
        self.num_prefetch_threads = num_prefetch_threads or 2
        self._prefetch_queue = None
        self._prefetch_threads = None

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        elif proxy:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            proxy_uris = proxy.split()
            for i in range(len(proxy_uris)):
                if not proxy_uris[i].endswith('/'):
                    proxy_uris[i] += '/'
                # URL validation
                url = urllib.parse.urlparse(proxy_uris[i])
                if not (url.scheme and url.netloc):
                    raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
            self.api_token = api_token
            self._gateway_services = {}
            self._keep_services = [{
                'uuid': "00000-bi6l4-%015d" % idx,
                'service_type': 'proxy',
                '_service_root': uri,
            } for idx, uri in enumerate(proxy_uris)]
            self._writable_services = self._keep_services
            self.using_proxy = True
            self._static_services_list = True
        else:
            # It's important to avoid instantiating an API client
            # unless we actually need one, for testing's sake.
            if api_client is None:
                api_client = arvados.api('v1')
            self.api_client = api_client
            self.api_token = api_client.api_token
            self._gateway_services = {}
            self._keep_services = None
            self._writable_services = None
            self.using_proxy = None
            self._static_services_list = False
            try:
                self._default_classes = [
                    k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
            except KeyError:
                # We're talking to an old cluster
                pass
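
    # A hedged construction sketch: every argument above is optional, and the
    # timeout tuples follow the (connection, read, minimum_bandwidth)
    # convention documented in the docstring.
    #
    #     kc = KeepClient(api_client=arvados.api('v1'),
    #                     timeout=(2, 256, 32768))
    #     data = kc.get("d41d8cd98f00b204e9800998ecf8427e+0")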

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise. The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
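
    # Worked example: with the default non-proxy timeout (2, 256, 32768),
    # current_timeout(0) == (2, 256, 32768), while current_timeout(3) scales
    # the connection portion to 2 * (1 << 3) == 16, giving (16, 256, 32768).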

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
                (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1.
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # as a list.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc_s):
        """Fetch a block only if it is in the cache, otherwise return None."""
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature"""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                while slot is None:
                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
                    if first:
                        # Fresh and empty "first time it is used" slot
                        break
                    if prefetch:
                        # this is a request for a prefetch to fill in
                        # the cache, don't need to wait for the
                        # result, so if it is already in flight return
                        # immediately. Clear 'slot' to prevent
                        # finally block from calling slot.set()
                        slot = None
                        return None

                    blob = slot.get()
                    if blob is not None:
                        self.hits_counter.add(1)
                        return blob

                    # If blob is None, this means either
                    #
                    # (a) another thread was fetching this block and
                    # failed with an error or
                    #
                    # (b) cache thrashing caused the slot to be
                    # evicted (content set to None) by another thread
                    # between the call to reserve_cache() and get().
                    #
                    # We'll handle these cases by reserving a new slot
                    # and then doing a full GET request.
                    slot = None

            self.misses_counter.add(1)

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                   hint.startswith('K@') and
                                   len(hint) == 29 and
                                   self._gateway_services.get(hint[2:]))])
            # Map root URLs to their KeepService objects.
            roots_map = {
                root: self.KeepService(root, self._user_agent_pool,
                                       upload_counter=self.upload_counter,
                                       download_counter=self.download_counter,
                                       headers=headers,
                                       insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation. Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            sorted_roots = []
            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
            for tries_left in loop:
                try:
                    sorted_roots = self.map_new_services(
                        roots_map, locator,
                        force_rebuild=(tries_left < num_retries),
                        need_writable=False,
                        headers=headers)
                except Exception as error:
                    loop.save_result(error)
                    continue

                # Query KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                    if blob is not None:
                        break
                loop.save_result((blob, len(services_to_try)))

            # Always cache the result, then return it if we succeeded.
            if loop.success():
                return blob
        finally:
            if slot is not None:
                self.block_cache.set(slot, blob)

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        """
        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done_copies,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left),
                                                          classes=pending_classes)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                     writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
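
    # A hedged round-trip sketch (assumes a reachable cluster): put() returns
    # a signed locator string that can be handed straight back to get().
    #
    #     kc = KeepClient()
    #     loc = kc.put(b"hello", copies=2)
    #     assert kc.get(loc) == b"hello"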

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self.get(b, prefetch=True)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    def _start_prefetch_threads(self):
        if self._prefetch_threads is None:
            with self.lock:
                if self._prefetch_threads is not None:
                    return
                self._prefetch_queue = queue.Queue()
                self._prefetch_threads = []
                for i in range(0, self.num_prefetch_threads):
                    thread = threading.Thread(target=self._block_prefetch_worker)
                    self._prefetch_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This relies on the fact that KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.) This method
        does not block.
        """
        self._start_prefetch_threads()
        self._prefetch_queue.put(locator)

    def stop_prefetch_threads(self):
        with self.lock:
            if self._prefetch_threads is not None:
                for t in self._prefetch_threads:
                    self._prefetch_queue.put(None)
                for t in self._prefetch_threads:
                    t.join()
            self._prefetch_threads = None
            self._prefetch_queue = None
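
    # A hedged usage sketch: queue background fetches ahead of sequential
    # reads, then read through the shared block cache.
    #
    #     for loc in upcoming_locators:
    #         kc.block_prefetch(loc)
    #     data = kc.get(current_locator)  # likely a cache hit by now
    #     kc.stop_prefetch_threads()      # optional cleanup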

    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return b''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()

    def local_store_head(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return True
        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
            return True
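

# A hedged sketch of the local_store testing mode described in __init__:
# blocks are stored as plain files named by their md5 hash, and no API
# server or Keep service is contacted. 'tmpdir' is an assumed scratch path.
def _example_local_store_roundtrip(tmpdir):
    kc = KeepClient(local_store=tmpdir)   # get/put are rebound to the stubs
    loc = kc.put(b"hello")
    assert loc.startswith(hashlib.md5(b"hello").hexdigest())
    return kc.get(loc)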