1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
32 if sys.version_info >= (3, 0):
33 from io import BytesIO
35 from cStringIO import StringIO as BytesIO
38 import arvados.config as config
40 import arvados.retry as retry
43 _logger = logging.getLogger('arvados.keep')
44 global_client_object = None
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    # Only fill in the names the socket module does not already provide.
    for _opt, _val in (('TCP_KEEPALIVE', 0x010),
                       ('TCP_KEEPINTVL', 0x101),
                       ('TCP_KEEPCNT', 0x102)):
        if not hasattr(socket, _opt):
            setattr(socket, _opt, _val)
    # Do not leave the loop variables behind as module globals.
    del _opt, _val
58 class KeepLocator(object):
59 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
60 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
62 def __init__(self, locator_str):
65 self._perm_expiry = None
66 pieces = iter(locator_str.split('+'))
67 self.md5sum = next(pieces)
69 self.size = int(next(pieces))
73 if self.HINT_RE.match(hint) is None:
74 raise ValueError("invalid hint format: {}".format(hint))
75 elif hint.startswith('A'):
76 self.parse_permission_hint(hint)
78 self.hints.append(hint)
83 for s in [self.md5sum, self.size,
84 self.permission_hint()] + self.hints
88 if self.size is not None:
89 return "%s+%i" % (self.md5sum, self.size)
93 def _make_hex_prop(name, length):
94 # Build and return a new property with the given name that
95 # must be a hex string of the given length.
96 data_name = '_{}'.format(name)
98 return getattr(self, data_name)
99 def setter(self, hex_str):
100 if not arvados.util.is_hex(hex_str, length):
101 raise ValueError("{} is not a {}-digit hex string: {!r}".
102 format(name, length, hex_str))
103 setattr(self, data_name, hex_str)
104 return property(getter, setter)
106 md5sum = _make_hex_prop('md5sum', 32)
107 perm_sig = _make_hex_prop('perm_sig', 40)
110 def perm_expiry(self):
111 return self._perm_expiry
114 def perm_expiry(self, value):
115 if not arvados.util.is_hex(value, 1, 8):
117 "permission timestamp must be a hex Unix timestamp: {}".
119 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
121 def permission_hint(self):
122 data = [self.perm_sig, self.perm_expiry]
125 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
126 return "A{}@{:08x}".format(*data)
128 def parse_permission_hint(self, s):
130 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
132 raise ValueError("bad permission hint {}".format(s))
def permission_expired(self, as_of_dt=None):
    """Report whether this locator's permission signature has expired.

    Arguments:
    * as_of_dt: A naive UTC datetime to compare the expiry time against.
      Defaults to the current UTC time.

    Returns False when the locator has no permission expiry; otherwise
    True if the expiry time is at or before as_of_dt.
    """
    if self.perm_expiry is None:
        return False
    if as_of_dt is None:
        # perm_expiry is built with utcfromtimestamp(), i.e. it is a naive
        # UTC datetime.  Compare it with UTC "now"; local time would be
        # off by the zone offset on any host not running in UTC.
        as_of_dt = datetime.datetime.utcnow()
    return self.perm_expiry <= as_of_dt
