1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
26 from io import BytesIO
29 import arvados.config as config
31 import arvados.retry as retry
34 from ._internal import basedirs, diskcache, Timer
35 from ._internal.pycurl import PyCurlHelper
37 _logger = logging.getLogger('arvados.keep')
38 global_client_object = None
40 # Monkey patch TCP constants when not available (apple). Values sourced from:
41 # http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
42 if sys.platform == 'darwin':
43 if not hasattr(socket, 'TCP_KEEPALIVE'):
44 socket.TCP_KEEPALIVE = 0x010
45 if not hasattr(socket, 'TCP_KEEPINTVL'):
46 socket.TCP_KEEPINTVL = 0x101
47 if not hasattr(socket, 'TCP_KEEPCNT'):
48 socket.TCP_KEEPCNT = 0x102
50 class KeepLocator(object):
51 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
52 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
54 def __init__(self, locator_str):
57 self._perm_expiry = None
58 pieces = iter(locator_str.split('+'))
59 self.md5sum = next(pieces)
61 self.size = int(next(pieces))
65 if self.HINT_RE.match(hint) is None:
66 raise ValueError("invalid hint format: {}".format(hint))
67 elif hint.startswith('A'):
68 self.parse_permission_hint(hint)
70 self.hints.append(hint)
75 for s in [self.md5sum, self.size,
76 self.permission_hint()] + self.hints
80 if self.size is not None:
81 return "%s+%i" % (self.md5sum, self.size)
85 def _make_hex_prop(name, length):
86 # Build and return a new property with the given name that
87 # must be a hex string of the given length.
88 data_name = '_{}'.format(name)
90 return getattr(self, data_name)
91 def setter(self, hex_str):
92 if not arvados.util.is_hex(hex_str, length):
93 raise ValueError("{} is not a {}-digit hex string: {!r}".
94 format(name, length, hex_str))
95 setattr(self, data_name, hex_str)
96 return property(getter, setter)
98 md5sum = _make_hex_prop('md5sum', 32)
99 perm_sig = _make_hex_prop('perm_sig', 40)
102 def perm_expiry(self):
103 return self._perm_expiry
106 def perm_expiry(self, value):
107 if not arvados.util.is_hex(value, 1, 8):
109 "permission timestamp must be a hex Unix timestamp: {}".
111 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
113 def permission_hint(self):
114 data = [self.perm_sig, self.perm_expiry]
117 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
118 return "A{}@{:08x}".format(*data)
120 def parse_permission_hint(self, s):
122 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
124 raise ValueError("bad permission hint {}".format(s))
126 def permission_expired(self, as_of_dt=None):
127 if self.perm_expiry is None:
129 elif as_of_dt is None:
130 as_of_dt = datetime.datetime.now()
131 return self.perm_expiry <= as_of_dt
134 class KeepBlockCache(object):
135 def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
136 self.cache_max = cache_max
137 self._cache = collections.OrderedDict()
138 self._cache_lock = threading.Lock()
139 self._max_slots = max_slots
140 self._disk_cache = disk_cache
141 self._disk_cache_dir = disk_cache_dir
142 self._cache_updating = threading.Condition(self._cache_lock)
144 if self._disk_cache and self._disk_cache_dir is None:
145 self._disk_cache_dir = str(basedirs.BaseDirectories('CACHE').storage_path('keep'))
147 if self._max_slots == 0:
149 # Each block uses two file descriptors, one used to
150 # open it initially and hold the flock(), and a second
151 # hidden one used by mmap().
153 # Set max slots to 1/8 of maximum file handles. This
154 # means we'll use at most 1/4 of total file handles.
156 # NOFILE typically defaults to 1024 on Linux so this
157 # is 128 slots (256 file handles), which means we can
158 # cache up to 8 GiB of 64 MiB blocks. This leaves
159 # 768 file handles for sockets and other stuff.
161 # When we want the ability to have more cache (e.g. in
162 # arv-mount) we'll increase rlimit before calling
164 self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
167 self._max_slots = 512
169 if self.cache_max == 0:
171 fs = os.statvfs(self._disk_cache_dir)
172 # Calculation of available space incorporates existing cache usage
173 existing_usage = diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
174 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
175 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
177 # 10% of total disk size
178 # 25% of available space
180 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
183 self.cache_max = (256 * 1024 * 1024)
185 self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
189 self._cache = diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
190 for slot in self._cache.values():
191 self.cache_total += slot.size()
195 __slots__ = ("locator", "ready", "content")
197 def __init__(self, locator):
198 self.locator = locator
199 self.ready = threading.Event()
206 def set(self, value):
207 if self.content is not None:
214 if self.content is None:
217 return len(self.content)
223 def _resize_cache(self, cache_max, max_slots):
224 # Try and make sure the contents of the cache do not exceed
225 # the supplied maximums.
227 if self.cache_total <= cache_max and len(self._cache) <= max_slots:
230 _evict_candidates = collections.deque(self._cache.values())
231 while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
232 slot = _evict_candidates.popleft()
233 if not slot.ready.is_set():
238 self.cache_total -= sz
239 del self._cache[slot.locator]
243 '''Cap the cache size to self.cache_max'''
244 with self._cache_updating:
245 self._resize_cache(self.cache_max, self._max_slots)
246 self._cache_updating.notify_all()
248 def _get(self, locator):
249 # Test if the locator is already in the cache
250 if locator in self._cache:
251 n = self._cache[locator]
252 if n.ready.is_set() and n.content is None:
253 del self._cache[n.locator]
255 self._cache.move_to_end(locator)
258 # see if it exists on disk
259 n = diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
261 self._cache[n.locator] = n
262 self.cache_total += n.size()
266 def get(self, locator):
267 with self._cache_lock:
268 return self._get(locator)
270 def reserve_cache(self, locator):
271 '''Reserve a cache slot for the specified locator,
272 or return the existing slot.'''
273 with self._cache_updating:
274 n = self._get(locator)
278 # Add a new cache slot for the locator
279 self._resize_cache(self.cache_max, self._max_slots-1)
280 while len(self._cache) >= self._max_slots:
281 # If there isn't a slot available, need to wait
282 # for something to happen that releases one of the
283 # cache slots. Idle for 200 ms or woken up by
285 self._cache_updating.wait(timeout=0.2)
286 self._resize_cache(self.cache_max, self._max_slots-1)
289 n = diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
291 n = KeepBlockCache._CacheSlot(locator)
292 self._cache[n.locator] = n
295 def set(self, slot, blob):
298 self.cache_total += slot.size()
301 if e.errno == errno.ENOMEM:
302 # Reduce max slots to current - 4, cap cache and retry
303 with self._cache_lock:
304 self._max_slots = max(4, len(self._cache) - 4)
305 elif e.errno == errno.ENOSPC:
306 # Reduce disk max space to current - 256 MiB, cap cache and retry
307 with self._cache_lock:
308 sm = sum(st.size() for st in self._cache.values())
309 self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
310 elif e.errno == errno.ENODEV:
311 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
312 except Exception as e:
315 # Check if we should evict things from the cache. Either
316 # because we added a new thing or there was an error and
317 # we possibly adjusted the limits down, so we might need
318 # to push something out.
322 # Only gets here if there was an error the first time. The
323 # exception handler adjusts limits downward in some cases
324 # to free up resources, which would make the operation
327 self.cache_total += slot.size()
328 except Exception as e:
329 # It failed again. Give up.
331 raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
337 def __init__(self, v=0):
338 self._lk = threading.Lock()
350 class KeepClient(object):
351 DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
352 DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
354 class _KeepService(PyCurlHelper):
355 """Make requests to a single Keep service, and track results.
357 A _KeepService is intended to last long enough to perform one
358 transaction (GET or PUT) against one Keep service. This can
359 involve calling either get() or put() multiple times in order
360 to retry after transient failures. However, calling both get()
361 and put() on a single instance -- or using the same instance
362 to access two different Keep services -- will not produce
369 arvados.errors.HttpError,
372 def __init__(self, root, user_agent_pool=queue.LifoQueue(),
374 download_counter=None,
379 self._user_agent_pool = user_agent_pool
380 self._result = {'error': None}
384 self.get_headers = {'Accept': 'application/octet-stream'}
385 self.get_headers.update(headers)
386 self.put_headers = headers
387 self.upload_counter = upload_counter
388 self.download_counter = download_counter
389 self.insecure = insecure
392 """Is it worth attempting a request?"""
396 """Did the request succeed or encounter permanent failure?"""
397 return self._result['error'] == False or not self._usable
399 def last_result(self):
402 def _get_user_agent(self):
404 return self._user_agent_pool.get(block=False)
408 def _put_user_agent(self, ua):
411 self._user_agent_pool.put(ua, block=False)
415 def get(self, locator, method="GET", timeout=None):
416 # locator is a KeepLocator object.
417 url = self.root + str(locator)
418 _logger.debug("Request: %s %s", method, url)
419 curl = self._get_user_agent()
424 response_body = BytesIO()
425 curl.setopt(pycurl.NOSIGNAL, 1)
426 curl.setopt(pycurl.OPENSOCKETFUNCTION,
427 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
428 curl.setopt(pycurl.URL, url.encode('utf-8'))
429 curl.setopt(pycurl.HTTPHEADER, [
430 '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
431 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
432 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
434 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
435 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
437 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
439 curl.setopt(pycurl.NOBODY, True)
441 curl.setopt(pycurl.HTTPGET, True)
442 self._setcurltimeouts(curl, timeout, method=="HEAD")
446 except Exception as e:
447 raise arvados.errors.HttpError(0, str(e))
453 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
454 'body': response_body.getvalue(),
455 'headers': self._headers,
459 ok = retry.check_http_response_success(self._result['status_code'])
461 self._result['error'] = arvados.errors.HttpError(
462 self._result['status_code'],
463 self._headers.get('x-status-line', 'Error'))
464 except self.HTTP_ERRORS as e:
468 self._usable = ok != False
469 if self._result.get('status_code', None):
470 # The client worked well enough to get an HTTP status
471 # code, so presumably any problems are just on the
472 # server side and it's OK to reuse the client.
473 self._put_user_agent(curl)
475 # Don't return this client to the pool, in case it's
479 _logger.debug("Request fail: GET %s => %s: %s",
480 url, type(self._result['error']), str(self._result['error']))
483 _logger.info("HEAD %s: %s bytes",
484 self._result['status_code'],
485 self._result.get('content-length'))
486 if self._result['headers'].get('x-keep-locator'):
487 # This is a response to a remote block copy request, return
488 # the local copy block locator.
489 return self._result['headers'].get('x-keep-locator')
492 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
493 self._result['status_code'],
494 len(self._result['body']),
496 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
498 if self.download_counter:
499 self.download_counter.add(len(self._result['body']))
500 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
501 if resp_md5 != locator.md5sum:
502 _logger.warning("Checksum fail: md5(%s) = %s",
504 self._result['error'] = arvados.errors.HttpError(
507 return self._result['body']
509 def put(self, hash_s, body, timeout=None, headers={}):
510 put_headers = copy.copy(self.put_headers)
511 put_headers.update(headers)
512 url = self.root + hash_s
513 _logger.debug("Request: PUT %s", url)
514 curl = self._get_user_agent()
519 body_reader = BytesIO(body)
520 response_body = BytesIO()
521 curl.setopt(pycurl.NOSIGNAL, 1)
522 curl.setopt(pycurl.OPENSOCKETFUNCTION,
523 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
524 curl.setopt(pycurl.URL, url.encode('utf-8'))
525 # Using UPLOAD tells cURL to wait for a "go ahead" from the
526 # Keep server (in the form of a HTTP/1.1 "100 Continue"
527 # response) instead of sending the request body immediately.
528 # This allows the server to reject the request if the request
529 # is invalid or the server is read-only, without waiting for
530 # the client to send the entire block.
531 curl.setopt(pycurl.UPLOAD, True)
532 curl.setopt(pycurl.INFILESIZE, len(body))
533 curl.setopt(pycurl.READFUNCTION, body_reader.read)
534 curl.setopt(pycurl.HTTPHEADER, [
535 '{}: {}'.format(k,v) for k,v in put_headers.items()])
536 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
537 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
539 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
540 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
542 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
543 self._setcurltimeouts(curl, timeout)
546 except Exception as e:
547 raise arvados.errors.HttpError(0, str(e))
553 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
554 'body': response_body.getvalue().decode('utf-8'),
555 'headers': self._headers,
558 ok = retry.check_http_response_success(self._result['status_code'])
560 self._result['error'] = arvados.errors.HttpError(
561 self._result['status_code'],
562 self._headers.get('x-status-line', 'Error'))
563 except self.HTTP_ERRORS as e:
567 self._usable = ok != False # still usable if ok is True or None
568 if self._result.get('status_code', None):
569 # Client is functional. See comment in get().
570 self._put_user_agent(curl)
574 _logger.debug("Request fail: PUT %s => %s: %s",
575 url, type(self._result['error']), str(self._result['error']))
577 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
578 self._result['status_code'],
581 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
582 if self.upload_counter:
583 self.upload_counter.add(len(body))
587 class _KeepWriterQueue(queue.Queue):
588 def __init__(self, copies, classes=[]):
589 queue.Queue.__init__(self) # Old-style superclass
590 self.wanted_copies = copies
591 self.wanted_storage_classes = classes
592 self.successful_copies = 0
593 self.confirmed_storage_classes = {}
595 self.storage_classes_tracking = True
596 self.queue_data_lock = threading.RLock()
597 self.pending_tries = max(copies, len(classes))
598 self.pending_tries_notification = threading.Condition()
600 def write_success(self, response, replicas_nr, classes_confirmed):
601 with self.queue_data_lock:
602 self.successful_copies += replicas_nr
603 if classes_confirmed is None:
604 self.storage_classes_tracking = False
605 elif self.storage_classes_tracking:
606 for st_class, st_copies in classes_confirmed.items():
608 self.confirmed_storage_classes[st_class] += st_copies
610 self.confirmed_storage_classes[st_class] = st_copies
611 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
612 self.response = response
613 with self.pending_tries_notification:
614 self.pending_tries_notification.notify_all()
616 def write_fail(self, ks):
617 with self.pending_tries_notification:
618 self.pending_tries += 1
619 self.pending_tries_notification.notify()
621 def pending_copies(self):
622 with self.queue_data_lock:
623 return self.wanted_copies - self.successful_copies
625 def satisfied_classes(self):
626 with self.queue_data_lock:
627 if not self.storage_classes_tracking:
628 # Notifies disabled storage classes expectation to
631 return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
633 def pending_classes(self):
634 with self.queue_data_lock:
635 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
637 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
638 for st_class, st_copies in self.confirmed_storage_classes.items():
639 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
640 unsatisfied_classes.remove(st_class)
641 return unsatisfied_classes
643 def get_next_task(self):
644 with self.pending_tries_notification:
646 if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
647 # This notify_all() is unnecessary --
648 # write_success() already called notify_all()
649 # when pending<1 became true, so it's not
650 # possible for any other thread to be in
651 # wait() now -- but it's cheap insurance
652 # against deadlock so we do it anyway:
653 self.pending_tries_notification.notify_all()
654 # Drain the queue and then raise Queue.Empty
658 elif self.pending_tries > 0:
659 service, service_root = self.get_nowait()
660 if service.finished():
663 self.pending_tries -= 1
664 return service, service_root
666 self.pending_tries_notification.notify_all()
669 self.pending_tries_notification.wait()
672 class _KeepWriterThreadPool:
673 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
674 self.total_task_nr = 0
675 if (not max_service_replicas) or (max_service_replicas >= copies):
678 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
679 _logger.debug("Pool max threads is %d", num_threads)
681 self.queue = KeepClient._KeepWriterQueue(copies, classes)
683 for _ in range(num_threads):
684 w = KeepClient._KeepWriterThread(self.queue, data, data_hash, timeout)
685 self.workers.append(w)
687 def add_task(self, ks, service_root):
688 self.queue.put((ks, service_root))
689 self.total_task_nr += 1
692 return self.queue.successful_copies, self.queue.satisfied_classes()
696 for worker in self.workers:
698 # Wait for finished work
702 return self.queue.response
705 class _KeepWriterThread(threading.Thread):
706 class TaskFailed(RuntimeError):
707 """Exception for failed Keep writes
709 TODO: Move this class to the module top level and document it
715 def __init__(self, queue, data, data_hash, timeout=None):
717 self.timeout = timeout
720 self.data_hash = data_hash
726 service, service_root = self.queue.get_next_task()
730 locator, copies, classes = self.do_task(service, service_root)
731 except Exception as e:
732 if not isinstance(e, self.TaskFailed):
733 _logger.exception("Exception in _KeepWriterThread")
734 self.queue.write_fail(service)
736 self.queue.write_success(locator, copies, classes)
738 self.queue.task_done()
740 def do_task(self, service, service_root):
741 classes = self.queue.pending_classes()
745 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
746 success = bool(service.put(self.data_hash,
748 timeout=self.timeout,
750 result = service.last_result()
753 if result.get('status_code'):
754 _logger.debug("Request fail: PUT %s => %s %s",
756 result.get('status_code'),
758 raise self.TaskFailed()
760 _logger.debug("_KeepWriterThread %s succeeded %s+%i %s",
761 str(threading.current_thread()),
766 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
767 except (KeyError, ValueError):
770 classes_confirmed = {}
772 scch = result['headers']['x-keep-storage-classes-confirmed']
773 for confirmation in scch.replace(' ', '').split(','):
774 if '=' in confirmation:
775 stored_class, stored_copies = confirmation.split('=')[:2]
776 classes_confirmed[stored_class] = int(stored_copies)
777 except (KeyError, ValueError):
778 # Storage classes confirmed header missing or corrupt
779 classes_confirmed = None
781 return result['body'].strip(), replicas_stored, classes_confirmed
784 def __init__(self, api_client=None, proxy=None,
785 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
786 api_token=None, local_store=None, block_cache=None,
787 num_retries=10, session=None, num_prefetch_threads=None):
788 """Initialize a new KeepClient.
792 The API client to use to find Keep services. If not
793 provided, KeepClient will build one from available Arvados
797 If specified, this KeepClient will send requests to this Keep
798 proxy. Otherwise, KeepClient will fall back to the setting of the
799 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
800 If you want to KeepClient does not use a proxy, pass in an empty
804 The initial timeout (in seconds) for HTTP requests to Keep
805 non-proxy servers. A tuple of three floats is interpreted as
806 (connection_timeout, read_timeout, minimum_bandwidth). A connection
807 will be aborted if the average traffic rate falls below
808 minimum_bandwidth bytes per second over an interval of read_timeout
809 seconds. Because timeouts are often a result of transient server
810 load, the actual connection timeout will be increased by a factor
811 of two on each retry.
812 Default: (2, 256, 32768).
815 The initial timeout (in seconds) for HTTP requests to
816 Keep proxies. A tuple of three floats is interpreted as
817 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
818 described above for adjusting connection timeouts on retry also
820 Default: (20, 256, 32768).
823 If you're not using an API client, but only talking
824 directly to a Keep proxy, this parameter specifies an API token
825 to authenticate Keep requests. It is an error to specify both
826 api_client and api_token. If you specify neither, KeepClient
827 will use one available from the Arvados configuration.
830 If specified, this KeepClient will bypass Keep
831 services, and save data to the named directory. If unspecified,
832 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
833 environment variable. If you want to ensure KeepClient does not
834 use local storage, pass in an empty string. This is primarily
835 intended to mock a server for testing.
838 The default number of times to retry failed requests.
839 This will be used as the default num_retries value when get() and
840 put() are called. Default 10.
842 self.lock = threading.Lock()
844 if config.get('ARVADOS_KEEP_SERVICES'):
845 proxy = config.get('ARVADOS_KEEP_SERVICES')
847 proxy = config.get('ARVADOS_KEEP_PROXY')
848 if api_token is None:
849 if api_client is None:
850 api_token = config.get('ARVADOS_API_TOKEN')
852 api_token = api_client.api_token
853 elif api_client is not None:
855 "can't build KeepClient with both API client and token")
856 if local_store is None:
857 local_store = os.environ.get('KEEP_LOCAL_STORE')
859 if api_client is None:
860 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
862 self.insecure = api_client.insecure
864 self.block_cache = block_cache if block_cache else KeepBlockCache()
865 self.timeout = timeout
866 self.proxy_timeout = proxy_timeout
867 self._user_agent_pool = queue.LifoQueue()
868 self.upload_counter = _Counter()
869 self.download_counter = _Counter()
870 self.put_counter = _Counter()
871 self.get_counter = _Counter()
872 self.hits_counter = _Counter()
873 self.misses_counter = _Counter()
874 self._storage_classes_unsupported_warning = False
875 self._default_classes = []
876 if num_prefetch_threads is not None:
877 self.num_prefetch_threads = num_prefetch_threads
879 self.num_prefetch_threads = 2
880 self._prefetch_queue = None
881 self._prefetch_threads = None
884 self.local_store = local_store
885 self.head = self.local_store_head
886 self.get = self.local_store_get
887 self.put = self.local_store_put
889 self.num_retries = num_retries
890 self.max_replicas_per_service = None
892 proxy_uris = proxy.split()
893 for i in range(len(proxy_uris)):
894 if not proxy_uris[i].endswith('/'):
897 url = urllib.parse.urlparse(proxy_uris[i])
898 if not (url.scheme and url.netloc):
899 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
900 self.api_token = api_token
901 self._gateway_services = {}
902 self._keep_services = [{
903 'uuid': "00000-bi6l4-%015d" % idx,
904 'service_type': 'proxy',
905 '_service_root': uri,
906 } for idx, uri in enumerate(proxy_uris)]
907 self._writable_services = self._keep_services
908 self.using_proxy = True
909 self._static_services_list = True
911 # It's important to avoid instantiating an API client
912 # unless we actually need one, for testing's sake.
913 if api_client is None:
914 api_client = arvados.api('v1')
915 self.api_client = api_client
916 self.api_token = api_client.api_token
917 self._gateway_services = {}
918 self._keep_services = None
919 self._writable_services = None
920 self.using_proxy = None
921 self._static_services_list = False
923 self._default_classes = [
924 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
926 # We're talking to an old cluster
929 def current_timeout(self, attempt_number):
930 """Return the appropriate timeout to use for this client.
932 The proxy timeout setting if the backend service is currently a proxy,
933 the regular timeout setting otherwise. The `attempt_number` indicates
934 how many times the operation has been tried already (starting from 0
935 for the first try), and scales the connection timeout portion of the
936 return value accordingly.
939 # TODO(twp): the timeout should be a property of a
940 # _KeepService, not a KeepClient. See #4488.
941 t = self.proxy_timeout if self.using_proxy else self.timeout
943 return (t[0] * (1 << attempt_number), t[1])
945 return (t[0] * (1 << attempt_number), t[1], t[2])
946 def _any_nondisk_services(self, service_list):
947 return any(ks.get('service_type', 'disk') != 'disk'
948 for ks in service_list)
950 def build_services_list(self, force_rebuild=False):
951 if (self._static_services_list or
952 (self._keep_services and not force_rebuild)):
956 keep_services = self.api_client.keep_services().accessible()
957 except Exception: # API server predates Keep services.
958 keep_services = self.api_client.keep_disks().list()
960 # Gateway services are only used when specified by UUID,
961 # so there's nothing to gain by filtering them by
963 self._gateway_services = {ks['uuid']: ks for ks in
964 keep_services.execute()['items']}
965 if not self._gateway_services:
966 raise arvados.errors.NoKeepServersError()
968 # Precompute the base URI for each service.
969 for r in self._gateway_services.values():
970 host = r['service_host']
971 if not host.startswith('[') and host.find(':') >= 0:
972 # IPv6 URIs must be formatted like http://[::1]:80/...
973 host = '[' + host + ']'
974 r['_service_root'] = "{}://{}:{:d}/".format(
975 'https' if r['service_ssl_flag'] else 'http',
979 _logger.debug(str(self._gateway_services))
980 self._keep_services = [
981 ks for ks in self._gateway_services.values()
982 if not ks.get('service_type', '').startswith('gateway:')]
983 self._writable_services = [ks for ks in self._keep_services
984 if not ks.get('read_only')]
986 # For disk type services, max_replicas_per_service is 1
987 # It is unknown (unlimited) for other service types.
988 if self._any_nondisk_services(self._writable_services):
989 self.max_replicas_per_service = None
991 self.max_replicas_per_service = 1
993 def _service_weight(self, data_hash, service_uuid):
994 """Compute the weight of a Keep service endpoint for a data
995 block with a known hash.
997 The weight is md5(h + u) where u is the last 15 characters of
998 the service endpoint's UUID.
1000 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1002 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1003 """Return an array of Keep service endpoints, in the order in
1004 which they should be probed when reading or writing data with
1005 the given hash+hints.
1007 self.build_services_list(force_rebuild)
1010 # Use the services indicated by the given +K@... remote
1011 # service hints, if any are present and can be resolved to a
1013 for hint in locator.hints:
1014 if hint.startswith('K@'):
1016 sorted_roots.append(
1017 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1018 elif len(hint) == 29:
1019 svc = self._gateway_services.get(hint[2:])
1021 sorted_roots.append(svc['_service_root'])
1023 # Sort the available local services by weight (heaviest first)
1024 # for this locator, and return their service_roots (base URIs)
1026 use_services = self._keep_services
1028 use_services = self._writable_services
1029 self.using_proxy = self._any_nondisk_services(use_services)
1030 sorted_roots.extend([
1031 svc['_service_root'] for svc in sorted(
1034 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1035 _logger.debug("{}: {}".format(locator, sorted_roots))
1038 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1039 # roots_map is a dictionary, mapping Keep service root strings
1040 # to _KeepService objects. Poll for Keep services, and add any
1041 # new ones to roots_map. Return the current list of local
1043 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1044 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1045 for root in local_roots:
1046 if root not in roots_map:
1047 roots_map[root] = self._KeepService(
1048 root, self._user_agent_pool,
1049 upload_counter=self.upload_counter,
1050 download_counter=self.download_counter,
1052 insecure=self.insecure)
1056 def _check_loop_result(result):
1057 # KeepClient RetryLoops should save results as a 2-tuple: the
1058 # actual result of the request, and the number of servers available
1059 # to receive the request this round.
1060 # This method returns True if there's a real result, False if
1061 # there are no more servers available, otherwise None.
1062 if isinstance(result, Exception):
1064 result, tried_server_count = result
1065 if (result is not None) and (result is not False):
1067 elif tried_server_count < 1:
1068 _logger.info("No more Keep services to try; giving up")
1073 def get_from_cache(self, loc_s):
1074 """Fetch a block only if is in the cache, otherwise return None."""
1075 locator = KeepLocator(loc_s)
1076 slot = self.block_cache.get(locator.md5sum)
1077 if slot is not None and slot.ready.is_set():
1082 def refresh_signature(self, loc):
1083 """Ask Keep to get the remote block and return its local signature"""
1084 now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1085 return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1088 def head(self, loc_s, **kwargs):
1089 return self._get_or_head(loc_s, method="HEAD", **kwargs)
1092 def get(self, loc_s, **kwargs):
1093 return self._get_or_head(loc_s, method="GET", **kwargs)
1095 def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1096 """Get data from Keep.
1098 This method fetches one or more blocks of data from Keep. It
1099 sends a request each Keep service registered with the API
1100 server (or the proxy provided when this client was
1101 instantiated), then each service named in location hints, in
1102 sequence. As soon as one service provides the data, it's
1106 * loc_s: A string of one or more comma-separated locators to fetch.
1107 This method returns the concatenation of these blocks.
1108 * num_retries: The number of times to retry GET requests to
1109 *each* Keep server if it returns temporary failures, with
1110 exponential backoff. Note that, in each loop, the method may try
1111 to fetch data from every available Keep service, along with any
1112 that are named in location hints in the locator. The default value
1113 is set when the KeepClient is initialized.
1116 return ''.join(self.get(x) for x in loc_s.split(','))
1118 self.get_counter.add(1)
1120 request_id = (request_id or
1121 (hasattr(self, 'api_client') and self.api_client.request_id) or
1122 arvados.util.new_request_id())
1125 headers['X-Request-Id'] = request_id
1130 locator = KeepLocator(loc_s)
1133 slot, first = self.block_cache.reserve_cache(locator.md5sum)
1135 # Fresh and empty "first time it is used" slot
1138 # this is request for a prefetch to fill in
1139 # the cache, don't need to wait for the
1140 # result, so if it is already in flight return
1141 # immediately. Clear 'slot' to prevent
1142 # finally block from calling slot.set()
1143 if slot.ready.is_set():
1149 if blob is not None:
1150 self.hits_counter.add(1)
1153 # If blob is None, this means either
1155 # (a) another thread was fetching this block and
1156 # failed with an error or
1158 # (b) cache thrashing caused the slot to be
1159 # evicted (content set to None) by another thread
1160 # between the call to reserve_cache() and get().
1162 # We'll handle these cases by reserving a new slot
1163 # and then doing a full GET request.
1166 self.misses_counter.add(1)
1168 # If the locator has hints specifying a prefix (indicating a
1169 # remote keepproxy) or the UUID of a local gateway service,
1170 # read data from the indicated service(s) instead of the usual
1171 # list of local disk services.
1172 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1173 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1174 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1175 for hint in locator.hints if (
1176 hint.startswith('K@') and
1178 self._gateway_services.get(hint[2:])
1180 # Map root URLs to their _KeepService objects.
1182 root: self._KeepService(root, self._user_agent_pool,
1183 upload_counter=self.upload_counter,
1184 download_counter=self.download_counter,
1186 insecure=self.insecure)
1187 for root in hint_roots
1190 # See #3147 for a discussion of the loop implementation. Highlights:
1191 # * Refresh the list of Keep services after each failure, in case
1192 # it's being updated.
1193 # * Retry until we succeed, we're out of retries, or every available
1194 # service has returned permanent failure.
1197 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1199 for tries_left in loop:
1201 sorted_roots = self.map_new_services(
1203 force_rebuild=(tries_left < num_retries),
1204 need_writable=False,
1206 except Exception as error:
1207 loop.save_result(error)
1210 # Query _KeepService objects that haven't returned
1211 # permanent failure, in our specified shuffle order.
1212 services_to_try = [roots_map[root]
1213 for root in sorted_roots
1214 if roots_map[root].usable()]
1215 for keep_service in services_to_try:
1216 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1217 if blob is not None:
1219 loop.save_result((blob, len(services_to_try)))
1221 # Always cache the result, then return it if we succeeded.
1225 if slot is not None:
1226 self.block_cache.set(slot, blob)
1228 # Q: Including 403 is necessary for the Keep tests to continue
1229 # passing, but maybe they should expect KeepReadError instead?
1230 not_founds = sum(1 for key in sorted_roots
1231 if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1232 service_errors = ((key, roots_map[key].last_result()['error'])
1233 for key in sorted_roots)
1235 raise arvados.errors.KeepReadError(
1236 "[{}] failed to read {}: no Keep services available ({})".format(
1237 request_id, loc_s, loop.last_result()))
1238 elif not_founds == len(sorted_roots):
1239 raise arvados.errors.NotFoundError(
1240 "[{}] {} not found".format(request_id, loc_s), service_errors)
1242 raise arvados.errors.KeepReadError(
1243 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1246 def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1247 """Save data in Keep.
1249 This method will get a list of Keep services from the API server, and
1250 send the data to each one simultaneously in a new thread. Once the
1251 uploads are finished, if enough copies are saved, this method returns
1252 the most recent HTTP response body. If requests fail to upload
1253 enough copies, this method raises KeepWriteError.
1256 * data: The string of data to upload.
1257 * copies: The number of copies that the user requires be saved.
1259 * num_retries: The number of times to retry PUT requests to
1260 *each* Keep server if it returns temporary failures, with
1261 exponential backoff. The default value is set when the
1262 KeepClient is initialized.
1263 * classes: An optional list of storage class names where copies should
1267 classes = classes or self._default_classes
1269 if not isinstance(data, bytes):
1270 data = data.encode()
1272 self.put_counter.add(1)
1274 data_hash = hashlib.md5(data).hexdigest()
1275 loc_s = data_hash + '+' + str(len(data))
1278 locator = KeepLocator(loc_s)
1280 request_id = (request_id or
1281 (hasattr(self, 'api_client') and self.api_client.request_id) or
1282 arvados.util.new_request_id())
1284 'X-Request-Id': request_id,
1285 'X-Keep-Desired-Replicas': str(copies),
1288 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1292 for tries_left in loop:
1294 sorted_roots = self.map_new_services(
1296 force_rebuild=(tries_left < num_retries),
1299 except Exception as error:
1300 loop.save_result(error)
1303 pending_classes = []
1304 if done_classes is not None:
1305 pending_classes = list(set(classes) - set(done_classes))
1306 writer_pool = KeepClient._KeepWriterThreadPool(
1308 data_hash=data_hash,
1309 copies=copies - done_copies,
1310 max_service_replicas=self.max_replicas_per_service,
1311 timeout=self.current_timeout(num_retries - tries_left),
1312 classes=pending_classes,
1314 for service_root, ks in [(root, roots_map[root])
1315 for root in sorted_roots]:
1318 writer_pool.add_task(ks, service_root)
1320 pool_copies, pool_classes = writer_pool.done()
1321 done_copies += pool_copies
1322 if (done_classes is not None) and (pool_classes is not None):
1323 done_classes += pool_classes
1325 (done_copies >= copies and set(done_classes) == set(classes),
1326 writer_pool.total_task_nr))
1328 # Old keepstore contacted without storage classes support:
1329 # success is determined only by successful copies.
1331 # Disable storage classes tracking from this point forward.
1332 if not self._storage_classes_unsupported_warning:
1333 self._storage_classes_unsupported_warning = True
1334 _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1337 (done_copies >= copies, writer_pool.total_task_nr))
1340 return writer_pool.response()
1342 raise arvados.errors.KeepWriteError(
1343 "[{}] failed to write {}: no Keep services available ({})".format(
1344 request_id, data_hash, loop.last_result()))
1346 service_errors = ((key, roots_map[key].last_result()['error'])
1347 for key in sorted_roots
1348 if roots_map[key].last_result()['error'])
1349 raise arvados.errors.KeepWriteError(
1350 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1351 request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1353 def _block_prefetch_worker(self):
1354 """The background downloader thread."""
1357 b = self._prefetch_queue.get()
1360 self.get(b, prefetch=True)
1362 _logger.exception("Exception doing block prefetch")
1364 def _start_prefetch_threads(self):
1365 if self._prefetch_threads is None:
1367 if self._prefetch_threads is not None:
1369 self._prefetch_queue = queue.Queue()
1370 self._prefetch_threads = []
1371 for i in range(0, self.num_prefetch_threads):
1372 thread = threading.Thread(target=self._block_prefetch_worker)
1373 self._prefetch_threads.append(thread)
1374 thread.daemon = True
1377 def block_prefetch(self, locator):
1379 This relies on the fact that KeepClient implements a block cache,
1380 so repeated requests for the same block will not result in repeated
1381 downloads (unless the block is evicted from the cache.) This method
1385 if self.block_cache.get(locator) is not None:
1388 self._start_prefetch_threads()
1389 self._prefetch_queue.put(locator)
1391 def stop_prefetch_threads(self):
1393 if self._prefetch_threads is not None:
1394 for t in self._prefetch_threads:
1395 self._prefetch_queue.put(None)
1396 for t in self._prefetch_threads:
1398 self._prefetch_threads = None
1399 self._prefetch_queue = None
1401 def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1402 """A stub for put().
1404 This method is used in place of the real put() method when
1405 using local storage (see constructor's local_store argument).
1407 copies and num_retries arguments are ignored: they are here
1408 only for the sake of offering the same call signature as
1411 Data stored this way can be retrieved via local_store_get().
1413 md5 = hashlib.md5(data).hexdigest()
1414 locator = '%s+%d' % (md5, len(data))
1415 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1417 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1418 os.path.join(self.local_store, md5))
1421 def local_store_get(self, loc_s, num_retries=None):
1422 """Companion to local_store_put()."""
1424 locator = KeepLocator(loc_s)
1426 raise arvados.errors.NotFoundError(
1427 "Invalid data locator: '%s'" % loc_s)
1428 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1430 with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1433 def local_store_head(self, loc_s, num_retries=None):
1434 """Companion to local_store_put()."""
1436 locator = KeepLocator(loc_s)
1438 raise arvados.errors.NotFoundError(
1439 "Invalid data locator: '%s'" % loc_s)
1440 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1442 if os.path.exists(os.path.join(self.local_store, locator.md5sum)):