1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
# Python 2/3 compatibility shim: expose a byte-oriented in-memory file
# type under the single name BytesIO for the rest of this module.
# (Reconstructed: the extraction dropped the `else:` line and indentation.)
if sys.version_info >= (3, 0):
    from io import BytesIO
else:
    # Python 2: cStringIO's StringIO handles byte strings natively.
    from cStringIO import StringIO as BytesIO
42 import arvados.config as config
44 import arvados.retry as retry
46 import arvados.diskcache
47 from arvados._pycurlhelper import PyCurlHelper
# Module-level logger for all Keep client activity in this file.
49 _logger = logging.getLogger('arvados.keep')
# Cached singleton KeepClient used by the deprecated Keep.global_client_object()
# wrapper below; rebuilt when the relevant configuration changes.
50 global_client_object = None
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
# The Python stdlib socket module does not define these on macOS builds where
# the underlying headers lack them; later keepalive-tuning code relies on them.
# (Reconstructed: only extraction artifacts removed; values unchanged.)
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102
# Parses and represents a Keep block locator string:
# "<md5sum>[+size][+hints...]" where an 'A...' hint is a permission signature.
64 class KeepLocator(object):
# Reference epoch used to convert permission-expiry datetimes to/from
# hex Unix timestamps (see permission_hint / perm_expiry below).
65 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
# Valid hint syntax: one uppercase letter followed by locator-safe characters.
66 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
# NOTE(review): this region has extraction artifacts — leading line numbers,
# lost indentation, and elided lines (e.g. the hint-iteration loop header and
# the try/except around the size parse are missing). Code kept byte-identical;
# only comments added.
# Parse locator_str of the form "md5+size+hint+hint..." into fields.
68 def __init__(self, locator_str):
71 self._perm_expiry = None
# Split on '+': first piece is the md5, second (optional) the size,
# remaining pieces are hints.
72 pieces = iter(locator_str.split('+'))
73 self.md5sum = next(pieces)
75 self.size = int(next(pieces))
# Each remaining piece must match HINT_RE; 'A...' hints carry the
# permission signature and are parsed separately, others are stored as-is.
79 if self.HINT_RE.match(hint) is None:
80 raise ValueError("invalid hint format: {}".format(hint))
81 elif hint.startswith('A'):
82 self.parse_permission_hint(hint)
84 self.hints.append(hint)
# __str__ fragment: join the non-empty parts with '+'.
89 for s in [self.md5sum, self.size,
90 self.permission_hint()] + self.hints
# stripped() fragment: locator without hints — "md5+size" when size is known.
94 if self.size is not None:
95 return "%s+%i" % (self.md5sum, self.size)
def _make_hex_prop(name, length):
    """Return a property holding a hex string of exactly *length* digits.

    The value is stored in a private attribute named '_<name>'; the setter
    validates with arvados.util.is_hex before storing, raising ValueError
    on malformed input.

    (Reconstructed: the extraction dropped the `def getter(self):` line,
    whose presence is proven by `return property(getter, setter)` below.)
    """
    data_name = '_{}'.format(name)
    def getter(self):
        return getattr(self, data_name)
    def setter(self, hex_str):
        if not arvados.util.is_hex(hex_str, length):
            raise ValueError("{} is not a {}-digit hex string: {!r}".
                             format(name, length, hex_str))
        setattr(self, data_name, hex_str)
    return property(getter, setter)

