# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import collections
import copy
import datetime
import errno
import hashlib
import logging
import math
import os
import queue
import re
import resource
import socket
import ssl
import sys
import threading
import urllib.parse

from io import BytesIO

import pycurl

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
import arvados.diskcache
from arvados import timer
from arvados._pycurlhelper import PyCurlHelper

_logger = logging.getLogger('arvados.keep')
global_client_object = None

# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102

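# Illustrative sketch (not used by this module directly): how these
# constants typically configure TCP keepalive on a Darwin socket.  The
# probe timings below are hypothetical.
#
#   s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#   s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
#   s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, 75)  # idle seconds
#   s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)  # probe interval
#   s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 9)     # probe count
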

class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except (IndexError, ValueError):
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt

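    # Illustrative example (signature and timestamp values are hypothetical):
    #
    #   loc = KeepLocator(
    #       'acbd18db4cc2f85cedef654fccc4a4d8+3'
    #       '+A27117dcd73a6c1a09ae2bd13ab08f8c0b01c4e6d@6578ef0c')
    #   loc.md5sum               # 'acbd18db4cc2f85cedef654fccc4a4d8'
    #   loc.size                 # 3
    #   loc.stripped()           # 'acbd18db4cc2f85cedef654fccc4a4d8+3'
    #   loc.permission_expired() # False until the @-timestamp passes
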

class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        self._cache = collections.OrderedDict()
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir
        self._cache_updating = threading.Condition(self._cache_lock)

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = str(arvados.util._BaseDirectories('CACHE').storage_path('keep'))

        if self._max_slots == 0:
            if self._disk_cache:
                # Each block uses two file descriptors, one used to
                # open it initially and hold the flock(), and a second
                # hidden one used by mmap().
                #
                # Set max slots to 1/8 of maximum file handles.  This
                # means we'll use at most 1/4 of total file handles.
                #
                # NOFILE typically defaults to 1024 on Linux so this
                # is 128 slots (256 file handles), which means we can
                # cache up to 8 GiB of 64 MiB blocks.  This leaves
                # 768 file handles for sockets and other stuff.
                #
                # When we want the ability to have more cache (e.g. in
                # arv-mount) we'll increase rlimit before calling
                # this.
                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # pick the smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        self.cache_total = 0

        if self._disk_cache:
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            for slot in self._cache.values():
                self.cache_total += slot.size()
            self.cap_cache()

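    # Illustrative usage (sketch; path and size are hypothetical):
    #
    #   cache = KeepBlockCache(disk_cache=True,
    #                          disk_cache_dir='/tmp/keep-cache',
    #                          cache_max=1 << 30)   # cap at 1 GiB
    #   keep_client = KeepClient(block_cache=cache)
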
    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            # Block until the slot is filled, then return its content.
            self.ready.wait()
            return self.content

        def set(self, value):
            if self.content is not None:
                return False
            self.content = value
            self.ready.set()
            return True

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            self.content = None

    def _resize_cache(self, cache_max, max_slots):
        # Try and make sure the contents of the cache do not exceed
        # the supplied maximums.

        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
            return

        _evict_candidates = collections.deque(self._cache.values())
        while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
            slot = _evict_candidates.popleft()
            if not slot.ready.is_set():
                # Can't evict blocks that are still being fetched.
                continue

            sz = slot.size()
            slot.evict()
            self.cache_total -= sz
            del self._cache[slot.locator]

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_updating:
            self._resize_cache(self.cache_max, self._max_slots)
            self._cache_updating.notify_all()

    def _get(self, locator):
        # Test if the locator is already in the cache
        if locator in self._cache:
            n = self._cache[locator]
            if n.ready.is_set() and n.content is None:
                del self._cache[n.locator]
                return None
            self._cache.move_to_end(locator)
            return n
        if self._disk_cache:
            # see if it exists on disk
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache[n.locator] = n
                self.cache_total += n.size()
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_updating:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                self._resize_cache(self.cache_max, self._max_slots-1)
                while len(self._cache) >= self._max_slots:
                    # If there isn't a slot available, need to wait
                    # for something to happen that releases one of the
                    # cache slots.  Idle for 200 ms or woken up by
                    # another thread
                    self._cache_updating.wait(timeout=0.2)
                    self._resize_cache(self.cache_max, self._max_slots-1)

                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache[n.locator] = n
                return n, True

    def set(self, slot, blob):
        try:
            if slot.set(blob):
                self.cache_total += slot.size()
            return
        except OSError as e:
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum(st.size() for st in self._cache.values())
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception:
            _logger.exception("Unexpected error adding block to cache")

        # Check if we should evict things from the cache.  Either
        # because we added a new thing or there was an error and
        # we possibly adjusted the limits down, so we might need
        # to push something out.
        self.cap_cache()

        try:
            # Only gets here if there was an error the first time. The
            # exception handler adjusts limits downward in some cases
            # to free up resources, which would make the operation
            # succeed.
            if slot.set(blob):
                self.cache_total += slot.size()
        except Exception as e:
            # It failed again.  Give up.
            slot.set(None)
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))

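    # Illustrative flow (sketch of how KeepClient drives this cache):
    #
    #   slot, first = cache.reserve_cache(locator_md5)
    #   if first:
    #       cache.set(slot, fetched_bytes)  # fill the slot, wake any waiters
    #   else:
    #       data = slot.get()               # block until the slot is filled
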

class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):
    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT

    class KeepService(PyCurlHelper):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            super(KeepClient.KeepService, self).__init__()
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    else:
                        curl.setopt(pycurl.HTTPGET, True)
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result.get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request, return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

    class KeepWriterQueue(queue.Queue):
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Notifies disabled storage classes expectation to
                    # the outer loop.
                    return None
                return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

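        # Illustrative example (hypothetical values): with wanted_copies=2,
        # wanted_storage_classes=['default', 'archive'], and
        # confirmed_storage_classes={'default': 2, 'archive': 1},
        # pending_classes() returns ['archive'], because 'archive' still
        # has fewer confirmed copies than wanted_copies.
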
        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()

    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies, classes)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response

    class KeepWriterThread(threading.Thread):
        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                       self.data,
                                       timeout=self.timeout,
                                       headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed

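        # Illustrative example of the confirmation-header parsing above
        # (hypothetical header value):
        #
        #   'default=2, archive=1'  ->  {'default': 2, 'archive': 1}
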
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=10, session=None, num_prefetch_threads=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth).  A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds.  Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth).  The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 10.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []
        if num_prefetch_threads is not None:
            self.num_prefetch_threads = num_prefetch_threads
        else:
            self.num_prefetch_threads = 2
        self._prefetch_queue = None
        self._prefetch_threads = None

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        elif proxy:
            self.num_retries = num_retries
            self.max_replicas_per_service = None

            proxy_uris = proxy.split()
            for i in range(len(proxy_uris)):
                if not proxy_uris[i].endswith('/'):
                    proxy_uris[i] += '/'
                # URL validation
                url = urllib.parse.urlparse(proxy_uris[i])
                if not (url.scheme and url.netloc):
                    raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
            self.api_token = api_token
            self._gateway_services = {}
            self._keep_services = [{
                'uuid': "00000-bi6l4-%015d" % idx,
                'service_type': 'proxy',
                '_service_root': uri,
                } for idx, uri in enumerate(proxy_uris)]
            self._writable_services = self._keep_services
            self.using_proxy = True
            self._static_services_list = True
        else:
            # It's important to avoid instantiating an API client
            # unless we actually need one, for testing's sake.
            if api_client is None:
                api_client = arvados.api('v1')
            self.api_client = api_client
            self.api_token = api_client.api_token
            self._gateway_services = {}
            self._keep_services = None
            self._writable_services = None
            self.using_proxy = None
            self._static_services_list = False
            try:
                self._default_classes = [
                    k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
            except KeyError:
                # We're talking to an old cluster
                pass

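    # Illustrative usage (sketch; assumes ARVADOS_API_HOST and
    # ARVADOS_API_TOKEN are configured):
    #
    #   keep = KeepClient(api_client=arvados.api('v1'))
    #   loc = keep.put(b'foo')          # returns a signed locator string
    #   assert keep.get(loc) == b'foo'
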
    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])

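    # Example scaling (values from the default timeout): with timeout
    # (2, 256, 32768), attempt 0 uses a 2 s connection timeout, attempt 1
    # uses 4 s, and attempt 2 uses 8 s; the read timeout and minimum
    # bandwidth stay fixed.
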
    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
                (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1.
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()

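    # Illustrative example (hypothetical hash and UUIDs): every client
    # computes the same per-service weights for a given block, so all
    # clients probe services in the same rendezvous order.
    #
    #   h = 'acbd18db4cc2f85cedef654fccc4a4d8'
    #   uuids = ['zzzzz-bi6l4-aaaaaaaaaaaaaaa', 'zzzzz-bi6l4-bbbbbbbbbbbbbbb']
    #   order = sorted(uuids, reverse=True,
    #                  key=lambda u: hashlib.md5((h + u[-15:]).encode()).hexdigest())
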
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # list of proxy services.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # as a list.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc_s):
        """Fetch a block only if it is in the cache, otherwise return None."""
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature"""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                while slot is None:
                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
                    if first:
                        # Fresh and empty "first time it is used" slot
                        break
                    if prefetch:
                        # this is request for a prefetch to fill in
                        # the cache, don't need to wait for the
                        # result, so if it is already in flight return
                        # immediately.  Clear 'slot' to prevent
                        # finally block from calling slot.set()
                        if slot.ready.is_set():
                            slot.get()
                        slot = None
                        return None

                    blob = slot.get()
                    if blob is not None:
                        self.hits_counter.add(1)
                        return blob

                    # If blob is None, this means either
                    #
                    # (a) another thread was fetching this block and
                    # failed with an error or
                    #
                    # (b) cache thrashing caused the slot to be
                    # evicted (content set to None) by another thread
                    # between the call to reserve_cache() and get().
                    #
                    # We'll handle these cases by reserving a new slot
                    # and then doing a full GET request.
                    slot = None

            self.misses_counter.add(1)

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                       hint.startswith('K@') and
                                       len(hint) == 29 and
                                       self._gateway_services.get(hint[2:])
                                       )])
            # Map root URLs to their KeepService objects.
            roots_map = {
                root: self.KeepService(root, self._user_agent_pool,
                                       upload_counter=self.upload_counter,
                                       download_counter=self.download_counter,
                                       headers=headers,
                                       insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation.  Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            sorted_roots = []
            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
            for tries_left in loop:
                try:
                    sorted_roots = self.map_new_services(
                        roots_map, locator,
                        force_rebuild=(tries_left < num_retries),
                        need_writable=False,
                        headers=headers)
                except Exception as error:
                    loop.save_result(error)
                    continue

                # Query KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                    if blob is not None:
                        break
                loop.save_result((blob, len(services_to_try)))

            # Always cache the result, then return it if we succeeded.
            if loop.success():
                return blob
        finally:
            if slot is not None:
                self.block_cache.set(slot, blob)

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        """
        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done_copies,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left),
                                                          classes=pending_classes)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                     writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")

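    # Illustrative usage (sketch; the storage class names are hypothetical
    # and must exist in the cluster's configuration):
    #
    #   locator = keep.put(b'example data', copies=2,
    #                      classes=['default', 'archive'])
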
    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self.get(b, prefetch=True)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    def _start_prefetch_threads(self):
        if self._prefetch_threads is None:
            with self.lock:
                if self._prefetch_threads is not None:
                    return
                self._prefetch_queue = queue.Queue()
                self._prefetch_threads = []
                for i in range(0, self.num_prefetch_threads):
                    thread = threading.Thread(target=self._block_prefetch_worker)
                    self._prefetch_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def block_prefetch(self, locator):
        """
        This relies on the fact that KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.
        """

        if self.block_cache.get(locator) is not None:
            return

        self._start_prefetch_threads()
        self._prefetch_queue.put(locator)

    def stop_prefetch_threads(self):
        with self.lock:
            if self._prefetch_threads is not None:
                for t in self._prefetch_threads:
                    self._prefetch_queue.put(None)
                for t in self._prefetch_threads:
                    t.join()
            self._prefetch_threads = None
            self._prefetch_queue = None

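    # Illustrative usage (sketch): warm the cache ahead of sequential
    # reads; `upcoming_locators` is a hypothetical iterable.
    #
    #   for loc in upcoming_locators:
    #       keep.block_prefetch(loc)
    #   data = keep.get(first_locator)   # likely served from the cache
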
    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return b''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()

    def local_store_head(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return True
        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
            return True

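    # Illustrative usage (sketch; the directory is hypothetical):
    #
    #   keep = KeepClient(local_store='/tmp/keep-local')
    #   loc = keep.put(b'hello')   # dispatches to local_store_put()
    #   keep.get(loc)              # -> b'hello'
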