# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import collections
import copy
import datetime
import errno
import hashlib
import logging
import math
import os
import queue
import re
import resource
import socket
import ssl
import sys
import threading
import urllib.parse

from io import BytesIO

import pycurl

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util

from ._internal import basedirs, diskcache, Timer, parse_seq
from ._internal.pycurl import PyCurlHelper

_logger = logging.getLogger('arvados.keep')
global_client_object = None

# Monkey patch TCP constants when not available (macOS). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102
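
# Illustrative sketch (not called anywhere in this module): how TCP
# keepalive constants like the ones patched above are typically applied
# to a socket. The idle/interval/count values are arbitrary examples.
def _example_enable_tcp_keepalive(sock):
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    if sys.platform == 'darwin':
        # macOS spells the "idle seconds before first probe" option
        # TCP_KEEPALIVE (it is TCP_KEEPIDLE on Linux).
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, 75)
    # Seconds between probes, and failed probes tolerated before drop.
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 9)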

class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".format(
                    name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except IndexError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
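
# Illustrative sketch (not called anywhere in this module): parsing a
# signed Keep locator with KeepLocator. The hash, signature, and
# timestamp below are made-up values.
def _example_keep_locator():
    signed = 'acbd18db4cc2f85cedef654fccc4a4d8+3+A%s@12345678' % ('f' * 40)
    loc = KeepLocator(signed)
    assert loc.md5sum == 'acbd18db4cc2f85cedef654fccc4a4d8'
    assert loc.size == 3
    # stripped() drops the permission hint, leaving "hash+size".
    assert loc.stripped() == 'acbd18db4cc2f85cedef654fccc4a4d8+3'
    # The +A hint carries a signature and a hex Unix timestamp; the
    # block is unreadable through Keep once that timestamp passes.
    return loc.permission_expired()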

class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        self._cache = collections.OrderedDict()
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir
        self._cache_updating = threading.Condition(self._cache_lock)

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = str(basedirs.BaseDirectories('CACHE').storage_path('keep'))

        if self._max_slots == 0:
            if self._disk_cache:
                # Each block uses two file descriptors, one used to
                # open it initially and hold the flock(), and a second
                # hidden one used by mmap().
                #
                # Set max slots to 1/8 of maximum file handles. This
                # means we'll use at most 1/4 of total file handles.
                #
                # NOFILE typically defaults to 1024 on Linux so this
                # is 128 slots (256 file handles), which means we can
                # cache up to 8 GiB of 64 MiB blocks. This leaves
                # 768 file handles for sockets and other stuff.
                #
                # When we want the ability to have more cache (e.g. in
                # arv-mount) we'll increase rlimit before calling this.
                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # Pick the smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
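
        # Worked example of the sizing above, with made-up numbers: on a
        # 500 GiB disk with 200 GiB free and an empty cache, maxdisk is
        # 50 GiB and avail is 200/4 = 50 GiB, while 128 slots allow
        # 128 * 64 MiB = 8 GiB, so cache_max = min(50, 50, 8) GiB = 8 GiB.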

        self.cache_total = 0
        if self._disk_cache:
            self._cache = diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            for slot in self._cache.values():
                self.cache_total += slot.size()
            self.cap_cache()

    class _CacheSlot:
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            if self.content is not None:
                return False
            self.content = value
            self.ready.set()
            return True

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def _resize_cache(self, cache_max, max_slots):
        # Try and make sure the contents of the cache do not exceed
        # the supplied maximums.

        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
            return

        _evict_candidates = collections.deque(self._cache.values())
        while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
            slot = _evict_candidates.popleft()
            if not slot.ready.is_set():
                # Skip blocks that haven't been fetched yet.
                continue

            sz = slot.size()
            self.cache_total -= sz
            del self._cache[slot.locator]

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_updating:
            self._resize_cache(self.cache_max, self._max_slots)
            self._cache_updating.notify_all()

    def _get(self, locator):
        # Test if the locator is already in the cache
        if locator in self._cache:
            n = self._cache[locator]
            if n.ready.is_set() and n.content is None:
                del self._cache[n.locator]
                return None
            self._cache.move_to_end(locator)
            return n
        if self._disk_cache:
            # see if it exists on disk
            n = diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache[n.locator] = n
                self.cache_total += n.size()
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_updating:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                self._resize_cache(self.cache_max, self._max_slots-1)
                while len(self._cache) >= self._max_slots:
                    # If there isn't a slot available, need to wait
                    # for something to happen that releases one of the
                    # cache slots. Idle for 200 ms or woken up by
                    # another thread.
                    self._cache_updating.wait(timeout=0.2)
                    self._resize_cache(self.cache_max, self._max_slots-1)

                if self._disk_cache:
                    n = diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache._CacheSlot(locator)
                self._cache[n.locator] = n
                return n, True

    def set(self, slot, blob):
        try:
            if slot.set(blob):
                self.cache_total += slot.size()
            return
        except OSError as e:
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum(st.size() for st in self._cache.values())
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception as e:
            pass
        finally:
            # Check if we should evict things from the cache. Either
            # because we added a new thing or there was an error and
            # we possibly adjusted the limits down, so we might need
            # to push something out.
            self.cap_cache()

        try:
            # Only gets here if there was an error the first time. The
            # exception handler adjusts limits downward in some cases
            # to free up resources, which would make the operation
            # succeed.
            if slot.set(blob):
                self.cache_total += slot.size()
        except Exception as e:
            # It failed again. Give up.
            slot.set(None)
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
        finally:
            self.cap_cache()

    def clear(self):
        with self._cache_lock:
            self._cache.clear()
            self.cache_total = 0
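
    # Illustrative sketch (not called anywhere in this module): the
    # intended reserve/set/get flow for one block. The data is a
    # made-up example.
    def _example_usage(self):
        data = b'example'
        slot, first = self.reserve_cache(hashlib.md5(data).hexdigest())
        if first:
            # We own the fresh slot: store the block, which also wakes
            # any other threads blocked in slot.get().
            self.set(slot, data)
        return slot.get()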

class _Counter:
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

class KeepClient(object):
    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT

    class _KeepService(PyCurlHelper):
        """Make requests to a single Keep service, and track results.

        A _KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible results.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            super().__init__()
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    else:
                        curl.setopt(pycurl.HTTPGET, True)
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                raise self._result['error']
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result.get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request, return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                raise self._result['error']
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                raise self._result['error']
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

    class _KeepWriterQueue(queue.Queue):
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        if st_class in self.confirmed_storage_classes:
                            self.confirmed_storage_classes[st_class] += st_copies
                        else:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Tell the outer loop that storage class tracking
                    # is disabled.
                    return None
                return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()

    class _KeepWriterThreadPool:
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient._KeepWriterQueue(copies, classes)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient._KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response

    class _KeepWriterThread(threading.Thread):
        class TaskFailed(RuntimeError):
            """Exception for failed Keep writes

            TODO: Move this class to the module top level and document it

            @private
            """

        def __init__(self, queue, data, data_hash, timeout=None):
            super().__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in _KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                       self.data,
                                       timeout=self.timeout,
                                       headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("_KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = collections.defaultdict(int)
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in parse_seq(scch):
                    stored_class, _, stored_copies = confirmation.partition('=')
                    if stored_class and stored_copies:
                        classes_confirmed[stored_class] += int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed
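
        # Illustrative sketch (not called anywhere in this module): the
        # parsing above turns a response header such as
        #   X-Keep-Storage-Classes-Confirmed: default=2, archival=1
        # into {'default': 2, 'archival': 1}. The header value here is a
        # made-up example, and this assumes parse_seq yields each
        # comma-separated item, as used in do_task() above.
        @staticmethod
        def _example_parse_confirmed_classes():
            classes_confirmed = collections.defaultdict(int)
            for confirmation in parse_seq('default=2, archival=1'):
                stored_class, _, stored_copies = confirmation.partition('=')
                if stored_class and stored_copies:
                    classes_confirmed[stored_class] += int(stored_copies)
            return dict(classes_confirmed)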

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=10, session=None, num_prefetch_threads=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services. If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy. Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests. It is an error to specify both
          api_client and api_token. If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory. If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable. If you want to ensure KeepClient does not
          use local storage, pass in an empty string. This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called. Default 10.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = _Counter()
        self.download_counter = _Counter()
        self.put_counter = _Counter()
        self.get_counter = _Counter()
        self.hits_counter = _Counter()
        self.misses_counter = _Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []
        if num_prefetch_threads is not None:
            self.num_prefetch_threads = num_prefetch_threads
        else:
            self.num_prefetch_threads = 2
        self._prefetch_queue = None
        self._prefetch_threads = None

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise. The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # _KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
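
    # Worked example, assuming the documented default non-proxy timeout
    # (2, 256, 32768): current_timeout(0) -> (2, 256, 32768),
    # current_timeout(1) -> (4, 256, 32768), and current_timeout(3) ->
    # (16, 256, 32768). Only the connection timeout doubles per attempt.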

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1.
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
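
    # Illustrative sketch (not called anywhere in this module):
    # _service_weight() implements rendezvous hashing. Every client
    # computes the same per-block ordering, so readers probe first the
    # services writers were most likely to pick. The hash and UUIDs are
    # made-up values.
    def _example_probe_order(self):
        data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8'
        uuids = ['zzzzz-bi6l4-000000000000000',
                 'zzzzz-bi6l4-000000000000001']
        # Heaviest weight first, matching weighted_service_roots() below.
        return sorted(uuids, reverse=True,
                      key=lambda u: self._service_weight(data_hash, u))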

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # as a list.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to _KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "Bearer %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self._KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc_s):
        """Fetch a block only if it is in the cache, otherwise return None."""
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature"""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                while slot is None:
                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
                    if first:
                        # Fresh and empty "first time it is used" slot
                        break
                    if prefetch:
                        # This is a request for a prefetch to fill in
                        # the cache, don't need to wait for the
                        # result, so if it is already in flight return
                        # immediately. Clear 'slot' to prevent the
                        # finally block from calling slot.set().
                        if slot.ready.is_set():
                            slot.get()
                        slot = None
                        return None

                    blob = slot.get()
                    if blob is not None:
                        self.hits_counter.add(1)
                        return blob

                    # If blob is None, this means either
                    #
                    # (a) another thread was fetching this block and
                    # failed with an error or
                    #
                    # (b) cache thrashing caused the slot to be
                    # evicted (content set to None) by another thread
                    # between the call to reserve_cache() and get().
                    #
                    # We'll handle these cases by reserving a new slot
                    # and then doing a full GET request.
                    slot = None

            self.misses_counter.add(1)

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                   hint.startswith('K@') and
                                   len(hint) == 29 and
                                   self._gateway_services.get(hint[2:])
                                   )])
            # Map root URLs to their _KeepService objects.
            roots_map = {
                root: self._KeepService(root, self._user_agent_pool,
                                        upload_counter=self.upload_counter,
                                        download_counter=self.download_counter,
                                        headers=headers,
                                        insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation. Highlights:
            #
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            #
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
            for tries_left in loop:
                try:
                    sorted_roots = self.map_new_services(
                        roots_map, locator,
                        force_rebuild=(tries_left < num_retries),
                        need_writable=False,
                        headers=headers)
                except Exception as error:
                    loop.save_result(error)
                    continue

                # Query _KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                    if blob is not None:
                        break
                loop.save_result((blob, len(services_to_try)))

            # Always cache the result, then return it if we succeeded.
            if loop.success():
                return blob
        finally:
            if slot is not None:
                self.block_cache.set(slot, blob)

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
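
    # Illustrative sketch (not called anywhere in this module): the
    # typical read path. The locator is a made-up value; locators
    # returned by put() normally carry a +A permission hint as well.
    def _example_get(self):
        data = self.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
        # head() probes the same services but transfers no block data.
        self.head('acbd18db4cc2f85cedef654fccc4a4d8+3')
        return data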

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        """
        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient._KeepWriterThreadPool(
                data=data,
                data_hash=data_hash,
                copies=copies - done_copies,
                max_service_replicas=self.max_replicas_per_service,
                timeout=self.current_timeout(num_retries - tries_left),
                classes=pending_classes,
            )
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                     writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
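
    # Illustrative sketch (not called anywhere in this module): the
    # typical write path. The storage class name is a made-up example;
    # each cluster defines its own class names.
    def _example_put(self):
        locator = self.put(b'foo', copies=2, classes=['default'])
        # put() returns a signed locator string that can be handed
        # straight back to get().
        return self.get(locator)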

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self.get(b, prefetch=True)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    def _start_prefetch_threads(self):
        if self._prefetch_threads is None:
            with self.lock:
                if self._prefetch_threads is not None:
                    return
                self._prefetch_queue = queue.Queue()
                self._prefetch_threads = []
                for i in range(0, self.num_prefetch_threads):
                    thread = threading.Thread(target=self._block_prefetch_worker)
                    self._prefetch_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This relies on the fact that KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.) This method
        does not block.
        """

        if self.block_cache.get(locator) is not None:
            return

        self._start_prefetch_threads()
        self._prefetch_queue.put(locator)

    def stop_prefetch_threads(self):
        with self.lock:
            if self._prefetch_threads is not None:
                for t in self._prefetch_threads:
                    self._prefetch_queue.put(None)
                for t in self._prefetch_threads:
                    t.join()
            self._prefetch_threads = None
            self._prefetch_queue = None

    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator
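
    # Illustrative sketch: when local_store is set, the constructor
    # rebinds get/put/head to these stubs, so a round trip never touches
    # the network. The directory is a made-up example:
    #
    #     store = KeepClient(local_store='/tmp/keep-local')
    #     loc = store.put(b'foo')          # calls local_store_put()
    #     assert store.get(loc) == b'foo'  # calls local_store_get()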

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return b''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()

    def local_store_head(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return True
        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
            return True