1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
26 from io import BytesIO
29 import arvados.config as config
31 import arvados.retry as retry
33 import arvados.diskcache
34 from arvados._pycurlhelper import PyCurlHelper
37 _logger = logging.getLogger('arvados.keep')
38 global_client_object = None
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    # Table-driven: add each keepalive constant only when the socket
    # module doesn't already define it for this platform.
    for _tcp_const, _tcp_value in (
            ('TCP_KEEPALIVE', 0x010),
            ('TCP_KEEPINTVL', 0x101),
            ('TCP_KEEPCNT', 0x102),
    ):
        if not hasattr(socket, _tcp_const):
            setattr(socket, _tcp_const, _tcp_value)
50 class KeepLocator(object):
# Parses and re-serializes Keep block locators of the form
# "<md5sum>[+<size>][+<hints>...]"; hints beginning with 'A' carry a
# permission signature and expiry timestamp.
51 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
52 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
54 def __init__(self, locator_str):
# Split on '+': first piece is the md5 hash, second (if present) the
# integer size; the remainder are hints validated against HINT_RE.
57 self._perm_expiry = None
58 pieces = iter(locator_str.split('+'))
59 self.md5sum = next(pieces)
61 self.size = int(next(pieces))
65 if self.HINT_RE.match(hint) is None:
66 raise ValueError("invalid hint format: {}".format(hint))
67 elif hint.startswith('A'):
# 'A' hints are parsed into perm_sig/perm_expiry rather than stored raw.
68 self.parse_permission_hint(hint)
70 self.hints.append(hint)
75 for s in [self.md5sum, self.size,
76 self.permission_hint()] + self.hints
80 if self.size is not None:
81 return "%s+%i" % (self.md5sum, self.size)
85 def _make_hex_prop(name, length):
86 # Build and return a new property with the given name that
87 # must be a hex string of the given length.
88 data_name = '_{}'.format(name)
90 return getattr(self, data_name)
91 def setter(self, hex_str):
# Validate before storing so md5sum/perm_sig can never hold non-hex data.
92 if not arvados.util.is_hex(hex_str, length):
93 raise ValueError("{} is not a {}-digit hex string: {!r}".
94 format(name, length, hex_str))
95 setattr(self, data_name, hex_str)
96 return property(getter, setter)
# Validated hex-string properties: 32 hex digits for md5, 40 for signature.
98 md5sum = _make_hex_prop('md5sum', 32)
99 perm_sig = _make_hex_prop('perm_sig', 40)
102 def perm_expiry(self):
103 return self._perm_expiry
106 def perm_expiry(self, value):
# Setter accepts a hex Unix timestamp (1-8 hex digits) and stores a datetime.
107 if not arvados.util.is_hex(value, 1, 8):
109 "permission timestamp must be a hex Unix timestamp: {}".
111 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
113 def permission_hint(self):
# Render the signature + expiry back to the "A<sig>@<hex timestamp>" form.
114 data = [self.perm_sig, self.perm_expiry]
117 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
118 return "A{}@{:08x}".format(*data)
120 def parse_permission_hint(self, s)
122 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
124 raise ValueError("bad permission hint {}".format(s))
126 def permission_expired(self, as_of_dt=None):
# Returns whether the signature expiry has passed (defaults to "now").
127 if self.perm_expiry is None:
129 elif as_of_dt is None:
130 as_of_dt = datetime.datetime.now()
131 return self.perm_expiry <= as_of_dt
135 """Simple interface to a global KeepClient object.
137 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
138 own API client. The global KeepClient will build an API client from the
139 current Arvados configuration, which may not match the one you built.
144 def global_client_object(cls):
# Returns the shared module-level KeepClient, rebuilding it whenever the
# relevant configuration (host/token/insecure/proxy/local-store) changes.
145 global global_client_object
146 # Previously, KeepClient would change its behavior at runtime based
147 # on these configuration settings. We simulate that behavior here
148 # by checking the values and returning a new KeepClient if any of
150 key = (config.get('ARVADOS_API_HOST'),
151 config.get('ARVADOS_API_TOKEN'),
152 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
153 config.get('ARVADOS_KEEP_PROXY'),
154 os.environ.get('KEEP_LOCAL_STORE'))
155 if (global_client_object is None) or (cls._last_key != key):
# Configuration changed (or first use): build a fresh client.
156 global_client_object = KeepClient()
158 return global_client_object
def get(locator, **kwargs):
    """Read a block through the shared global KeepClient (deprecated)."""
    client = Keep.global_client_object()
    return client.get(locator, **kwargs)
def put(data, **kwargs):
    """Write a block through the shared global KeepClient (deprecated)."""
    client = Keep.global_client_object()
    return client.put(data, **kwargs)
168 class KeepBlockCache(object):
# LRU cache of Keep blocks, either in-memory (CacheSlot) or backed by
# disk files (arvados.diskcache.DiskCacheSlot) when disk_cache=True.
169 def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
# cache_max: byte budget (0 = choose a default); max_slots: entry budget
# (0 = derive from RLIMIT_NOFILE for disk cache).
170 self.cache_max = cache_max
171 self._cache = collections.OrderedDict()
172 self._cache_lock = threading.Lock()
173 self._max_slots = max_slots
174 self._disk_cache = disk_cache
175 self._disk_cache_dir = disk_cache_dir
# Condition shares _cache_lock so waiters in reserve_cache() can be
# woken by cap_cache() while holding the same lock.
176 self._cache_updating = threading.Condition(self._cache_lock)
178 if self._disk_cache and self._disk_cache_dir is None:
179 self._disk_cache_dir = str(arvados.util._BaseDirectories('CACHE').storage_path('keep'))
181 if self._max_slots == 0:
183 # Each block uses two file descriptors, one used to
184 # open it initially and hold the flock(), and a second
185 # hidden one used by mmap().
187 # Set max slots to 1/8 of maximum file handles. This
188 # means we'll use at most 1/4 of total file handles.
190 # NOFILE typically defaults to 1024 on Linux so this
191 # is 128 slots (256 file handles), which means we can
192 # cache up to 8 GiB of 64 MiB blocks. This leaves
193 # 768 file handles for sockets and other stuff.
195 # When we want the ability to have more cache (e.g. in
196 # arv-mount) we'll increase rlimit before calling
198 self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
201 self._max_slots = 512
203 if self.cache_max == 0:
# No explicit byte budget: size the disk cache from filesystem stats.
205 fs = os.statvfs(self._disk_cache_dir)
206 # Calculation of available space incorporates existing cache usage
207 existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
208 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
209 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
211 # 10% of total disk size
212 # 25% of available space
214 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
217 self.cache_max = (256 * 1024 * 1024)
# Never shrink below one 64 MiB block.
219 self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
223 self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
# Account for blocks already present on disk from a previous run.
224 for slot in self._cache.values():
225 self.cache_total += slot.size()
228 class CacheSlot(object):
# In-memory cache entry: holds one block's bytes once set() is called;
# 'ready' lets readers block until the content has arrived.
229 __slots__ = ("locator", "ready", "content")
231 def __init__(self, locator):
232 self.locator = locator
233 self.ready = threading.Event()
240 def set(self, value):
# Ignore duplicate set() calls once content has been stored.
241 if self.content is not None:
248 if self.content is None:
251 return len(self.content)
257 def _resize_cache(self, cache_max, max_slots):
258 # Try and make sure the contents of the cache do not exceed
259 # the supplied maximums.
# Caller must hold _cache_lock; eviction is oldest-first (OrderedDict order).
261 if self.cache_total <= cache_max and len(self._cache) <= max_slots:
264 _evict_candidates = collections.deque(self._cache.values())
265 while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
266 slot = _evict_candidates.popleft()
267 if not slot.ready.is_set():
# In-flight slots (content not yet set) are not evicted here.
272 self.cache_total -= sz
273 del self._cache[slot.locator]
277 '''Cap the cache size to self.cache_max'''
278 with self._cache_updating:
279 self._resize_cache(self.cache_max, self._max_slots)
# Wake any reserve_cache() waiters — a slot may have been freed.
280 self._cache_updating.notify_all()
282 def _get(self, locator):
# Lock-free inner lookup; callers must hold _cache_lock.
283 # Test if the locator is already in the cache
284 if locator in self._cache:
285 n = self._cache[locator]
286 if n.ready.is_set() and n.content is None:
# Slot finished but holds no data (failed fetch): drop the stale entry.
287 del self._cache[n.locator]
# Refresh LRU position on a hit.
289 self._cache.move_to_end(locator)
292 # see if it exists on disk
293 n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
295 self._cache[n.locator] = n
296 self.cache_total += n.size()
def get(self, locator):
    """Thread-safe lookup: delegate to _get() while holding the cache lock."""
    self._cache_lock.acquire()
    try:
        return self._get(locator)
    finally:
        self._cache_lock.release()
304 def reserve_cache(self, locator):
305 '''Reserve a cache slot for the specified locator,
306 or return the existing slot.'''
307 with self._cache_updating:
308 n = self._get(locator)
312 # Add a new cache slot for the locator
# Leave one slot of headroom so the new entry fits within _max_slots.
313 self._resize_cache(self.cache_max, self._max_slots-1)
314 while len(self._cache) >= self._max_slots:
315 # If there isn't a slot available, need to wait
316 # for something to happen that releases one of the
317 # cache slots. Idle for 200 ms or woken up by
319 self._cache_updating.wait(timeout=0.2)
320 self._resize_cache(self.cache_max, self._max_slots-1)
# Disk-backed or in-memory slot depending on cache mode.
323 n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
325 n = KeepBlockCache.CacheSlot(locator)
326 self._cache[n.locator] = n
329 def set(self, slot, blob):
# Store blob into the reserved slot; on resource errors shrink the
# configured limits and retry once before giving up.
332 self.cache_total += slot.size()
335 if e.errno == errno.ENOMEM:
336 # Reduce max slots to current - 4, cap cache and retry
337 with self._cache_lock:
338 self._max_slots = max(4, len(self._cache) - 4)
339 elif e.errno == errno.ENOSPC:
340 # Reduce disk max space to current - 256 MiB, cap cache and retry
341 with self._cache_lock:
342 sm = sum(st.size() for st in self._cache.values())
343 self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
344 elif e.errno == errno.ENODEV:
# mmap() unsupported on this filesystem — disk cache cannot work at all.
345 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
346 except Exception as e:
349 # Check if we should evict things from the cache. Either
350 # because we added a new thing or there was an error and
351 # we possibly adjusted the limits down, so we might need
352 # to push something out.
356 # Only gets here if there was an error the first time. The
357 # exception handler adjusts limits downward in some cases
358 # to free up resources, which would make the operation
361 self.cache_total += slot.size()
362 except Exception as e:
363 # It failed again. Give up.
365 raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
369 class Counter(object):
# Thread-safe additive counter used for transfer/hit/miss statistics.
370 def __init__(self, v=0):
371 self._lk = threading.Lock()
383 class KeepClient(object):
# High-level client that reads/writes blocks across the cluster's Keep
# services (or a Keep proxy, or a local store directory for testing).
384 DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
385 DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
387 class KeepService(PyCurlHelper):
388 """Make requests to a single Keep service, and track results.
390 A KeepService is intended to last long enough to perform one
391 transaction (GET or PUT) against one Keep service. This can
392 involve calling either get() or put() multiple times in order
393 to retry after transient failures. However, calling both get()
394 and put() on a single instance -- or using the same instance
395 to access two different Keep services -- will not produce
402 arvados.errors.HttpError,
# NOTE(review): user_agent_pool=queue.LifoQueue() is a mutable default
# shared by all KeepService instances that omit the argument; KeepClient
# passes its own pool, so confirm the shared default is intentional.
405 def __init__(self, root, user_agent_pool=queue.LifoQueue(),
407 download_counter=None,
410 super(KeepClient.KeepService, self).__init__()
412 self._user_agent_pool = user_agent_pool
413 self._result = {'error': None}
417 self.get_headers = {'Accept': 'application/octet-stream'}
418 self.get_headers.update(headers)
419 self.put_headers = headers
420 self.upload_counter = upload_counter
421 self.download_counter = download_counter
422 self.insecure = insecure
425 """Is it worth attempting a request?"""
429 """Did the request succeed or encounter permanent failure?"""
430 return self._result['error'] == False or not self._usable
432 def last_result(self):
435 def _get_user_agent(self):
# Reuse a pooled curl handle when available (pool get is non-blocking).
437 return self._user_agent_pool.get(block=False)
441 def _put_user_agent(self, ua):
444 self._user_agent_pool.put(ua, block=False)
448 def get(self, locator, method="GET", timeout=None):
449 # locator is a KeepLocator object.
# Performs one GET or HEAD against this service; records the outcome in
# self._result, verifies the body's md5 against the locator on GET, and
# returns the body (or, for remote-copy HEAD responses, the local locator).
450 url = self.root + str(locator)
451 _logger.debug("Request: %s %s", method, url)
452 curl = self._get_user_agent()
455 with timer.Timer() as t:
457 response_body = BytesIO()
458 curl.setopt(pycurl.NOSIGNAL, 1)
459 curl.setopt(pycurl.OPENSOCKETFUNCTION,
460 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
461 curl.setopt(pycurl.URL, url.encode('utf-8'))
462 curl.setopt(pycurl.HTTPHEADER, [
463 '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
464 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
465 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
# Insecure mode: skip TLS verification entirely.
467 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
468 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
470 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
# NOBODY for HEAD, HTTPGET for GET.
472 curl.setopt(pycurl.NOBODY, True)
474 curl.setopt(pycurl.HTTPGET, True)
475 self._setcurltimeouts(curl, timeout, method=="HEAD")
479 except Exception as e:
480 raise arvados.errors.HttpError(0, str(e))
486 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
487 'body': response_body.getvalue(),
488 'headers': self._headers,
492 ok = retry.check_http_response_success(self._result['status_code'])
494 self._result['error'] = arvados.errors.HttpError(
495 self._result['status_code'],
496 self._headers.get('x-status-line', 'Error'))
497 except self.HTTP_ERRORS as e:
# ok is True (success), False (permanent failure), or None (retryable).
501 self._usable = ok != False
502 if self._result.get('status_code', None):
503 # The client worked well enough to get an HTTP status
504 # code, so presumably any problems are just on the
505 # server side and it's OK to reuse the client.
506 self._put_user_agent(curl)
508 # Don't return this client to the pool, in case it's
512 _logger.debug("Request fail: GET %s => %s: %s",
513 url, type(self._result['error']), str(self._result['error']))
516 _logger.info("HEAD %s: %s bytes",
517 self._result['status_code'],
518 self._result.get('content-length'))
519 if self._result['headers'].get('x-keep-locator'):
520 # This is a response to a remote block copy request, return
521 # the local copy block locator.
522 return self._result['headers'].get('x-keep-locator')
525 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
526 self._result['status_code'],
527 len(self._result['body']),
529 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
531 if self.download_counter:
532 self.download_counter.add(len(self._result['body']))
# End-to-end integrity check: body must hash to the locator's md5.
533 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
534 if resp_md5 != locator.md5sum:
535 _logger.warning("Checksum fail: md5(%s) = %s",
537 self._result['error'] = arvados.errors.HttpError(
540 return self._result['body']
542 def put(self, hash_s, body, timeout=None, headers={}):
# Uploads one block body to this service under hash_s, recording the
# outcome in self._result for later inspection via last_result().
543 put_headers = copy.copy(self.put_headers)
544 put_headers.update(headers)
545 url = self.root + hash_s
546 _logger.debug("Request: PUT %s", url)
547 curl = self._get_user_agent()
550 with timer.Timer() as t:
552 body_reader = BytesIO(body)
553 response_body = BytesIO()
554 curl.setopt(pycurl.NOSIGNAL, 1)
555 curl.setopt(pycurl.OPENSOCKETFUNCTION,
556 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
557 curl.setopt(pycurl.URL, url.encode('utf-8'))
558 # Using UPLOAD tells cURL to wait for a "go ahead" from the
559 # Keep server (in the form of a HTTP/1.1 "100 Continue"
560 # response) instead of sending the request body immediately.
561 # This allows the server to reject the request if the request
562 # is invalid or the server is read-only, without waiting for
563 # the client to send the entire block.
564 curl.setopt(pycurl.UPLOAD, True)
565 curl.setopt(pycurl.INFILESIZE, len(body))
566 curl.setopt(pycurl.READFUNCTION, body_reader.read)
567 curl.setopt(pycurl.HTTPHEADER, [
568 '{}: {}'.format(k,v) for k,v in put_headers.items()])
569 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
570 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
# Insecure mode: skip TLS verification entirely.
572 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
573 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
575 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
576 self._setcurltimeouts(curl, timeout)
579 except Exception as e:
580 raise arvados.errors.HttpError(0, str(e))
586 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
# PUT responses are text (a signed locator), so decode to str here.
587 'body': response_body.getvalue().decode('utf-8'),
588 'headers': self._headers,
591 ok = retry.check_http_response_success(self._result['status_code'])
593 self._result['error'] = arvados.errors.HttpError(
594 self._result['status_code'],
595 self._headers.get('x-status-line', 'Error'))
596 except self.HTTP_ERRORS as e:
600 self._usable = ok != False # still usable if ok is True or None
601 if self._result.get('status_code', None):
602 # Client is functional. See comment in get().
603 self._put_user_agent(curl)
607 _logger.debug("Request fail: PUT %s => %s: %s",
608 url, type(self._result['error']), str(self._result['error']))
610 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
611 self._result['status_code'],
614 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
615 if self.upload_counter:
616 self.upload_counter.add(len(body))
620 class KeepWriterQueue(queue.Queue):
# Work queue shared by KeepWriterThreads: tracks how many replicas and
# which storage classes have been confirmed so far.
# NOTE(review): classes=[] is a mutable default; it is never mutated in the
# visible code, and None is already a meaningful value for
# wanted_storage_classes (see pending_classes), so leaving as-is — confirm.
621 def __init__(self, copies, classes=[]):
622 queue.Queue.__init__(self) # Old-style superclass
623 self.wanted_copies = copies
624 self.wanted_storage_classes = classes
625 self.successful_copies = 0
626 self.confirmed_storage_classes = {}
628 self.storage_classes_tracking = True
# RLock guards the counters; the Condition signals worker threads.
629 self.queue_data_lock = threading.RLock()
630 self.pending_tries = max(copies, len(classes))
631 self.pending_tries_notification = threading.Condition()
633 def write_success(self, response, replicas_nr, classes_confirmed):
# Record a successful PUT: bump the replica count, merge per-class
# confirmations, and recompute how many more attempts are still needed.
634 with self.queue_data_lock:
635 self.successful_copies += replicas_nr
636 if classes_confirmed is None:
# Server didn't report storage classes: stop tracking them.
637 self.storage_classes_tracking = False
638 elif self.storage_classes_tracking:
639 for st_class, st_copies in classes_confirmed.items():
641 self.confirmed_storage_classes[st_class] += st_copies
643 self.confirmed_storage_classes[st_class] = st_copies
644 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
645 self.response = response
646 with self.pending_tries_notification:
# Wake all workers so they can re-evaluate whether work remains.
647 self.pending_tries_notification.notify_all()
def write_fail(self, ks):
    """Record a failed write attempt: grant one more pending try and wake a worker."""
    cond = self.pending_tries_notification
    with cond:
        self.pending_tries = self.pending_tries + 1
        cond.notify()
def pending_copies(self):
    """Return how many replicas are still wanted beyond those already written."""
    with self.queue_data_lock:
        remaining = self.wanted_copies - self.successful_copies
        return remaining
658 def satisfied_classes(self):
# Storage classes whose replica quota has already been met.
659 with self.queue_data_lock:
660 if not self.storage_classes_tracking:
661 # Notifies disabled storage classes expectation to
664 return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
666 def pending_classes(self):
# Storage classes still short of wanted_copies confirmed replicas.
667 with self.queue_data_lock:
668 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
670 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
671 for st_class, st_copies in self.confirmed_storage_classes.items():
672 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
673 unsatisfied_classes.remove(st_class)
674 return unsatisfied_classes
676 def get_next_task(self):
# Hand the next (service, service_root) pair to a worker thread, or
# block until more tries are granted / drain when the goal is reached.
677 with self.pending_tries_notification:
679 if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
680 # This notify_all() is unnecessary --
681 # write_success() already called notify_all()
682 # when pending<1 became true, so it's not
683 # possible for any other thread to be in
684 # wait() now -- but it's cheap insurance
685 # against deadlock so we do it anyway:
686 self.pending_tries_notification.notify_all()
687 # Drain the queue and then raise Queue.Empty
691 elif self.pending_tries > 0:
692 service, service_root = self.get_nowait()
693 if service.finished():
# Skip services that already reached a terminal state.
696 self.pending_tries -= 1
697 return service, service_root
699 self.pending_tries_notification.notify_all()
# No tries currently available: wait for success/failure notifications.
702 self.pending_tries_notification.wait()
705 class KeepWriterThreadPool(object):
# Runs enough KeepWriterThreads to achieve the requested replica count,
# given how many replicas a single service can store per request.
706 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
707 self.total_task_nr = 0
708 if (not max_service_replicas) or (max_service_replicas >= copies):
# One request can satisfy all copies; otherwise ceil(copies/replicas) threads.
711 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
712 _logger.debug("Pool max threads is %d", num_threads)
714 self.queue = KeepClient.KeepWriterQueue(copies, classes)
716 for _ in range(num_threads):
717 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
718 self.workers.append(w)
720 def add_task(self, ks, service_root):
721 self.queue.put((ks, service_root))
722 self.total_task_nr += 1
725 return self.queue.successful_copies, self.queue.satisfied_classes()
729 for worker in self.workers:
731 # Wait for finished work
# The final response (signed locator) comes from the shared queue.
735 return self.queue.response
738 class KeepWriterThread(threading.Thread):
# Worker: repeatedly takes (service, root) tasks from the queue and
# attempts to PUT the block, reporting success/failure back to the queue.
739 class TaskFailed(RuntimeError): pass
741 def __init__(self, queue, data, data_hash, timeout=None):
742 super(KeepClient.KeepWriterThread, self).__init__()
743 self.timeout = timeout
746 self.data_hash = data_hash
752 service, service_root = self.queue.get_next_task()
756 locator, copies, classes = self.do_task(service, service_root)
757 except Exception as e:
# TaskFailed is an expected per-service failure; anything else is logged.
758 if not isinstance(e, self.TaskFailed):
759 _logger.exception("Exception in KeepWriterThread")
760 self.queue.write_fail(service)
762 self.queue.write_success(locator, copies, classes)
764 self.queue.task_done()
766 def do_task(self, service, service_root):
# Perform one PUT against one service and parse the replica/storage-class
# confirmation headers; raises TaskFailed on an unsuccessful response.
767 classes = self.queue.pending_classes()
771 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
772 success = bool(service.put(self.data_hash,
774 timeout=self.timeout,
776 result = service.last_result()
779 if result.get('status_code'):
780 _logger.debug("Request fail: PUT %s => %s %s",
782 result.get('status_code'),
784 raise self.TaskFailed()
786 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
787 str(threading.current_thread()),
# Replica count from the server header; header may be absent on old servers.
792 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
793 except (KeyError, ValueError):
796 classes_confirmed = {}
798 scch = result['headers']['x-keep-storage-classes-confirmed']
# Header format: "class1=2, class2=1" -> {'class1': 2, 'class2': 1}
799 for confirmation in scch.replace(' ', '').split(','):
800 if '=' in confirmation:
801 stored_class, stored_copies = confirmation.split('=')[:2]
802 classes_confirmed[stored_class] = int(stored_copies)
803 except (KeyError, ValueError):
804 # Storage classes confirmed header missing or corrupt
805 classes_confirmed = None
807 return result['body'].strip(), replicas_stored, classes_confirmed
810 def __init__(self, api_client=None, proxy=None,
811 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
812 api_token=None, local_store=None, block_cache=None,
813 num_retries=10, session=None, num_prefetch_threads=None):
814 """Initialize a new KeepClient.
818 The API client to use to find Keep services. If not
819 provided, KeepClient will build one from available Arvados
823 If specified, this KeepClient will send requests to this Keep
824 proxy. Otherwise, KeepClient will fall back to the setting of the
825 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
826 If you want to ensure KeepClient does not use a proxy, pass in an empty
830 The initial timeout (in seconds) for HTTP requests to Keep
831 non-proxy servers. A tuple of three floats is interpreted as
832 (connection_timeout, read_timeout, minimum_bandwidth). A connection
833 will be aborted if the average traffic rate falls below
834 minimum_bandwidth bytes per second over an interval of read_timeout
835 seconds. Because timeouts are often a result of transient server
836 load, the actual connection timeout will be increased by a factor
837 of two on each retry.
838 Default: (2, 256, 32768).
841 The initial timeout (in seconds) for HTTP requests to
842 Keep proxies. A tuple of three floats is interpreted as
843 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
844 described above for adjusting connection timeouts on retry also
846 Default: (20, 256, 32768).
849 If you're not using an API client, but only talking
850 directly to a Keep proxy, this parameter specifies an API token
851 to authenticate Keep requests. It is an error to specify both
852 api_client and api_token. If you specify neither, KeepClient
853 will use one available from the Arvados configuration.
856 If specified, this KeepClient will bypass Keep
857 services, and save data to the named directory. If unspecified,
858 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
859 environment variable. If you want to ensure KeepClient does not
860 use local storage, pass in an empty string. This is primarily
861 intended to mock a server for testing.
864 The default number of times to retry failed requests.
865 This will be used as the default num_retries value when get() and
866 put() are called. Default 10.
868 self.lock = threading.Lock()
# Resolve proxy configuration: explicit services list takes precedence.
870 if config.get('ARVADOS_KEEP_SERVICES'):
871 proxy = config.get('ARVADOS_KEEP_SERVICES')
873 proxy = config.get('ARVADOS_KEEP_PROXY')
# api_token and api_client are mutually exclusive inputs.
874 if api_token is None:
875 if api_client is None:
876 api_token = config.get('ARVADOS_API_TOKEN')
878 api_token = api_client.api_token
879 elif api_client is not None:
881 "can't build KeepClient with both API client and token")
882 if local_store is None:
883 local_store = os.environ.get('KEEP_LOCAL_STORE')
885 if api_client is None:
886 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
888 self.insecure = api_client.insecure
890 self.block_cache = block_cache if block_cache else KeepBlockCache()
891 self.timeout = timeout
892 self.proxy_timeout = proxy_timeout
893 self._user_agent_pool = queue.LifoQueue()
# Statistics counters shared with KeepService instances.
894 self.upload_counter = Counter()
895 self.download_counter = Counter()
896 self.put_counter = Counter()
897 self.get_counter = Counter()
898 self.hits_counter = Counter()
899 self.misses_counter = Counter()
900 self._storage_classes_unsupported_warning = False
901 self._default_classes = []
902 if num_prefetch_threads is not None:
903 self.num_prefetch_threads = num_prefetch_threads
905 self.num_prefetch_threads = 2
906 self._prefetch_queue = None
907 self._prefetch_threads = None
# Local-store mode: swap get/put/head for filesystem-backed versions.
910 self.local_store = local_store
911 self.head = self.local_store_head
912 self.get = self.local_store_get
913 self.put = self.local_store_put
915 self.num_retries = num_retries
916 self.max_replicas_per_service = None
# Proxy mode: build a static, synthetic service list from the URI(s).
918 proxy_uris = proxy.split()
919 for i in range(len(proxy_uris)):
920 if not proxy_uris[i].endswith('/'):
923 url = urllib.parse.urlparse(proxy_uris[i])
924 if not (url.scheme and url.netloc):
925 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
926 self.api_token = api_token
927 self._gateway_services = {}
928 self._keep_services = [{
929 'uuid': "00000-bi6l4-%015d" % idx,
930 'service_type': 'proxy',
931 '_service_root': uri,
932 } for idx, uri in enumerate(proxy_uris)]
933 self._writable_services = self._keep_services
934 self.using_proxy = True
935 self._static_services_list = True
937 # It's important to avoid instantiating an API client
938 # unless we actually need one, for testing's sake.
939 if api_client is None:
940 api_client = arvados.api('v1')
941 self.api_client = api_client
942 self.api_token = api_client.api_token
943 self._gateway_services = {}
944 self._keep_services = None
945 self._writable_services = None
946 self.using_proxy = None
947 self._static_services_list = False
# Default storage classes come from the cluster config when available.
949 self._default_classes = [
950 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
952 # We're talking to an old cluster
955 def current_timeout(self, attempt_number):
956 """Return the appropriate timeout to use for this client.
958 The proxy timeout setting if the backend service is currently a proxy,
959 the regular timeout setting otherwise. The `attempt_number` indicates
960 how many times the operation has been tried already (starting from 0
961 for the first try), and scales the connection timeout portion of the
962 return value accordingly.
965 # TODO(twp): the timeout should be a property of a
966 # KeepService, not a KeepClient. See #4488.
967 t = self.proxy_timeout if self.using_proxy else self.timeout
# Exponential backoff applies only to the connection-timeout element.
969 return (t[0] * (1 << attempt_number), t[1])
971 return (t[0] * (1 << attempt_number), t[1], t[2])
def _any_nondisk_services(self, service_list):
    """Return True if any service in service_list is not a plain disk service."""
    for ks in service_list:
        if ks.get('service_type', 'disk') != 'disk':
            return True
    return False
976 def build_services_list(self, force_rebuild=False):
# Populate _gateway_services/_keep_services/_writable_services from the
# API server; no-op when a static (proxy) list is in use or already built.
977 if (self._static_services_list or
978 (self._keep_services and not force_rebuild)):
982 keep_services = self.api_client.keep_services().accessible()
983 except Exception: # API server predates Keep services.
984 keep_services = self.api_client.keep_disks().list()
986 # Gateway services are only used when specified by UUID,
987 # so there's nothing to gain by filtering them by
989 self._gateway_services = {ks['uuid']: ks for ks in
990 keep_services.execute()['items']}
991 if not self._gateway_services:
992 raise arvados.errors.NoKeepServersError()
994 # Precompute the base URI for each service.
995 for r in self._gateway_services.values():
996 host = r['service_host']
997 if not host.startswith('[') and host.find(':') >= 0:
998 # IPv6 URIs must be formatted like http://[::1]:80/...
999 host = '[' + host + ']'
1000 r['_service_root'] = "{}://{}:{:d}/".format(
1001 'https' if r['service_ssl_flag'] else 'http',
1005 _logger.debug(str(self._gateway_services))
# Gateway-type services are excluded from the general-purpose lists.
1006 self._keep_services = [
1007 ks for ks in self._gateway_services.values()
1008 if not ks.get('service_type', '').startswith('gateway:')]
1009 self._writable_services = [ks for ks in self._keep_services
1010 if not ks.get('read_only')]
1012 # For disk type services, max_replicas_per_service is 1
1013 # It is unknown (unlimited) for other service types.
1014 if self._any_nondisk_services(self._writable_services):
1015 self.max_replicas_per_service = None
1017 self.max_replicas_per_service = 1
1019 def _service_weight(self, data_hash, service_uuid):
1020 """Compute the weight of a Keep service endpoint for a data
1021 block with a known hash.
1023 The weight is md5(h + u) where u is the last 15 characters of
1024 the service endpoint's UUID.
"""
1026 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1028 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1029 """Return an array of Keep service endpoints, in the order in
1030 which they should be probed when reading or writing data with
1031 the given hash+hints.
1033 self.build_services_list(force_rebuild)
1036 # Use the services indicated by the given +K@... remote
1037 # service hints, if any are present and can be resolved to a
1039 for hint in locator.hints:
1040 if hint.startswith('K@'):
# Short K@ hints name a remote federated cluster by its 5-char id.
1042 sorted_roots.append(
1043 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1044 elif len(hint) == 29:
# Long K@ hints name a gateway service by UUID.
1045 svc = self._gateway_services.get(hint[2:])
1047 sorted_roots.append(svc['_service_root'])
1049 # Sort the available local services by weight (heaviest first)
1050 # for this locator, and return their service_roots (base URIs)
1052 use_services = self._keep_services
1054 use_services = self._writable_services
1055 self.using_proxy = self._any_nondisk_services(use_services)
1056 sorted_roots.extend([
1057 svc['_service_root'] for svc in sorted(
1060 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1061 _logger.debug("{}: {}".format(locator, sorted_roots))
1064 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1065 # roots_map is a dictionary, mapping Keep service root strings
1066 # to KeepService objects. Poll for Keep services, and add any
1067 # new ones to roots_map. Return the current list of local
# Ensure every request carries the API token unless caller overrode it.
1069 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1070 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1071 for root in local_roots:
1072 if root not in roots_map:
1073 roots_map[root] = self.KeepService(
1074 root, self._user_agent_pool,
1075 upload_counter=self.upload_counter,
1076 download_counter=self.download_counter,
1078 insecure=self.insecure)
1082 def _check_loop_result(result):
1083 # KeepClient RetryLoops should save results as a 2-tuple: the
1084 # actual result of the request, and the number of servers available
1085 # to receive the request this round.
1086 # This method returns True if there's a real result, False if
1087 # there are no more servers available, otherwise None.
1088 if isinstance(result, Exception):
1090 result, tried_server_count = result
1091 if (result is not None) and (result is not False):
1093 elif tried_server_count < 1:
1094 _logger.info("No more Keep services to try; giving up")
1099 def get_from_cache(self, loc_s):
1100 """Fetch a block only if it is in the cache, otherwise return None."""
1101 locator = KeepLocator(loc_s)
# Cache keys are the bare md5, ignoring size/hints in the locator string.
1102 slot = self.block_cache.get(locator.md5sum)
1103 if slot is not None and slot.ready.is_set():
def refresh_signature(self, loc):
    """Ask Keep to get the remote block and return its local signature"""
    timestamp = datetime.datetime.utcnow().isoformat("T") + 'Z'
    sig_headers = {'X-Keep-Signature': 'local, {}'.format(timestamp)}
    return self.head(loc, headers=sig_headers)
def head(self, loc_s, **kwargs):
    """Check a block's presence in Keep with a HEAD request.

    All keyword arguments are forwarded unchanged to _get_or_head();
    see that method for the full parameter list.
    """
    return self._get_or_head(loc_s, **kwargs, method="HEAD")
def get(self, loc_s, **kwargs):
    """Fetch block data from Keep with a GET request.

    All keyword arguments are forwarded unchanged to _get_or_head();
    see that method for the full parameter list.
    """
    return self._get_or_head(loc_s, **kwargs, method="GET")
def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
    """Get data from Keep.

    This method fetches one or more blocks of data from Keep.  It
    sends a request to each Keep service registered with the API
    server (or the proxy provided when this client was
    instantiated), then each service named in location hints, in
    sequence.  As soon as one service provides the data, it's
    returned.

    Arguments:

    * loc_s: A string of one or more comma-separated locators to fetch.
      This method returns the concatenation of these blocks.
    * num_retries: The number of times to retry GET requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff.  Note that, in each loop, the method may try
      to fetch data from every available Keep service, along with any
      that are named in location hints in the locator.  The default value
      is set when the KeepClient is initialized.
    """
    # Multiple comma-separated locators: fetch each block separately
    # and return the concatenation.
        return ''.join(self.get(x) for x in loc_s.split(','))

    self.get_counter.add(1)

    # Propagate the caller's request id, else the API client's, else
    # mint a fresh one; it tags every service request and error message.
    request_id = (request_id or
                  (hasattr(self, 'api_client') and self.api_client.request_id) or
                  arvados.util.new_request_id())
        headers['X-Request-Id'] = request_id

        locator = KeepLocator(loc_s)
        # Reserve a cache slot keyed on the block hash; 'first' is True
        # when this caller is responsible for filling the slot.
            slot, first = self.block_cache.reserve_cache(locator.md5sum)
            # Fresh and empty "first time it is used" slot
                # this is a request for a prefetch to fill in
                # the cache, don't need to wait for the
                # result, so if it is already in flight return
                # immediately.  Clear 'slot' to prevent
                # finally block from calling slot.set()
                if slot.ready.is_set():
            if blob is not None:
                self.hits_counter.add(1)

            # If blob is None, this means either
            #
            # (a) another thread was fetching this block and
            # failed with an error or
            #
            # (b) cache thrashing caused the slot to be
            # evicted (content set to None) by another thread
            # between the call to reserve_cache() and get().
            #
            # We'll handle these cases by reserving a new slot
            # and then doing a full GET request.

        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               self._gateway_services.get(hint[2:])
        # Map root URLs to their KeepService objects.
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter,
                                   insecure=self.insecure)
            for root in hint_roots

        # See #3147 for a discussion of the loop implementation.  Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
        for tries_left in loop:
                sorted_roots = self.map_new_services(
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False,
            except Exception as error:
                # Service discovery failed this round; record the error
                # so the retry loop can decide whether to try again.
                loop.save_result(error)

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        if slot is not None:
            self.block_cache.set(slot, blob)

        # No luck: build a helpful error.
        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
            # No services could even be contacted.
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            # Every service reported 403/404/410: treat as not found.
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
            # Mixed/temporary failures after exhausting retries.
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
    """Save data in Keep.

    This method will get a list of Keep services from the API server, and
    send the data to each one simultaneously in a new thread.  Once the
    uploads are finished, if enough copies are saved, this method returns
    the most recent HTTP response body.  If requests fail to upload
    enough copies, this method raises KeepWriteError.

    Arguments:

    * data: The string of data to upload.
    * copies: The number of copies that the user requires be saved.
    * num_retries: The number of times to retry PUT requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff.  The default value is set when the
      KeepClient is initialized.
    * classes: An optional list of storage class names where copies should
      be written; defaults to the client's configured storage classes.
    """
    classes = classes or self._default_classes

    # Keep stores bytes; transparently encode text input.
    if not isinstance(data, bytes):
        data = data.encode()

    self.put_counter.add(1)

    data_hash = hashlib.md5(data).hexdigest()
    loc_s = data_hash + '+' + str(len(data))
    locator = KeepLocator(loc_s)

    # Propagate the caller's request id, else the API client's, else
    # mint a fresh one.
    request_id = (request_id or
                  (hasattr(self, 'api_client') and self.api_client.request_id) or
                  arvados.util.new_request_id())
        'X-Request-Id': request_id,
        'X-Keep-Desired-Replicas': str(copies),

    loop = retry.RetryLoop(num_retries, self._check_loop_result,
    for tries_left in loop:
            sorted_roots = self.map_new_services(
                force_rebuild=(tries_left < num_retries),
        except Exception as error:
            # Service discovery failed this round; let the retry loop
            # decide whether to try again.
            loop.save_result(error)

        # Only request the storage classes not yet satisfied by
        # earlier rounds.
        pending_classes = []
        if done_classes is not None:
            pending_classes = list(set(classes) - set(done_classes))
        writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                      data_hash=data_hash,
                                                      copies=copies - done_copies,
                                                      max_service_replicas=self.max_replicas_per_service,
                                                      timeout=self.current_timeout(num_retries - tries_left),
                                                      classes=pending_classes)
        for service_root, ks in [(root, roots_map[root])
                                 for root in sorted_roots]:
            writer_pool.add_task(ks, service_root)
        pool_copies, pool_classes = writer_pool.done()
        done_copies += pool_copies
        if (done_classes is not None) and (pool_classes is not None):
            done_classes += pool_classes
            # Success requires both enough copies and full storage
            # class coverage.
            (done_copies >= copies and set(done_classes) == set(classes),
             writer_pool.total_task_nr))
        # Old keepstore contacted without storage classes support:
        # success is determined only by successful copies.
        #
        # Disable storage classes tracking from this point forward.
        if not self._storage_classes_unsupported_warning:
            self._storage_classes_unsupported_warning = True
            _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
            (done_copies >= copies, writer_pool.total_task_nr))

    return writer_pool.response()
        # No services could even be contacted.
        raise arvados.errors.KeepWriteError(
            "[{}] failed to write {}: no Keep services available ({})".format(
                request_id, data_hash, loop.last_result()))
    service_errors = ((key, roots_map[key].last_result()['error'])
                      for key in sorted_roots
                      if roots_map[key].last_result()['error'])
    raise arvados.errors.KeepWriteError(
        "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
            request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
def _block_prefetch_worker(self):
    """The background downloader thread."""
            b = self._prefetch_queue.get()
            # get() populates the block cache as a side effect;
            # prefetch=True tells it not to wait on in-flight fetches.
            self.get(b, prefetch=True)
            # Log and keep running; one failed prefetch must not kill
            # the worker thread.
            _logger.exception("Exception doing block prefetch")
def _start_prefetch_threads(self):
    # Lazily spin up the daemon prefetch workers on first use.
    if self._prefetch_threads is None:
            # NOTE(review): the repeated test looks like the inner
            # check of a double-checked-locking pattern — confirm a
            # lock is held between these two tests.
            if self._prefetch_threads is not None:
            self._prefetch_queue = queue.Queue()
            self._prefetch_threads = []
            for i in range(0, self.num_prefetch_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                # Daemon threads don't block interpreter shutdown.
                thread.daemon = True
def block_prefetch(self, locator):
    """Initiate a background download of the block named by *locator*.

    This relies on the fact that KeepClient implements a block cache,
    so repeated requests for the same block will not result in repeated
    downloads (unless the block is evicted from the cache.)  This method
    returns immediately without waiting for the download.
    """
    # Already cached: nothing to do.
    if self.block_cache.get(locator) is not None:

    self._start_prefetch_threads()
    self._prefetch_queue.put(locator)
def stop_prefetch_threads(self):
    """Shut down the background prefetch workers, if any are running."""
        if self._prefetch_threads is not None:
            # Enqueue one sentinel per worker; each worker exits when
            # it dequeues one.
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
        # Reset so _start_prefetch_threads() can start fresh workers.
        self._prefetch_threads = None
        self._prefetch_queue = None
def local_store_put(self, data, copies=1, num_retries=None, classes=None):
    """A stub for put().

    This method is used in place of the real put() method when
    using local storage (see constructor's local_store argument).

    copies and num_retries arguments are ignored: they are here
    only for the sake of offering the same call signature as
    put().  classes is likewise ignored; its default was changed
    from a mutable [] to None (it is never read, so behavior is
    unchanged).

    Data stored this way can be retrieved via local_store_get().

    Returns the locator string "<md5>+<size>" for the stored block.
    """
    md5 = hashlib.md5(data).hexdigest()
    locator = '%s+%d' % (md5, len(data))
    # Write to a temp file and rename into place so concurrent
    # readers never observe a partially written block.
    with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
        f.write(data)
    os.rename(os.path.join(self.local_store, md5 + '.tmp'),
              os.path.join(self.local_store, md5))
    return locator
def local_store_get(self, loc_s, num_retries=None):
    """Companion to local_store_put()."""
        locator = KeepLocator(loc_s)
        # NOTE(review): this raise presumably sits in an
        # except-handler for the locator parse above — confirm.
        raise arvados.errors.NotFoundError(
            "Invalid data locator: '%s'" % loc_s)
    # The well-known empty block is never stored on disk.
    if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
    with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1457 def local_store_head(self, loc_s, num_retries=None):
1458 """Companion to local_store_put()."""
1460 locator = KeepLocator(loc_s)
1462 raise arvados.errors.NotFoundError(
1463 "Invalid data locator: '%s'" % loc_s)
1464 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1466 if os.path.exists(os.path.join(self.local_store, locator.md5sum)):