# Validated hex-string attributes: the block hash and the permission signature.
md5sum = _make_hex_prop('md5sum', 32)
perm_sig = _make_hex_prop('perm_sig', 40)
# NOTE(review): extraction artifacts (baked line numbers, lost indentation,
# elided lines — e.g. the @property/@perm_expiry.setter decorators and some
# raise/try lines are missing). Code kept byte-identical; only comments added.
# perm_expiry getter: expiry of the permission signature as a datetime, or None.
116 def perm_expiry(self):
117 return self._perm_expiry
# perm_expiry setter: accepts a 1-8 digit hex Unix timestamp string.
120 def perm_expiry(self, value):
121 if not arvados.util.is_hex(value, 1, 8):
123 "permission timestamp must be a hex Unix timestamp: {}".
125 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
# Render the signature + expiry back into "A<sig>@<hex timestamp>" hint form.
127 def permission_hint(self):
128 data = [self.perm_sig, self.perm_expiry]
# Convert the stored datetime back to integer seconds since the epoch.
131 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
132 return "A{}@{:08x}".format(*data)
# Parse an "A<sig>@<hex timestamp>" hint into perm_sig and perm_expiry.
134 def parse_permission_hint(self, s):
136 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
138 raise ValueError("bad permission hint {}".format(s))
# True if the permission signature has expired as of as_of_dt (default: now).
# NOTE(review): uses naive datetime.now() while the setter uses utcfromtimestamp
# — presumably both are intended to be UTC; confirm before relying on this.
140 def permission_expired(self, as_of_dt=None):
141 if self.perm_expiry is None:
143 elif as_of_dt is None:
144 as_of_dt = datetime.datetime.now()
145 return self.perm_expiry <= as_of_dt
149 """Simple interface to a global KeepClient object.
151 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
152 own API client. The global KeepClient will build an API client from the
153 current Arvados configuration, which may not match the one you built.
158 def global_client_object(cls):
159 global global_client_object
160 # Previously, KeepClient would change its behavior at runtime based
161 # on these configuration settings. We simulate that behavior here
162 # by checking the values and returning a new KeepClient if any of
164 key = (config.get('ARVADOS_API_HOST'),
165 config.get('ARVADOS_API_TOKEN'),
166 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
167 config.get('ARVADOS_KEEP_PROXY'),
168 os.environ.get('KEEP_LOCAL_STORE'))
169 if (global_client_object is None) or (cls._last_key != key):
170 global_client_object = KeepClient()
172 return global_client_object
175 def get(locator, **kwargs):
176 return Keep.global_client_object().get(locator, **kwargs)
179 def put(data, **kwargs):
180 return Keep.global_client_object().put(data, **kwargs)
# NOTE(review): this whole class carries extraction artifacts — baked line
# numbers, lost indentation, and many elided lines (else-branches, try/except
# headers, several method bodies). Code kept byte-identical; only comments added.
# LRU cache of Keep blocks, backed either by RAM slots or (when disk_cache is
# true) by mmap'd files managed via arvados.diskcache.DiskCacheSlot.
182 class KeepBlockCache(object):
183 def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
184 self.cache_max = cache_max
# OrderedDict gives LRU ordering: _get() calls move_to_end() on hits and
# _resize_cache() evicts from the front.
185 self._cache = collections.OrderedDict()
186 self._cache_lock = threading.Lock()
187 self._max_slots = max_slots
188 self._disk_cache = disk_cache
189 self._disk_cache_dir = disk_cache_dir
# Condition sharing _cache_lock: reserve_cache() waits on it for a free slot.
190 self._cache_updating = threading.Condition(self._cache_lock)
192 if self._disk_cache and self._disk_cache_dir is None:
193 self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
194 os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
196 if self._max_slots == 0:
198 # Each block uses two file descriptors, one used to
199 # open it initially and hold the flock(), and a second
200 # hidden one used by mmap().
202 # Set max slots to 1/8 of maximum file handles. This
203 # means we'll use at most 1/4 of total file handles.
205 # NOFILE typically defaults to 1024 on Linux so this
206 # is 128 slots (256 file handles), which means we can
207 # cache up to 8 GiB of 64 MiB blocks. This leaves
208 # 768 file handles for sockets and other stuff.
210 # When we want the ability to have more cache (e.g. in
211 # arv-mount) we'll increase rlimit before calling
213 self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
# Presumably the RAM-cache default slot count (the disk branch above is elided).
216 self._max_slots = 512
218 if self.cache_max == 0:
# Size the disk cache from filesystem statistics.
220 fs = os.statvfs(self._disk_cache_dir)
221 # Calculation of available space incorporates existing cache usage
222 existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
223 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
224 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
# cache_max is the minimum of:
226 # 10% of total disk size
227 # 25% of available space
# and what the slot count can hold at 64 MiB per block.
229 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
# RAM-cache default size (256 MiB); floor of 64 MiB applied below.
232 self.cache_max = (256 * 1024 * 1024)
234 self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
# Disk mode: repopulate the slot table from files already on disk.
237 self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
# In-RAM cache slot: holds one block's bytes once `ready` is set.
241 class CacheSlot(object):
242 __slots__ = ("locator", "ready", "content")
244 def __init__(self, locator):
245 self.locator = locator
246 self.ready = threading.Event()
# set(): store the block content and wake waiters (body partially elided).
253 def set(self, value):
# size(): 0 for empty/failed slots, else the content length (partially elided).
258 if self.content is None:
261 return len(self.content)
# evict()-style check: a slot with no content can be discarded.
268 return (self.content is None)
270 def _resize_cache(self, cache_max, max_slots):
271 # Try and make sure the contents of the cache do not exceed
272 # the supplied maximums.
# Candidates in LRU order (front of the OrderedDict is least recent).
274 _evict_candidates = collections.deque(self._cache.values())
275 sm = sum([slot.size() for slot in _evict_candidates])
276 while len(_evict_candidates) > 0 and (sm > cache_max or len(self._cache) > max_slots):
277 slot = _evict_candidates.popleft()
278 if not slot.ready.is_set():
# Empty slots are dropped outright.
281 if slot.content is None:
283 del self._cache[slot.locator]
288 # If evict returns false it means the
289 # underlying disk cache couldn't lock the file
290 # for deletion because another process was using
291 # it. Don't count it as reducing the amount
292 # of data in the cache, find something else to
297 # check to make sure the underlying data is gone
299 # either way we forget about it. either the
300 # other process will delete it, or if we need
301 # it again and it is still there, we'll find
303 del self._cache[slot.locator]
# cap_cache (def line elided):
307 '''Cap the cache size to self.cache_max'''
308 with self._cache_updating:
309 self._resize_cache(self.cache_max, self._max_slots)
310 self._cache_updating.notify_all()
# Internal lookup; caller must hold _cache_lock.
312 def _get(self, locator):
313 # Test if the locator is already in the cache
314 if locator in self._cache:
315 n = self._cache[locator]
# A ready slot with no content is a failed fetch — forget it.
316 if n.ready.is_set() and n.content is None:
317 del self._cache[n.locator]
# Touch for LRU ordering.
319 self._cache.move_to_end(locator)
# Disk mode: fall back to finding the block in the cache directory.
322 # see if it exists on disk
323 n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
325 self._cache[n.locator] = n
# Public, locked lookup.
329 def get(self, locator):
330 with self._cache_lock:
331 return self._get(locator)
333 def reserve_cache(self, locator):
334 '''Reserve a cache slot for the specified locator,
335 or return the existing slot.'''
336 with self._cache_updating:
337 n = self._get(locator)
341 # Add a new cache slot for the locator
# Evict down to (max_slots - 1) to leave room for the new slot.
342 self._resize_cache(self.cache_max, self._max_slots-1)
343 while len(self._cache) >= self._max_slots:
344 # If there isn't a slot available, need to wait
345 # for something to happen that releases one of the
346 # cache slots. Idle for 200 ms or woken up by
348 self._cache_updating.wait(timeout=0.2)
349 self._resize_cache(self.cache_max, self._max_slots-1)
# Disk-backed or RAM-backed slot depending on mode.
352 n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
354 n = KeepBlockCache.CacheSlot(locator)
355 self._cache[n.locator] = n
# Store a fetched block into its reserved slot, retrying once after
# shrinking limits on resource-exhaustion errors (mmap ENOMEM/ENOSPC).
358 def set(self, slot, blob):
363 if e.errno == errno.ENOMEM:
364 # Reduce max slots to current - 4, cap cache and retry
365 with self._cache_lock:
366 self._max_slots = max(4, len(self._cache) - 4)
367 elif e.errno == errno.ENOSPC:
368 # Reduce disk max space to current - 256 MiB, cap cache and retry
369 with self._cache_lock:
370 sm = sum([st.size() for st in self._cache.values()])
371 self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
372 elif e.errno == errno.ENODEV:
373 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
374 except Exception as e:
377 # Check if we should evict things from the cache. Either
378 # because we added a new thing or there was an error and
379 # we possibly adjusted the limits down, so we might need
380 # to push something out.
384 # Only gets here if there was an error the first time. The
385 # exception handler adjusts limits downward in some cases
386 # to free up resources, which would make the operation
389 except Exception as e:
390 # It failed again. Give up.
392 raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
# Thread-safe integer counter used for the client's transfer statistics
# (add()/get() methods elided from this extraction).
396 class Counter(object):
397 def __init__(self, v=0):
# Lock guarding the counter value against concurrent updates.
398 self._lk = threading.Lock()
# Client for reading and writing blocks to an Arvados Keep storage cluster.
410 class KeepClient(object):
# Default (connect, read, bandwidth) timeouts, inherited from PyCurlHelper.
411 DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
412 DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
# NOTE(review): extraction artifacts throughout (baked line numbers, lost
# indentation, elided lines). Code kept byte-identical; only comments added.
414 class KeepService(PyCurlHelper):
415 """Make requests to a single Keep service, and track results.
417 A KeepService is intended to last long enough to perform one
418 transaction (GET or PUT) against one Keep service. This can
419 involve calling either get() or put() multiple times in order
420 to retry after transient failures. However, calling both get()
421 and put() on a single instance -- or using the same instance
422 to access two different Keep services -- will not produce
# Exception types treated as (retryable) HTTP-level failures.
429 arvados.errors.HttpError,
# NOTE(review): `user_agent_pool=queue.LifoQueue()` is a mutable default
# argument — the same queue object is shared by every instance constructed
# without an explicit pool. Likely intentional here (a shared curl pool),
# but worth confirming; the usual idiom is a None default.
432 def __init__(self, root, user_agent_pool=queue.LifoQueue(),
434 download_counter=None,
437 super(KeepClient.KeepService, self).__init__()
439 self._user_agent_pool = user_agent_pool
440 self._result = {'error': None}
444 self.get_headers = {'Accept': 'application/octet-stream'}
445 self.get_headers.update(headers)
446 self.put_headers = headers
447 self.upload_counter = upload_counter
448 self.download_counter = download_counter
449 self.insecure = insecure
# usable() (def line elided):
452 """Is it worth attempting a request?"""
# finished() (def line elided):
456 """Did the request succeed or encounter permanent failure?"""
457 return self._result['error'] == False or not self._usable
459 def last_result(self):
# Fetch a pooled curl handle, or create a fresh one if the pool is empty.
462 def _get_user_agent(self):
464 return self._user_agent_pool.get(block=False)
# Return a healthy curl handle to the pool for reuse.
468 def _put_user_agent(self, ua):
471 self._user_agent_pool.put(ua, block=False)
# Perform a GET or HEAD for one block; returns the body (GET) or a
# locator (HEAD remote-copy responses). Verifies the md5 of GET bodies.
475 def get(self, locator, method="GET", timeout=None):
476 # locator is a KeepLocator object.
477 url = self.root + str(locator)
478 _logger.debug("Request: %s %s", method, url)
479 curl = self._get_user_agent()
482 with timer.Timer() as t:
484 response_body = BytesIO()
485 curl.setopt(pycurl.NOSIGNAL, 1)
486 curl.setopt(pycurl.OPENSOCKETFUNCTION,
487 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
488 curl.setopt(pycurl.URL, url.encode('utf-8'))
489 curl.setopt(pycurl.HTTPHEADER, [
490 '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
491 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
492 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
# insecure mode: skip TLS verification (elided condition above).
494 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
495 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
497 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
# HEAD requests fetch headers only.
499 curl.setopt(pycurl.NOBODY, True)
501 curl.setopt(pycurl.HTTPGET, True)
502 self._setcurltimeouts(curl, timeout, method=="HEAD")
506 except Exception as e:
507 raise arvados.errors.HttpError(0, str(e))
# Record the outcome for last_result()/finished().
513 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
514 'body': response_body.getvalue(),
515 'headers': self._headers,
519 ok = retry.check_http_response_success(self._result['status_code'])
521 self._result['error'] = arvados.errors.HttpError(
522 self._result['status_code'],
523 self._headers.get('x-status-line', 'Error'))
524 except self.HTTP_ERRORS as e:
# ok may be True/False/None; None (indeterminate) keeps the service usable.
528 self._usable = ok != False
529 if self._result.get('status_code', None):
530 # The client worked well enough to get an HTTP status
531 # code, so presumably any problems are just on the
532 # server side and it's OK to reuse the client.
533 self._put_user_agent(curl)
535 # Don't return this client to the pool, in case it's
539 _logger.debug("Request fail: GET %s => %s: %s",
540 url, type(self._result['error']), str(self._result['error']))
543 _logger.info("HEAD %s: %s bytes",
544 self._result['status_code'],
545 self._result.get('content-length'))
546 if self._result['headers'].get('x-keep-locator'):
547 # This is a response to a remote block copy request, return
548 # the local copy block locator.
549 return self._result['headers'].get('x-keep-locator')
552 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
553 self._result['status_code'],
554 len(self._result['body']),
556 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
558 if self.download_counter:
559 self.download_counter.add(len(self._result['body']))
# Integrity check: body must hash to the locator's md5.
560 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
561 if resp_md5 != locator.md5sum:
562 _logger.warning("Checksum fail: md5(%s) = %s",
564 self._result['error'] = arvados.errors.HttpError(
567 return self._result['body']
# Upload one block. NOTE(review): `headers={}` is a mutable default
# argument; it is only read via copy/update here, so it is harmless,
# but a None default would be the safer idiom.
569 def put(self, hash_s, body, timeout=None, headers={}):
570 put_headers = copy.copy(self.put_headers)
571 put_headers.update(headers)
572 url = self.root + hash_s
573 _logger.debug("Request: PUT %s", url)
574 curl = self._get_user_agent()
577 with timer.Timer() as t:
579 body_reader = BytesIO(body)
580 response_body = BytesIO()
581 curl.setopt(pycurl.NOSIGNAL, 1)
582 curl.setopt(pycurl.OPENSOCKETFUNCTION,
583 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
584 curl.setopt(pycurl.URL, url.encode('utf-8'))
585 # Using UPLOAD tells cURL to wait for a "go ahead" from the
586 # Keep server (in the form of a HTTP/1.1 "100 Continue"
587 # response) instead of sending the request body immediately.
588 # This allows the server to reject the request if the request
589 # is invalid or the server is read-only, without waiting for
590 # the client to send the entire block.
591 curl.setopt(pycurl.UPLOAD, True)
592 curl.setopt(pycurl.INFILESIZE, len(body))
593 curl.setopt(pycurl.READFUNCTION, body_reader.read)
594 curl.setopt(pycurl.HTTPHEADER, [
595 '{}: {}'.format(k,v) for k,v in put_headers.items()])
596 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
597 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
599 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
600 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
602 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
603 self._setcurltimeouts(curl, timeout)
606 except Exception as e:
607 raise arvados.errors.HttpError(0, str(e))
# PUT responses are text (a signed locator), hence the utf-8 decode.
613 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
614 'body': response_body.getvalue().decode('utf-8'),
615 'headers': self._headers,
618 ok = retry.check_http_response_success(self._result['status_code'])
620 self._result['error'] = arvados.errors.HttpError(
621 self._result['status_code'],
622 self._headers.get('x-status-line', 'Error'))
623 except self.HTTP_ERRORS as e:
627 self._usable = ok != False # still usable if ok is True or None
628 if self._result.get('status_code', None):
629 # Client is functional. See comment in get().
630 self._put_user_agent(curl)
634 _logger.debug("Request fail: PUT %s => %s: %s",
635 url, type(self._result['error']), str(self._result['error']))
637 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
638 self._result['status_code'],
641 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
642 if self.upload_counter:
643 self.upload_counter.add(len(body))
# NOTE(review): extraction artifacts (baked line numbers, lost indentation,
# elided lines). Code kept byte-identical; only comments added.
# Work queue shared by KeepWriterThreads: tracks how many replica copies and
# which storage classes have been confirmed so far for one block write.
647 class KeepWriterQueue(queue.Queue):
# NOTE(review): `classes=[]` is a mutable default argument. It appears to
# be read-only here (copied in pending_classes()), but a None default
# would be the safer idiom — confirm before changing.
648 def __init__(self, copies, classes=[]):
649 queue.Queue.__init__(self) # Old-style superclass
650 self.wanted_copies = copies
651 self.wanted_storage_classes = classes
652 self.successful_copies = 0
653 self.confirmed_storage_classes = {}
655 self.storage_classes_tracking = True
656 self.queue_data_lock = threading.RLock()
# Budget of outstanding attempts; grows on failures, shrinks on success.
657 self.pending_tries = max(copies, len(classes))
658 self.pending_tries_notification = threading.Condition()
# Record a successful PUT: accumulate replica and per-class counts,
# then wake any threads waiting for work/completion.
660 def write_success(self, response, replicas_nr, classes_confirmed):
661 with self.queue_data_lock:
662 self.successful_copies += replicas_nr
# A None confirmation means the server doesn't report storage
# classes; stop tracking them for this write.
663 if classes_confirmed is None:
664 self.storage_classes_tracking = False
665 elif self.storage_classes_tracking:
666 for st_class, st_copies in classes_confirmed.items():
668 self.confirmed_storage_classes[st_class] += st_copies
670 self.confirmed_storage_classes[st_class] = st_copies
671 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
672 self.response = response
673 with self.pending_tries_notification:
674 self.pending_tries_notification.notify_all()
# Record a failed PUT: give another thread a try.
676 def write_fail(self, ks):
677 with self.pending_tries_notification:
678 self.pending_tries += 1
679 self.pending_tries_notification.notify()
# Replica copies still needed to satisfy wanted_copies.
681 def pending_copies(self):
682 with self.queue_data_lock:
683 return self.wanted_copies - self.successful_copies
# Storage classes already satisfied (all wanted classes when tracking
# has been disabled — see elided branch).
685 def satisfied_classes(self):
686 with self.queue_data_lock:
687 if not self.storage_classes_tracking:
688 # Notifies disabled storage classes expectation to
691 return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
# Wanted storage classes not yet confirmed with enough copies.
693 def pending_classes(self):
694 with self.queue_data_lock:
695 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
697 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
698 for st_class, st_copies in self.confirmed_storage_classes.items():
699 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
700 unsatisfied_classes.remove(st_class)
701 return unsatisfied_classes
# Block until there is a (service, root) task to attempt, all goals are
# met (drain + Queue.Empty), or a retry budget frees up.
703 def get_next_task(self):
704 with self.pending_tries_notification:
706 if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
707 # This notify_all() is unnecessary --
708 # write_success() already called notify_all()
709 # when pending<1 became true, so it's not
710 # possible for any other thread to be in
711 # wait() now -- but it's cheap insurance
712 # against deadlock so we do it anyway:
713 self.pending_tries_notification.notify_all()
714 # Drain the queue and then raise Queue.Empty
718 elif self.pending_tries > 0:
719 service, service_root = self.get_nowait()
# Skip services that already reached a terminal state.
720 if service.finished():
723 self.pending_tries -= 1
724 return service, service_root
726 self.pending_tries_notification.notify_all()
729 self.pending_tries_notification.wait()
# NOTE(review): extraction artifacts; elided lines include the else-branch
# setting num_threads and the join/run plumbing. Code kept byte-identical.
# Fans one block write out to multiple KeepWriterThreads sharing a queue.
732 class KeepWriterThreadPool(object):
# NOTE(review): `classes=[]` mutable default — same caveat as KeepWriterQueue.
733 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
734 self.total_task_nr = 0
# One thread is enough when a single service can store all copies.
735 if (not max_service_replicas) or (max_service_replicas >= copies):
738 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
739 _logger.debug("Pool max threads is %d", num_threads)
741 self.queue = KeepClient.KeepWriterQueue(copies, classes)
743 for _ in range(num_threads):
744 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
745 self.workers.append(w)
# Enqueue one (service, root) write attempt.
747 def add_task(self, ks, service_root):
748 self.queue.put((ks, service_root))
749 self.total_task_nr += 1
# done() fragment: report copies stored and classes satisfied so far.
752 return self.queue.successful_copies, self.queue.satisfied_classes()
# join() fragment: start/join the workers, then return the last response.
756 for worker in self.workers:
758 # Wait for finished work
762 return self.queue.response
# NOTE(review): extraction artifacts (baked numbers, lost indentation, elided
# lines such as the run() loop header and several except/else lines).
# Code kept byte-identical; only comments added.
# Worker thread: repeatedly takes (service, root) tasks from the shared
# KeepWriterQueue and PUTs one block to each service.
765 class KeepWriterThread(threading.Thread):
# Internal signal: a PUT attempt failed in an expected way (no traceback).
766 class TaskFailed(RuntimeError): pass
768 def __init__(self, queue, data, data_hash, timeout=None):
769 super(KeepClient.KeepWriterThread, self).__init__()
770 self.timeout = timeout
773 self.data_hash = data_hash
# run() body fragment: pull tasks until the queue signals completion.
779 service, service_root = self.queue.get_next_task()
783 locator, copies, classes = self.do_task(service, service_root)
784 except Exception as e:
# Unexpected exceptions get logged with traceback; TaskFailed is routine.
785 if not isinstance(e, self.TaskFailed):
786 _logger.exception("Exception in KeepWriterThread")
787 self.queue.write_fail(service)
789 self.queue.write_success(locator, copies, classes)
791 self.queue.task_done()
# Attempt one PUT; returns (locator, replicas_stored, classes_confirmed)
# or raises TaskFailed on an unsuccessful response.
793 def do_task(self, service, service_root):
794 classes = self.queue.pending_classes()
# Ask the server to store the block in the still-unsatisfied classes.
798 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
799 success = bool(service.put(self.data_hash,
801 timeout=self.timeout,
803 result = service.last_result()
806 if result.get('status_code'):
807 _logger.debug("Request fail: PUT %s => %s %s",
809 result.get('status_code'),
811 raise self.TaskFailed()
813 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
814 str(threading.current_thread()),
# Replica count from the response header; fallback branch elided.
819 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
820 except (KeyError, ValueError):
# Parse "class=count, class=count" confirmations from the server.
823 classes_confirmed = {}
825 scch = result['headers']['x-keep-storage-classes-confirmed']
826 for confirmation in scch.replace(' ', '').split(','):
827 if '=' in confirmation:
828 stored_class, stored_copies = confirmation.split('=')[:2]
829 classes_confirmed[stored_class] = int(stored_copies)
830 except (KeyError, ValueError):
831 # Storage classes confirmed header missing or corrupt
832 classes_confirmed = None
834 return result['body'].strip(), replicas_stored, classes_confirmed
# NOTE(review): extraction artifacts (baked line numbers, lost indentation,
# elided lines — several if/else and try/except headers are missing).
# Code kept byte-identical; only comments/docstring text touched.
837 def __init__(self, api_client=None, proxy=None,
838 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
839 api_token=None, local_store=None, block_cache=None,
840 num_retries=10, session=None, num_prefetch_threads=None):
841 """Initialize a new KeepClient.
845 The API client to use to find Keep services. If not
846 provided, KeepClient will build one from available Arvados
850 If specified, this KeepClient will send requests to this Keep
851 proxy. Otherwise, KeepClient will fall back to the setting of the
852 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
853 If you want to ensure KeepClient does not use a proxy, pass in an empty
857 The initial timeout (in seconds) for HTTP requests to Keep
858 non-proxy servers. A tuple of three floats is interpreted as
859 (connection_timeout, read_timeout, minimum_bandwidth). A connection
860 will be aborted if the average traffic rate falls below
861 minimum_bandwidth bytes per second over an interval of read_timeout
862 seconds. Because timeouts are often a result of transient server
863 load, the actual connection timeout will be increased by a factor
864 of two on each retry.
865 Default: (2, 256, 32768).
868 The initial timeout (in seconds) for HTTP requests to
869 Keep proxies. A tuple of three floats is interpreted as
870 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
871 described above for adjusting connection timeouts on retry also
873 Default: (20, 256, 32768).
876 If you're not using an API client, but only talking
877 directly to a Keep proxy, this parameter specifies an API token
878 to authenticate Keep requests. It is an error to specify both
879 api_client and api_token. If you specify neither, KeepClient
880 will use one available from the Arvados configuration.
883 If specified, this KeepClient will bypass Keep
884 services, and save data to the named directory. If unspecified,
885 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
886 environment variable. If you want to ensure KeepClient does not
887 use local storage, pass in an empty string. This is primarily
888 intended to mock a server for testing.
891 The default number of times to retry failed requests.
892 This will be used as the default num_retries value when get() and
893 put() are called. Default 10.
895 self.lock = threading.Lock()
# Configuration fallbacks: explicit args > ARVADOS_KEEP_SERVICES >
# ARVADOS_KEEP_PROXY > API discovery.
897 if config.get('ARVADOS_KEEP_SERVICES'):
898 proxy = config.get('ARVADOS_KEEP_SERVICES')
900 proxy = config.get('ARVADOS_KEEP_PROXY')
901 if api_token is None:
902 if api_client is None:
903 api_token = config.get('ARVADOS_API_TOKEN')
905 api_token = api_client.api_token
# Supplying both an api_client and a separate token is ambiguous.
906 elif api_client is not None:
908 "can't build KeepClient with both API client and token")
909 if local_store is None:
910 local_store = os.environ.get('KEEP_LOCAL_STORE')
912 if api_client is None:
913 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
915 self.insecure = api_client.insecure
917 self.block_cache = block_cache if block_cache else KeepBlockCache()
918 self.timeout = timeout
919 self.proxy_timeout = proxy_timeout
# Pool of reusable curl handles shared by this client's KeepServices.
920 self._user_agent_pool = queue.LifoQueue()
# Transfer/request statistics counters.
921 self.upload_counter = Counter()
922 self.download_counter = Counter()
923 self.put_counter = Counter()
924 self.get_counter = Counter()
925 self.hits_counter = Counter()
926 self.misses_counter = Counter()
927 self._storage_classes_unsupported_warning = False
928 self._default_classes = []
929 self.num_prefetch_threads = num_prefetch_threads or 2
930 self._prefetch_queue = None
931 self._prefetch_threads = None
# local_store mode: swap the network methods for directory-backed ones.
934 self.local_store = local_store
935 self.head = self.local_store_head
936 self.get = self.local_store_get
937 self.put = self.local_store_put
939 self.num_retries = num_retries
940 self.max_replicas_per_service = None
# Proxy mode: build a static service list from whitespace-separated URIs.
942 proxy_uris = proxy.split()
943 for i in range(len(proxy_uris)):
944 if not proxy_uris[i].endswith('/'):
# Fail fast on malformed proxy URIs.
947 url = urllib.parse.urlparse(proxy_uris[i])
948 if not (url.scheme and url.netloc):
949 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
950 self.api_token = api_token
951 self._gateway_services = {}
952 self._keep_services = [{
953 'uuid': "00000-bi6l4-%015d" % idx,
954 'service_type': 'proxy',
955 '_service_root': uri,
956 } for idx, uri in enumerate(proxy_uris)]
957 self._writable_services = self._keep_services
958 self.using_proxy = True
959 self._static_services_list = True
961 # It's important to avoid instantiating an API client
962 # unless we actually need one, for testing's sake.
963 if api_client is None:
964 api_client = arvados.api('v1')
965 self.api_client = api_client
966 self.api_token = api_client.api_token
967 self._gateway_services = {}
# Service lists are discovered lazily by build_services_list().
968 self._keep_services = None
969 self._writable_services = None
970 self.using_proxy = None
971 self._static_services_list = False
973 self._default_classes = [
974 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
976 # We're talking to an old cluster
# NOTE(review): extraction artifacts; the condition distinguishing the 2-tuple
# and 3-tuple returns is elided. Code kept byte-identical.
979 def current_timeout(self, attempt_number):
980 """Return the appropriate timeout to use for this client.
982 The proxy timeout setting if the backend service is currently a proxy,
983 the regular timeout setting otherwise. The `attempt_number` indicates
984 how many times the operation has been tried already (starting from 0
985 for the first try), and scales the connection timeout portion of the
986 return value accordingly.
989 # TODO(twp): the timeout should be a property of a
990 # KeepService, not a KeepClient. See #4488.
991 t = self.proxy_timeout if self.using_proxy else self.timeout
# Connection timeout doubles on each retry (left shift); read timeout
# (and minimum bandwidth, when present) stay fixed.
993 return (t[0] * (1 << attempt_number), t[1])
995 return (t[0] * (1 << attempt_number), t[1], t[2])
def _any_nondisk_services(self, service_list):
    """Return True if any service in *service_list* has a type other than 'disk'.

    Entries without a 'service_type' key are treated as 'disk'.
    (Reconstructed: only extraction artifacts removed; logic unchanged.)
    """
    return any(ks.get('service_type', 'disk') != 'disk'
               for ks in service_list)
# NOTE(review): extraction artifacts; the early-return body, try header, and
# some format arguments are elided. Code kept byte-identical.
# Discover Keep services via the API server and cache them on this client.
1000 def build_services_list(self, force_rebuild=False):
# Reuse the cached list unless a rebuild is forced (static lists —
# proxy mode — are never rebuilt).
1001 if (self._static_services_list or
1002 (self._keep_services and not force_rebuild)):
1006 keep_services = self.api_client.keep_services().accessible()
1007 except Exception: # API server predates Keep services.
1008 keep_services = self.api_client.keep_disks().list()
1010 # Gateway services are only used when specified by UUID,
1011 # so there's nothing to gain by filtering them by
1013 self._gateway_services = {ks['uuid']: ks for ks in
1014 keep_services.execute()['items']}
1015 if not self._gateway_services:
1016 raise arvados.errors.NoKeepServersError()
1018 # Precompute the base URI for each service.
1019 for r in self._gateway_services.values():
1020 host = r['service_host']
1021 if not host.startswith('[') and host.find(':') >= 0:
1022 # IPv6 URIs must be formatted like http://[::1]:80/...
1023 host = '[' + host + ']'
1024 r['_service_root'] = "{}://{}:{:d}/".format(
1025 'https' if r['service_ssl_flag'] else 'http',
1029 _logger.debug(str(self._gateway_services))
# Local (probe-able) services exclude gateway-type entries.
1030 self._keep_services = [
1031 ks for ks in self._gateway_services.values()
1032 if not ks.get('service_type', '').startswith('gateway:')]
1033 self._writable_services = [ks for ks in self._keep_services
1034 if not ks.get('read_only')]
1036 # For disk type services, max_replicas_per_service is 1
1037 # It is unknown (unlimited) for other service types.
1038 if self._any_nondisk_services(self._writable_services):
1039 self.max_replicas_per_service = None
1041 self.max_replicas_per_service = 1
def _service_weight(self, data_hash, service_uuid):
    """Compute the weight of a Keep service endpoint for a data
    block with a known hash.

    The weight is md5(h + u) where u is the last 15 characters of
    the service endpoint's UUID.
    (Reconstructed: only extraction artifacts removed; logic unchanged.)
    """
    return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
# NOTE(review): extraction artifacts; the docstring close, the hint-length
# branches, and the final return are elided. Code kept byte-identical.
1052 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1053 """Return an array of Keep service endpoints, in the order in
1054 which they should be probed when reading or writing data with
1055 the given hash+hints.
1057 self.build_services_list(force_rebuild)
1060 # Use the services indicated by the given +K@... remote
1061 # service hints, if any are present and can be resolved to a
1063 for hint in locator.hints:
1064 if hint.startswith('K@'):
# Short K@ hints name a remote cluster by its 5-character id.
1066 sorted_roots.append(
1067 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
# 29-character hints carry a full gateway service UUID.
1068 elif len(hint) == 29:
1069 svc = self._gateway_services.get(hint[2:])
1071 sorted_roots.append(svc['_service_root'])
1073 # Sort the available local services by weight (heaviest first)
1074 # for this locator, and return their service_roots (base URIs)
1076 use_services = self._keep_services
# Writers only consider services that accept writes.
1078 use_services = self._writable_services
1079 self.using_proxy = self._any_nondisk_services(use_services)
1080 sorted_roots.extend([
1081 svc['_service_root'] for svc in sorted(
# Rendezvous hashing: per-locator weight from _service_weight().
1084 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1085 _logger.debug("{}: {}".format(locator, sorted_roots))
# NOTE(review): extraction artifacts; the session keyword line and final
# return are elided. Code kept byte-identical.
1088 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1089 # roots_map is a dictionary, mapping Keep service root strings
1090 # to KeepService objects. Poll for Keep services, and add any
1091 # new ones to roots_map. Return the current list of local
# Default the Authorization header to this client's API token.
1093 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1094 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1095 for root in local_roots:
1096 if root not in roots_map:
# Lazily create one KeepService per newly-seen root, sharing this
# client's curl pool and transfer counters.
1097 roots_map[root] = self.KeepService(
1098 root, self._user_agent_pool,
1099 upload_counter=self.upload_counter,
1100 download_counter=self.download_counter,
1102 insecure=self.insecure)
# NOTE(review): extraction artifacts; the @staticmethod decorator (implied by
# the missing self parameter) and some return lines are elided.
# Code kept byte-identical.
1106 def _check_loop_result(result):
1107 # KeepClient RetryLoops should save results as a 2-tuple: the
1108 # actual result of the request, and the number of servers available
1109 # to receive the request this round.
1110 # This method returns True if there's a real result, False if
1111 # there are no more servers available, otherwise None.
1112 if isinstance(result, Exception):
1114 result, tried_server_count = result
1115 if (result is not None) and (result is not False):
1117 elif tried_server_count < 1:
1118 _logger.info("No more Keep services to try; giving up")
def get_from_cache(self, loc_s):
    """Fetch a block only if it is in the cache, otherwise return None."""
    locator = KeepLocator(loc_s)
    slot = self.block_cache.get(locator.md5sum)
    # Only return data from a slot whose download has already completed;
    # never block waiting for an in-flight fetch.
    if slot is not None and slot.ready.is_set():
        return slot.get()
    else:
        return None
def refresh_signature(self, loc):
    """Ask Keep to get the remote block and return its local signature"""
    timestamp = datetime.datetime.utcnow().isoformat("T") + 'Z'
    sig_headers = {'X-Keep-Signature': 'local, {}'.format(timestamp)}
    return self.head(loc, headers=sig_headers)
def head(self, loc_s, **kwargs):
    """HEAD a Keep block: like get(), but without transferring block data."""
    response = self._get_or_head(loc_s, method="HEAD", **kwargs)
    return response
def get(self, loc_s, **kwargs):
    """Fetch a block's data from Keep; see _get_or_head() for details."""
    data = self._get_or_head(loc_s, method="GET", **kwargs)
    return data
def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
    """Get data from Keep.

    This method fetches one or more blocks of data from Keep.  It
    sends a request each Keep service registered with the API
    server (or the proxy provided when this client was
    instantiated), then each service named in location hints, in
    sequence.  As soon as one service provides the data, it's
    returned.

    Arguments:
    * loc_s: A string of one or more comma-separated locators to fetch.
      This method returns the concatenation of these blocks.
    * num_retries: The number of times to retry GET requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff.  Note that, in each loop, the method may try
      to fetch data from every available Keep service, along with any
      that are named in location hints in the locator.  The default value
      is set when the KeepClient is initialized.
    """
    # NOTE(review): control-flow lines restored from a corrupted listing;
    # confirm against upstream arvados keep.py before relying on this block.
    if ',' in loc_s:
        return ''.join(self.get(x) for x in loc_s.split(','))

    self.get_counter.add(1)

    request_id = (request_id or
                  (hasattr(self, 'api_client') and self.api_client.request_id) or
                  arvados.util.new_request_id())
    if headers is None:
        headers = {}
    headers['X-Request-Id'] = request_id

    slot = None
    blob = None
    try:
        locator = KeepLocator(loc_s)
        if method == "GET":
            while slot is None:
                slot, first = self.block_cache.reserve_cache(locator.md5sum)
                if first:
                    # Fresh and empty "first time it is used" slot
                    break
                if prefetch:
                    # this is request for a prefetch to fill in
                    # the cache, don't need to wait for the
                    # result, so if it is already in flight return
                    # immediately.  Clear 'slot' to prevent
                    # finally block from calling slot.set()
                    slot = None
                    return None

                blob = slot.get()
                if blob is not None:
                    self.hits_counter.add(1)
                    return blob

                # If blob is None, this means either
                #
                # (a) another thread was fetching this block and
                # failed with an error or
                #
                # (b) cache thrashing caused the slot to be
                # evicted (content set to None) by another thread
                # between the call to reserve_cache() and get().
                #
                # We'll handle these cases by reserving a new slot
                # and then doing a full GET request.
                slot = None

        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               len(hint) == 29 and
                               self._gateway_services.get(hint[2:]))])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter,
                                   headers=headers,
                                   insecure=self.insecure)
            for root in hint_roots
        }

        # See #3147 for a discussion of the loop implementation.  Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        sorted_roots = []  # init so the error path below can't hit NameError
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        if loop.success():
            return blob
    finally:
        if slot is not None:
            self.block_cache.set(slot, blob)

    # Q: Including 403 is necessary for the Keep tests to continue
    # passing, but maybe they should expect KeepReadError instead?
    not_founds = sum(1 for key in sorted_roots
                     if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
    service_errors = ((key, roots_map[key].last_result()['error'])
                      for key in sorted_roots)
    if not roots_map:
        raise arvados.errors.KeepReadError(
            "[{}] failed to read {}: no Keep services available ({})".format(
                request_id, loc_s, loop.last_result()))
    elif not_founds == len(sorted_roots):
        raise arvados.errors.NotFoundError(
            "[{}] {} not found".format(request_id, loc_s), service_errors)
    else:
        raise arvados.errors.KeepReadError(
            "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
    """Save data in Keep.

    This method will get a list of Keep services from the API server, and
    send the data to each one simultaneously in a new thread.  Once the
    uploads are finished, if enough copies are saved, this method returns
    the most recent HTTP response body.  If requests fail to upload
    enough copies, this method raises KeepWriteError.

    Arguments:
    * data: The string of data to upload.
    * copies: The number of copies that the user requires be saved.
      Default 2.
    * num_retries: The number of times to retry PUT requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff.  The default value is set when the
      KeepClient is initialized.
    * classes: An optional list of storage class names where copies should
      be written.
    """
    # NOTE(review): control-flow lines restored from a corrupted listing;
    # confirm against upstream arvados keep.py before relying on this block.
    classes = classes or self._default_classes

    if not isinstance(data, bytes):
        data = data.encode()

    self.put_counter.add(1)

    data_hash = hashlib.md5(data).hexdigest()
    loc_s = data_hash + '+' + str(len(data))
    if copies < 1:
        return loc_s
    locator = KeepLocator(loc_s)

    request_id = (request_id or
                  (hasattr(self, 'api_client') and self.api_client.request_id) or
                  arvados.util.new_request_id())
    headers = {
        'X-Request-Id': request_id,
        'X-Keep-Desired-Replicas': str(copies),
    }
    roots_map = {}
    loop = retry.RetryLoop(num_retries, self._check_loop_result,
                           backoff_start=2)
    done_copies = 0
    done_classes = []
    for tries_left in loop:
        try:
            sorted_roots = self.map_new_services(
                roots_map, locator,
                force_rebuild=(tries_left < num_retries),
                need_writable=True,
                headers=headers)
        except Exception as error:
            loop.save_result(error)
            continue

        pending_classes = []
        if done_classes is not None:
            pending_classes = list(set(classes) - set(done_classes))
        writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                      data_hash=data_hash,
                                                      copies=copies - done_copies,
                                                      max_service_replicas=self.max_replicas_per_service,
                                                      timeout=self.current_timeout(num_retries - tries_left),
                                                      classes=pending_classes)
        for service_root, ks in [(root, roots_map[root])
                                 for root in sorted_roots]:
            if ks.finished():
                continue
            writer_pool.add_task(ks, service_root)
        writer_pool.join()
        pool_copies, pool_classes = writer_pool.done()
        done_copies += pool_copies
        if (done_classes is not None) and (pool_classes is not None):
            done_classes += pool_classes
            loop.save_result(
                (done_copies >= copies and set(done_classes) == set(classes),
                 writer_pool.total_task_nr))
        else:
            # Old keepstore contacted without storage classes support:
            # success is determined only by successful copies.
            #
            # Disable storage classes tracking from this point forward.
            if not self._storage_classes_unsupported_warning:
                self._storage_classes_unsupported_warning = True
                _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
            done_classes = None
            loop.save_result(
                (done_copies >= copies, writer_pool.total_task_nr))

    if loop.success():
        return writer_pool.response()
    if not roots_map:
        raise arvados.errors.KeepWriteError(
            "[{}] failed to write {}: no Keep services available ({})".format(
                request_id, data_hash, loop.last_result()))
    else:
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots
                          if roots_map[key].last_result()['error'])
        raise arvados.errors.KeepWriteError(
            "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1399 def _block_prefetch_worker(self):
1400 """The background downloader thread."""
1403 b = self._prefetch_queue.get()
1406 self.get(b, prefetch=True)
1408 _logger.exception("Exception doing block prefetch")
1410 def _start_prefetch_threads(self):
1411 if self._prefetch_threads is None:
1413 if self._prefetch_threads is not None:
1415 self._prefetch_queue = queue.Queue()
1416 self._prefetch_threads = []
1417 for i in range(0, self.num_prefetch_threads):
1418 thread = threading.Thread(target=self._block_prefetch_worker)
1419 self._prefetch_threads.append(thread)
1420 thread.daemon = True
def block_prefetch(self, locator):
    """Initiate a background download of a block.

    This relies on the fact that KeepClient implements a block cache,
    so repeated requests for the same block will not result in repeated
    downloads (unless the block is evicted from the cache.)  This method
    does not block.
    """
    # Already cached: nothing to do.
    if self.block_cache.get(locator) is not None:
        return

    self._start_prefetch_threads()
    self._prefetch_queue.put(locator)
def stop_prefetch_threads(self):
    """Shut down the prefetch worker pool and discard the queue.

    Posts one None sentinel per worker, joins them all, then clears
    the pool so _start_prefetch_threads() can recreate it later.
    """
    with self.lock:
        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None
def local_store_put(self, data, copies=1, num_retries=None, classes=None):
    """A stub for put().

    This method is used in place of the real put() method when
    using local storage (see constructor's local_store argument).

    copies, num_retries and classes arguments are ignored: they are
    here only for the sake of offering the same call signature as
    put().

    Data stored this way can be retrieved via local_store_get().
    """
    md5 = hashlib.md5(data).hexdigest()
    locator = '%s+%d' % (md5, len(data))
    # Write to a temp file, then rename atomically into place so readers
    # never see a partially-written block.
    with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
        f.write(data)
    os.rename(os.path.join(self.local_store, md5 + '.tmp'),
              os.path.join(self.local_store, md5))
    return locator
def local_store_get(self, loc_s, num_retries=None):
    """Companion to local_store_put(): read a block back from local_store."""
    try:
        locator = KeepLocator(loc_s)
    except ValueError:
        raise arvados.errors.NotFoundError(
            "Invalid data locator: '%s'" % loc_s)
    # The well-known empty block never hits disk.
    if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
        return b''
    with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
        return f.read()
1479 def local_store_head(self, loc_s, num_retries=None):
1480 """Companion to local_store_put()."""
1482 locator = KeepLocator(loc_s)
1484 raise arvados.errors.NotFoundError(
1485 "Invalid data locator: '%s'" % loc_s)
1486 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1488 if os.path.exists(os.path.join(self.local_store, locator.md5sum)):