143 """Simple interface to a global KeepClient object.
145 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
146 own API client. The global KeepClient will build an API client from the
147 current Arvados configuration, which may not match the one you built.
152 def global_client_object(cls):
153 global global_client_object
154 # Previously, KeepClient would change its behavior at runtime based
155 # on these configuration settings. We simulate that behavior here
156 # by checking the values and returning a new KeepClient if any of
158 key = (config.get('ARVADOS_API_HOST'),
159 config.get('ARVADOS_API_TOKEN'),
160 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
161 config.get('ARVADOS_KEEP_PROXY'),
162 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
163 os.environ.get('KEEP_LOCAL_STORE'))
164 if (global_client_object is None) or (cls._last_key != key):
165 global_client_object = KeepClient()
167 return global_client_object
def get(locator, **kwargs):
    """Call get() on the shared client from Keep.global_client_object().

    All keyword arguments are passed through unchanged.
    """
    client = Keep.global_client_object()
    return client.get(locator, **kwargs)
def put(data, **kwargs):
    """Call put() on the shared client from Keep.global_client_object().

    All keyword arguments are passed through unchanged.
    """
    client = Keep.global_client_object()
    return client.put(data, **kwargs)
177 class KeepBlockCache(object):
178 # Default RAM cache is 256MiB
179 def __init__(self, cache_max=(256 * 1024 * 1024)):
180 self.cache_max = cache_max
182 self._cache_lock = threading.Lock()
184 class CacheSlot(object):
185 __slots__ = ("locator", "ready", "content")
187 def __init__(self, locator):
188 self.locator = locator
189 self.ready = threading.Event()
196 def set(self, value):
201 if self.content is None:
204 return len(self.content)
207 '''Cap the cache size to self.cache_max'''
208 with self._cache_lock:
209 # Select all slots except those where ready.is_set() and content is
210 # None (that means there was an error reading the block).
211 self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
212 sm = sum([slot.size() for slot in self._cache])
213 while len(self._cache) > 0 and sm > self.cache_max:
214 for i in range(len(self._cache)-1, -1, -1):
215 if self._cache[i].ready.is_set():
218 sm = sum([slot.size() for slot in self._cache])
220 def _get(self, locator):
221 # Test if the locator is already in the cache
222 for i in range(0, len(self._cache)):
223 if self._cache[i].locator == locator:
226 # move it to the front
228 self._cache.insert(0, n)
def get(self, locator):
    """Return the result of _get(locator), called while holding the cache lock."""
    self._cache_lock.acquire()
    try:
        return self._get(locator)
    finally:
        self._cache_lock.release()
236 def reserve_cache(self, locator):
237 '''Reserve a cache slot for the specified locator,
238 or return the existing slot.'''
239 with self._cache_lock:
240 n = self._get(locator)
244 # Add a new cache slot for the locator
245 n = KeepBlockCache.CacheSlot(locator)
246 self._cache.insert(0, n)
249 class Counter(object):
250 def __init__(self, v=0):
251 self._lk = threading.Lock()
263 class KeepClient(object):
265 # Default Keep server connection timeout: 2 seconds
266 # Default Keep server read timeout: 256 seconds
267 # Default Keep server bandwidth minimum: 32768 bytes per second
268 # Default Keep proxy connection timeout: 20 seconds
269 # Default Keep proxy read timeout: 256 seconds
270 # Default Keep proxy bandwidth minimum: 32768 bytes per second
271 DEFAULT_TIMEOUT = (2, 256, 32768)
272 DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
275 class KeepService(object):
276 """Make requests to a single Keep service, and track results.
278 A KeepService is intended to last long enough to perform one
279 transaction (GET or PUT) against one Keep service. This can
280 involve calling either get() or put() multiple times in order
281 to retry after transient failures. However, calling both get()
282 and put() on a single instance -- or using the same instance
283 to access two different Keep services -- will not produce
290 arvados.errors.HttpError,
293 def __init__(self, root, user_agent_pool=queue.LifoQueue(),
295 download_counter=None,
299 self._user_agent_pool = user_agent_pool
300 self._result = {'error': None}
304 self.get_headers = {'Accept': 'application/octet-stream'}
305 self.get_headers.update(headers)
306 self.put_headers = headers
307 self.upload_counter = upload_counter
308 self.download_counter = download_counter
309 self.insecure = insecure
312 """Is it worth attempting a request?"""
316 """Did the request succeed or encounter permanent failure?"""
317 return self._result['error'] == False or not self._usable
319 def last_result(self):
322 def _get_user_agent(self):
324 return self._user_agent_pool.get(block=False)
328 def _put_user_agent(self, ua):
331 self._user_agent_pool.put(ua, block=False)
335 def _socket_open(self, *args, **kwargs):
336 if len(args) + len(kwargs) == 2:
337 return self._socket_open_pycurl_7_21_5(*args, **kwargs)
339 return self._socket_open_pycurl_7_19_3(*args, **kwargs)
341 def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
342 return self._socket_open_pycurl_7_21_5(
344 address=collections.namedtuple(
345 'Address', ['family', 'socktype', 'protocol', 'addr'],
346 )(family, socktype, protocol, address))
348 def _socket_open_pycurl_7_21_5(self, purpose, address):
349 """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
350 s = socket.socket(address.family, address.socktype, address.protocol)
351 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
352 # Will throw invalid protocol error on mac. This test prevents that.
353 if hasattr(socket, 'TCP_KEEPIDLE'):
354 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
355 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
359 def get(self, locator, method="GET", timeout=None):
360 # locator is a KeepLocator object.
361 url = self.root + str(locator)
362 _logger.debug("Request: %s %s", method, url)
363 curl = self._get_user_agent()
366 with timer.Timer() as t:
368 response_body = BytesIO()
369 curl.setopt(pycurl.NOSIGNAL, 1)
370 curl.setopt(pycurl.OPENSOCKETFUNCTION,
371 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
372 curl.setopt(pycurl.URL, url.encode('utf-8'))
373 curl.setopt(pycurl.HTTPHEADER, [
374 '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
375 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
376 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
378 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
380 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
382 curl.setopt(pycurl.NOBODY, True)
383 self._setcurltimeouts(curl, timeout, method=="HEAD")
387 except Exception as e:
388 raise arvados.errors.HttpError(0, str(e))
394 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
395 'body': response_body.getvalue(),
396 'headers': self._headers,
400 ok = retry.check_http_response_success(self._result['status_code'])
402 self._result['error'] = arvados.errors.HttpError(
403 self._result['status_code'],
404 self._headers.get('x-status-line', 'Error'))
405 except self.HTTP_ERRORS as e:
409 self._usable = ok != False
410 if self._result.get('status_code', None):
411 # The client worked well enough to get an HTTP status
412 # code, so presumably any problems are just on the
413 # server side and it's OK to reuse the client.
414 self._put_user_agent(curl)
416 # Don't return this client to the pool, in case it's
420 _logger.debug("Request fail: GET %s => %s: %s",
421 url, type(self._result['error']), str(self._result['error']))
424 _logger.info("HEAD %s: %s bytes",
425 self._result['status_code'],
426 self._result.get('content-length'))
427 if self._result['headers'].get('x-keep-locator'):
428 # This is a response to a remote block copy request, return
429 # the local copy block locator.
430 return self._result['headers'].get('x-keep-locator')
433 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
434 self._result['status_code'],
435 len(self._result['body']),
437 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
439 if self.download_counter:
440 self.download_counter.add(len(self._result['body']))
441 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
442 if resp_md5 != locator.md5sum:
443 _logger.warning("Checksum fail: md5(%s) = %s",
445 self._result['error'] = arvados.errors.HttpError(
448 return self._result['body']
450 def put(self, hash_s, body, timeout=None, headers={}):
451 put_headers = copy.copy(self.put_headers)
452 put_headers.update(headers)
453 url = self.root + hash_s
454 _logger.debug("Request: PUT %s", url)
455 curl = self._get_user_agent()
458 with timer.Timer() as t:
460 body_reader = BytesIO(body)
461 response_body = BytesIO()
462 curl.setopt(pycurl.NOSIGNAL, 1)
463 curl.setopt(pycurl.OPENSOCKETFUNCTION,
464 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
465 curl.setopt(pycurl.URL, url.encode('utf-8'))
466 # Using UPLOAD tells cURL to wait for a "go ahead" from the
467 # Keep server (in the form of a HTTP/1.1 "100 Continue"
468 # response) instead of sending the request body immediately.
469 # This allows the server to reject the request if the request
470 # is invalid or the server is read-only, without waiting for
471 # the client to send the entire block.
472 curl.setopt(pycurl.UPLOAD, True)
473 curl.setopt(pycurl.INFILESIZE, len(body))
474 curl.setopt(pycurl.READFUNCTION, body_reader.read)
475 curl.setopt(pycurl.HTTPHEADER, [
476 '{}: {}'.format(k,v) for k,v in put_headers.items()])
477 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
478 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
480 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
482 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
483 self._setcurltimeouts(curl, timeout)
486 except Exception as e:
487 raise arvados.errors.HttpError(0, str(e))
493 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
494 'body': response_body.getvalue().decode('utf-8'),
495 'headers': self._headers,
498 ok = retry.check_http_response_success(self._result['status_code'])
500 self._result['error'] = arvados.errors.HttpError(
501 self._result['status_code'],
502 self._headers.get('x-status-line', 'Error'))
503 except self.HTTP_ERRORS as e:
507 self._usable = ok != False # still usable if ok is True or None
508 if self._result.get('status_code', None):
509 # Client is functional. See comment in get().
510 self._put_user_agent(curl)
514 _logger.debug("Request fail: PUT %s => %s: %s",
515 url, type(self._result['error']), str(self._result['error']))
517 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
518 self._result['status_code'],
521 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
522 if self.upload_counter:
523 self.upload_counter.add(len(body))
526 def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
529 elif isinstance(timeouts, tuple):
530 if len(timeouts) == 2:
531 conn_t, xfer_t = timeouts
532 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
534 conn_t, xfer_t, bandwidth_bps = timeouts
536 conn_t, xfer_t = (timeouts, timeouts)
537 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
538 curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
539 if not ignore_bandwidth:
540 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
541 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
543 def _headerfunction(self, header_line):
544 if isinstance(header_line, bytes):
545 header_line = header_line.decode('iso-8859-1')
546 if ':' in header_line:
547 name, value = header_line.split(':', 1)
548 name = name.strip().lower()
549 value = value.strip()
551 name = self._lastheadername
552 value = self._headers[name] + ' ' + header_line.strip()
553 elif header_line.startswith('HTTP/'):
554 name = 'x-status-line'
557 _logger.error("Unexpected header line: %s", header_line)
559 self._lastheadername = name
560 self._headers[name] = value
561 # Returning None implies all bytes were written
564 class KeepWriterQueue(queue.Queue):
565 def __init__(self, copies, classes=[]):
566 queue.Queue.__init__(self) # Old-style superclass
567 self.wanted_copies = copies
568 self.wanted_storage_classes = classes
569 self.successful_copies = 0
570 self.confirmed_storage_classes = {}
572 self.storage_classes_tracking = True
573 self.queue_data_lock = threading.RLock()
574 self.pending_tries = max(copies, len(classes))
575 self.pending_tries_notification = threading.Condition()
577 def write_success(self, response, replicas_nr, classes_confirmed):
578 with self.queue_data_lock:
579 self.successful_copies += replicas_nr
580 if classes_confirmed is None:
581 self.storage_classes_tracking = False
582 elif self.storage_classes_tracking:
583 for st_class, st_copies in classes_confirmed.items():
585 self.confirmed_storage_classes[st_class] += st_copies
587 self.confirmed_storage_classes[st_class] = st_copies
588 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
589 self.response = response
590 with self.pending_tries_notification:
591 self.pending_tries_notification.notify_all()
def write_fail(self, ks):
    """Record a failed write: allow one more try and wake one waiter.

    The ks argument is accepted but not used here.
    """
    notifier = self.pending_tries_notification
    with notifier:
        self.pending_tries = self.pending_tries + 1
        notifier.notify()
def pending_copies(self):
    """Return wanted_copies minus successful_copies, read under queue_data_lock."""
    with self.queue_data_lock:
        still_needed = self.wanted_copies - self.successful_copies
    return still_needed
602 def satisfied_classes(self):
603 with self.queue_data_lock:
604 if not self.storage_classes_tracking:
605 # Notifies disabled storage classes expectation to
608 return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
610 def pending_classes(self):
611 with self.queue_data_lock:
612 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
614 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
615 for st_class, st_copies in self.confirmed_storage_classes.items():
616 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
617 unsatisfied_classes.remove(st_class)
618 return unsatisfied_classes
620 def get_next_task(self):
621 with self.pending_tries_notification:
623 if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
624 # This notify_all() is unnecessary --
625 # write_success() already called notify_all()
626 # when pending<1 became true, so it's not
627 # possible for any other thread to be in
628 # wait() now -- but it's cheap insurance
629 # against deadlock so we do it anyway:
630 self.pending_tries_notification.notify_all()
631 # Drain the queue and then raise Queue.Empty
635 elif self.pending_tries > 0:
636 service, service_root = self.get_nowait()
637 if service.finished():
640 self.pending_tries -= 1
641 return service, service_root
643 self.pending_tries_notification.notify_all()
646 self.pending_tries_notification.wait()
649 class KeepWriterThreadPool(object):
650 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
651 self.total_task_nr = 0
652 if (not max_service_replicas) or (max_service_replicas >= copies):
655 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
656 _logger.debug("Pool max threads is %d", num_threads)
658 self.queue = KeepClient.KeepWriterQueue(copies, classes)
660 for _ in range(num_threads):
661 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
662 self.workers.append(w)
def add_task(self, ks, service_root):
    """Queue a (ks, service_root) task and count it in total_task_nr."""
    task = (ks, service_root)
    self.queue.put(task)
    self.total_task_nr = self.total_task_nr + 1
669 return self.queue.successful_copies, self.queue.satisfied_classes()
673 for worker in self.workers:
675 # Wait for finished work
679 return self.queue.response
682 class KeepWriterThread(threading.Thread):
683 class TaskFailed(RuntimeError): pass
685 def __init__(self, queue, data, data_hash, timeout=None):
686 super(KeepClient.KeepWriterThread, self).__init__()
687 self.timeout = timeout
690 self.data_hash = data_hash
696 service, service_root = self.queue.get_next_task()
700 locator, copies, classes = self.do_task(service, service_root)
701 except Exception as e:
702 if not isinstance(e, self.TaskFailed):
703 _logger.exception("Exception in KeepWriterThread")
704 self.queue.write_fail(service)
706 self.queue.write_success(locator, copies, classes)
708 self.queue.task_done()
710 def do_task(self, service, service_root):
711 classes = self.queue.pending_classes()
715 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
716 success = bool(service.put(self.data_hash,
718 timeout=self.timeout,
720 result = service.last_result()
723 if result.get('status_code'):
724 _logger.debug("Request fail: PUT %s => %s %s",
726 result.get('status_code'),
728 raise self.TaskFailed()
730 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
731 str(threading.current_thread()),
736 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
737 except (KeyError, ValueError):
740 classes_confirmed = {}
742 scch = result['headers']['x-keep-storage-classes-confirmed']
743 for confirmation in scch.replace(' ', '').split(','):
744 if '=' in confirmation:
745 stored_class, stored_copies = confirmation.split('=')[:2]
746 classes_confirmed[stored_class] = int(stored_copies)
747 except (KeyError, ValueError):
748 # Storage classes confirmed header missing or corrupt
749 classes_confirmed = None
751 return result['body'].strip(), replicas_stored, classes_confirmed
754 def __init__(self, api_client=None, proxy=None,
755 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
756 api_token=None, local_store=None, block_cache=None,
757 num_retries=0, session=None):
758 """Initialize a new KeepClient.
762 The API client to use to find Keep services. If not
763 provided, KeepClient will build one from available Arvados
767 If specified, this KeepClient will send requests to this Keep
768 proxy. Otherwise, KeepClient will fall back to the setting of the
769 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
770 If you want to ensure KeepClient does not use a proxy, pass in an empty
774 The initial timeout (in seconds) for HTTP requests to Keep
775 non-proxy servers. A tuple of three floats is interpreted as
776 (connection_timeout, read_timeout, minimum_bandwidth). A connection
777 will be aborted if the average traffic rate falls below
778 minimum_bandwidth bytes per second over an interval of read_timeout
779 seconds. Because timeouts are often a result of transient server
780 load, the actual connection timeout will be increased by a factor
781 of two on each retry.
782 Default: (2, 256, 32768).
785 The initial timeout (in seconds) for HTTP requests to
786 Keep proxies. A tuple of three floats is interpreted as
787 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
788 described above for adjusting connection timeouts on retry also
790 Default: (20, 256, 32768).
793 If you're not using an API client, but only talking
794 directly to a Keep proxy, this parameter specifies an API token
795 to authenticate Keep requests. It is an error to specify both
796 api_client and api_token. If you specify neither, KeepClient
797 will use one available from the Arvados configuration.
800 If specified, this KeepClient will bypass Keep
801 services, and save data to the named directory. If unspecified,
802 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
803 environment variable. If you want to ensure KeepClient does not
804 use local storage, pass in an empty string. This is primarily
805 intended to mock a server for testing.
808 The default number of times to retry failed requests.
809 This will be used as the default num_retries value when get() and
810 put() are called. Default 0.
812 self.lock = threading.Lock()
814 if config.get('ARVADOS_KEEP_SERVICES'):
815 proxy = config.get('ARVADOS_KEEP_SERVICES')
817 proxy = config.get('ARVADOS_KEEP_PROXY')
818 if api_token is None:
819 if api_client is None:
820 api_token = config.get('ARVADOS_API_TOKEN')
822 api_token = api_client.api_token
823 elif api_client is not None:
825 "can't build KeepClient with both API client and token")
826 if local_store is None:
827 local_store = os.environ.get('KEEP_LOCAL_STORE')
829 if api_client is None:
830 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
832 self.insecure = api_client.insecure
834 self.block_cache = block_cache if block_cache else KeepBlockCache()
835 self.timeout = timeout
836 self.proxy_timeout = proxy_timeout
837 self._user_agent_pool = queue.LifoQueue()
838 self.upload_counter = Counter()
839 self.download_counter = Counter()
840 self.put_counter = Counter()
841 self.get_counter = Counter()
842 self.hits_counter = Counter()
843 self.misses_counter = Counter()
844 self._storage_classes_unsupported_warning = False
845 self._default_classes = []
848 self.local_store = local_store
849 self.head = self.local_store_head
850 self.get = self.local_store_get
851 self.put = self.local_store_put
853 self.num_retries = num_retries
854 self.max_replicas_per_service = None
856 proxy_uris = proxy.split()
857 for i in range(len(proxy_uris)):
858 if not proxy_uris[i].endswith('/'):
861 url = urllib.parse.urlparse(proxy_uris[i])
862 if not (url.scheme and url.netloc):
863 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
864 self.api_token = api_token
865 self._gateway_services = {}
866 self._keep_services = [{
867 'uuid': "00000-bi6l4-%015d" % idx,
868 'service_type': 'proxy',
869 '_service_root': uri,
870 } for idx, uri in enumerate(proxy_uris)]
871 self._writable_services = self._keep_services
872 self.using_proxy = True
873 self._static_services_list = True
875 # It's important to avoid instantiating an API client
876 # unless we actually need one, for testing's sake.
877 if api_client is None:
878 api_client = arvados.api('v1')
879 self.api_client = api_client
880 self.api_token = api_client.api_token
881 self._gateway_services = {}
882 self._keep_services = None
883 self._writable_services = None
884 self.using_proxy = None
885 self._static_services_list = False
887 self._default_classes = [
888 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
890 # We're talking to an old cluster
893 def current_timeout(self, attempt_number):
894 """Return the appropriate timeout to use for this client.
896 The proxy timeout setting if the backend service is currently a proxy,
897 the regular timeout setting otherwise. The `attempt_number` indicates
898 how many times the operation has been tried already (starting from 0
899 for the first try), and scales the connection timeout portion of the
900 return value accordingly.
903 # TODO(twp): the timeout should be a property of a
904 # KeepService, not a KeepClient. See #4488.
905 t = self.proxy_timeout if self.using_proxy else self.timeout
907 return (t[0] * (1 << attempt_number), t[1])
909 return (t[0] * (1 << attempt_number), t[1], t[2])
910 def _any_nondisk_services(self, service_list):
911 return any(ks.get('service_type', 'disk') != 'disk'
912 for ks in service_list)
914 def build_services_list(self, force_rebuild=False):
915 if (self._static_services_list or
916 (self._keep_services and not force_rebuild)):
920 keep_services = self.api_client.keep_services().accessible()
921 except Exception: # API server predates Keep services.
922 keep_services = self.api_client.keep_disks().list()
924 # Gateway services are only used when specified by UUID,
925 # so there's nothing to gain by filtering them by
927 self._gateway_services = {ks['uuid']: ks for ks in
928 keep_services.execute()['items']}
929 if not self._gateway_services:
930 raise arvados.errors.NoKeepServersError()
932 # Precompute the base URI for each service.
933 for r in self._gateway_services.values():
934 host = r['service_host']
935 if not host.startswith('[') and host.find(':') >= 0:
936 # IPv6 URIs must be formatted like http://[::1]:80/...
937 host = '[' + host + ']'
938 r['_service_root'] = "{}://{}:{:d}/".format(
939 'https' if r['service_ssl_flag'] else 'http',
943 _logger.debug(str(self._gateway_services))
944 self._keep_services = [
945 ks for ks in self._gateway_services.values()
946 if not ks.get('service_type', '').startswith('gateway:')]
947 self._writable_services = [ks for ks in self._keep_services
948 if not ks.get('read_only')]
950 # For disk type services, max_replicas_per_service is 1
951 # It is unknown (unlimited) for other service types.
952 if self._any_nondisk_services(self._writable_services):
953 self.max_replicas_per_service = None
955 self.max_replicas_per_service = 1
957 def _service_weight(self, data_hash, service_uuid):
958 """Compute the weight of a Keep service endpoint for a data
959 block with a known hash.
961 The weight is md5(h + u) where u is the last 15 characters of
962 the service endpoint's UUID.
964 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
966 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
967 """Return an array of Keep service endpoints, in the order in
968 which they should be probed when reading or writing data with
969 the given hash+hints.
971 self.build_services_list(force_rebuild)
974 # Use the services indicated by the given +K@... remote
975 # service hints, if any are present and can be resolved to a
977 for hint in locator.hints:
978 if hint.startswith('K@'):
981 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
982 elif len(hint) == 29:
983 svc = self._gateway_services.get(hint[2:])
985 sorted_roots.append(svc['_service_root'])
987 # Sort the available local services by weight (heaviest first)
988 # for this locator, and return their service_roots (base URIs)
990 use_services = self._keep_services
992 use_services = self._writable_services
993 self.using_proxy = self._any_nondisk_services(use_services)
994 sorted_roots.extend([
995 svc['_service_root'] for svc in sorted(
998 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
999 _logger.debug("{}: {}".format(locator, sorted_roots))
1002 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1003 # roots_map is a dictionary, mapping Keep service root strings
1004 # to KeepService objects. Poll for Keep services, and add any
1005 # new ones to roots_map. Return the current list of local
1007 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1008 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1009 for root in local_roots:
1010 if root not in roots_map:
1011 roots_map[root] = self.KeepService(
1012 root, self._user_agent_pool,
1013 upload_counter=self.upload_counter,
1014 download_counter=self.download_counter,
1016 insecure=self.insecure)
1020 def _check_loop_result(result):
1021 # KeepClient RetryLoops should save results as a 2-tuple: the
1022 # actual result of the request, and the number of servers available
1023 # to receive the request this round.
1024 # This method returns True if there's a real result, False if
1025 # there are no more servers available, otherwise None.
1026 if isinstance(result, Exception):
1028 result, tried_server_count = result
1029 if (result is not None) and (result is not False):
1031 elif tried_server_count < 1:
1032 _logger.info("No more Keep services to try; giving up")
1037 def get_from_cache(self, loc):
1038 """Fetch a block only if it is in the cache, otherwise return None."""
1039 slot = self.block_cache.get(loc)
1040 if slot is not None and slot.ready.is_set():
def refresh_signature(self, loc):
    """Ask Keep to get the remote block and return its local signature.

    Sends a HEAD request for loc with an X-Keep-Signature header of the
    form 'local, <current UTC time in ISO 8601 with a trailing Z>' and
    returns whatever head() returns.
    """
    timestamp = datetime.datetime.utcnow().isoformat("T") + 'Z'
    signature_headers = {'X-Keep-Signature': 'local, {}'.format(timestamp)}
    return self.head(loc, headers=signature_headers)
def head(self, loc_s, **kwargs):
    """Run _get_or_head() for loc_s with method "HEAD", passing kwargs through."""
    fetch = self._get_or_head
    return fetch(loc_s, method="HEAD", **kwargs)
def get(self, loc_s, **kwargs):
    """Run _get_or_head() for loc_s with method "GET", passing kwargs through."""
    fetch = self._get_or_head
    return fetch(loc_s, method="GET", **kwargs)
1058 def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
1059 """Get data from Keep.
1061 This method fetches one or more blocks of data from Keep. It
1062 sends a request to each Keep service registered with the API
1063 server (or the proxy provided when this client was
1064 instantiated), then each service named in location hints, in
1065 sequence. As soon as one service provides the data, it's
1069 * loc_s: A string of one or more comma-separated locators to fetch.
1070 This method returns the concatenation of these blocks.
1071 * num_retries: The number of times to retry GET requests to
1072 *each* Keep server if it returns temporary failures, with
1073 exponential backoff. Note that, in each loop, the method may try
1074 to fetch data from every available Keep service, along with any
1075 that are named in location hints in the locator. The default value
1076 is set when the KeepClient is initialized.
1079 return ''.join(self.get(x) for x in loc_s.split(','))
1081 self.get_counter.add(1)
1083 request_id = (request_id or
1084 (hasattr(self, 'api_client') and self.api_client.request_id) or
1085 arvados.util.new_request_id())
1088 headers['X-Request-Id'] = request_id
1093 locator = KeepLocator(loc_s)
1095 slot, first = self.block_cache.reserve_cache(locator.md5sum)
1097 self.hits_counter.add(1)
1100 raise arvados.errors.KeepReadError(
1101 "failed to read {}".format(loc_s))
1104 self.misses_counter.add(1)
1106 # If the locator has hints specifying a prefix (indicating a
1107 # remote keepproxy) or the UUID of a local gateway service,
1108 # read data from the indicated service(s) instead of the usual
1109 # list of local disk services.
1110 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1111 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1112 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1113 for hint in locator.hints if (
1114 hint.startswith('K@') and
1116 self._gateway_services.get(hint[2:])
1118 # Map root URLs to their KeepService objects.
1120 root: self.KeepService(root, self._user_agent_pool,
1121 upload_counter=self.upload_counter,
1122 download_counter=self.download_counter,
1124 insecure=self.insecure)
1125 for root in hint_roots
1128 # See #3147 for a discussion of the loop implementation. Highlights:
1129 # * Refresh the list of Keep services after each failure, in case
1130 # it's being updated.
1131 # * Retry until we succeed, we're out of retries, or every available
1132 # service has returned permanent failure.
1135 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1137 for tries_left in loop:
1139 sorted_roots = self.map_new_services(
1141 force_rebuild=(tries_left < num_retries),
1142 need_writable=False,
1144 except Exception as error:
1145 loop.save_result(error)
1148 # Query KeepService objects that haven't returned
1149 # permanent failure, in our specified shuffle order.
1150 services_to_try = [roots_map[root]
1151 for root in sorted_roots
1152 if roots_map[root].usable()]
1153 for keep_service in services_to_try:
1154 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1155 if blob is not None:
1157 loop.save_result((blob, len(services_to_try)))
1159 # Always cache the result, then return it if we succeeded.
1163 if slot is not None:
1165 self.block_cache.cap_cache()
1167 # Q: Including 403 is necessary for the Keep tests to continue
1168 # passing, but maybe they should expect KeepReadError instead?
1169 not_founds = sum(1 for key in sorted_roots
1170 if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1171 service_errors = ((key, roots_map[key].last_result()['error'])
1172 for key in sorted_roots)
1174 raise arvados.errors.KeepReadError(
1175 "[{}] failed to read {}: no Keep services available ({})".format(
1176 request_id, loc_s, loop.last_result()))
1177 elif not_founds == len(sorted_roots):
1178 raise arvados.errors.NotFoundError(
1179 "[{}] {} not found".format(request_id, loc_s), service_errors)
1181 raise arvados.errors.KeepReadError(
1182 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1185 def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1186 """Save data in Keep.
1188 This method will get a list of Keep services from the API server, and
1189 send the data to each one simultaneously in a new thread. Once the
1190 uploads are finished, if enough copies are saved, this method returns
1191 the most recent HTTP response body. If requests fail to upload
1192 enough copies, this method raises KeepWriteError.
1195 * data: The string of data to upload.
1196 * copies: The number of copies that the user requires be saved.
1198 * num_retries: The number of times to retry PUT requests to
1199 *each* Keep server if it returns temporary failures, with
1200 exponential backoff. The default value is set when the
1201 KeepClient is initialized.
1202 * classes: An optional list of storage class names where copies should
# NOTE(review): this listing omits a number of short lines from this method
# (the headers dict opener, loop-state initialisation, else branches). The
# comments below describe only what the visible statements do.
#
# Fall back to the client's default storage classes when `classes` is None or
# empty.
1206 classes = classes or self._default_classes
# Non-bytes input is converted with its encode() method (default encoding).
1208 if not isinstance(data, bytes):
1209 data = data.encode()
1211 self.put_counter.add(1)
# Build the locator string: MD5 hex digest, '+', then the byte length.
1213 data_hash = hashlib.md5(data).hexdigest()
1214 loc_s = data_hash + '+' + str(len(data))
1217 locator = KeepLocator(loc_s)
# Request ID precedence: explicit argument, then the API client's request_id
# (when this object has an `api_client` attribute), then a freshly generated one.
1219 request_id = (request_id or
1220 (hasattr(self, 'api_client') and self.api_client.request_id) or
1221 arvados.util.new_request_id())
1223 'X-Request-Id': request_id,
1224 'X-Keep-Desired-Replicas': str(copies),
1227 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1231 for tries_left in loop:
1233 sorted_roots = self.map_new_services(
1235 force_rebuild=(tries_left < num_retries),
# A failure while mapping services is recorded as this attempt's result.
1238 except Exception as error:
1239 loop.save_result(error)
# Storage classes still outstanding: the requested set minus those already
# confirmed. The list stays empty when done_classes is None.
1242 pending_classes = []
1243 if done_classes is not None:
1244 pending_classes = list(set(classes) - set(done_classes))
# Each attempt asks only for the copies not yet written (copies - done_copies).
1245 writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1246 data_hash=data_hash,
1247 copies=copies - done_copies,
1248 max_service_replicas=self.max_replicas_per_service,
1249 timeout=self.current_timeout(num_retries - tries_left),
1250 classes=pending_classes)
1251 for service_root, ks in [(root, roots_map[root])
1252 for root in sorted_roots]:
1255 writer_pool.add_task(ks, service_root)
# writer_pool.done() reports (copies written, classes satisfied) for this
# attempt; both are accumulated across attempts.
1257 pool_copies, pool_classes = writer_pool.done()
1258 done_copies += pool_copies
1259 if (done_classes is not None) and (pool_classes is not None):
1260 done_classes += pool_classes
# With class tracking active, success requires enough copies AND that the
# confirmed classes equal the requested set.
1262 (done_copies >= copies and set(done_classes) == set(classes),
1263 writer_pool.total_task_nr))
1265 # Old keepstore contacted without storage classes support:
1266 # success is determined only by successful copies.
1268 # Disable storage classes tracking from this point forward.
# The warning is logged at most once per client: the flag is set before the
# message is emitted.
1269 if not self._storage_classes_unsupported_warning:
1270 self._storage_classes_unsupported_warning = True
1271 _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1274 (done_copies >= copies, writer_pool.total_task_nr))
1277 return writer_pool.response()
1279 raise arvados.errors.KeepWriteError(
1280 "[{}] failed to write {}: no Keep services available ({})".format(
1281 request_id, data_hash, loop.last_result()))
# Lazy generator of (service root, error) pairs, limited to services whose
# last result carries a truthy 'error'.
1283 service_errors = ((key, roots_map[key].last_result()['error'])
1284 for key in sorted_roots
1285 if roots_map[key].last_result()['error'])
1286 raise arvados.errors.KeepWriteError(
1287 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1288 request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1290 def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1291 """A stub for put().
1293 This method is used in place of the real put() method when
1294 using local storage (see constructor's local_store argument).
1296 copies and num_retries arguments are ignored: they are here
1297 only for the sake of offering the same call signature as
1300 Data stored this way can be retrieved via local_store_get().
# NOTE(review): `classes=[]` is a mutable default argument. The visible body
# never touches `classes`, but `classes=None` would be the safer spelling.
# NOTE(review): this listing omits some short lines of this method (the body
# of the `with` block and the final return are not visible).
#
# The stored file is named after the MD5 hex digest of the data.
1302 md5 = hashlib.md5(data).hexdigest()
# Locator format: "<md5>+<length of data>".
1303 locator = '%s+%d' % (md5, len(data))
# The data goes to "<md5>.tmp" inside self.local_store first ...
1304 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
# ... and the temporary file is then renamed to its final name "<md5>".
1306 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1307 os.path.join(self.local_store, md5))
1310 def local_store_get(self, loc_s, num_retries=None):
1311 """Companion to local_store_put()."""
# NOTE(review): this listing omits some short lines of this method (the
# try/except scaffolding and the return statements are not visible).
#
# Parse the locator string. The NotFoundError below is presumably raised when
# KeepLocator rejects loc_s -- confirm against the full source.
1313 locator = KeepLocator(loc_s)
1315 raise arvados.errors.NotFoundError(
1316 "Invalid data locator: '%s'" % loc_s)
# Special case: the md5sum equals the hash part of config.EMPTY_BLOCK_LOCATOR.
1317 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
# Otherwise open the file named after the md5sum in self.local_store, in
# binary mode.
1319 with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1322 def local_store_head(self, loc_s, num_retries=None):
1323 """Companion to local_store_put()."""
# NOTE(review): this listing omits some short lines of this method (the
# try/except scaffolding and the return statements are not visible).
#
# Parse the locator string. The NotFoundError below is presumably raised when
# KeepLocator rejects loc_s -- confirm against the full source.
1325 locator = KeepLocator(loc_s)
1327 raise arvados.errors.NotFoundError(
1328 "Invalid data locator: '%s'" % loc_s)
# Special case: the md5sum equals the hash part of config.EMPTY_BLOCK_LOCATOR.
1329 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
# Otherwise check whether a file named after the md5sum exists in
# self.local_store.
1331 if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
def is_cached(self, locator):
    """Reserve (or look up) the block cache slot for ``locator``.

    The previous body passed the undefined name ``expect_hash`` to
    reserve_cache(), so every call raised NameError; the ``locator``
    argument is what is meant to be forwarded.

    Arguments:
    * locator: The cache key to hand to block_cache.reserve_cache().

    Returns whatever block_cache.reserve_cache() returns.

    NOTE(review): elsewhere in this file the reserve_cache() result is
    unpacked as ``(slot, first)``, so despite its name this method does not
    return a plain boolean -- confirm what callers expect.
    """
    return self.block_cache.reserve_cache(locator)