1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
32 if sys.version_info >= (3, 0):
33 from io import BytesIO
35 from cStringIO import StringIO as BytesIO
38 import arvados.config as config
40 import arvados.retry as retry
43 _logger = logging.getLogger('arvados.keep')
44 global_client_object = None
47 # Monkey patch TCP constants when not available (apple). Values sourced from:
48 # http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
49 if sys.platform == 'darwin':
50 if not hasattr(socket, 'TCP_KEEPALIVE'):
51 socket.TCP_KEEPALIVE = 0x010
52 if not hasattr(socket, 'TCP_KEEPINTVL'):
53 socket.TCP_KEEPINTVL = 0x101
54 if not hasattr(socket, 'TCP_KEEPCNT'):
55 socket.TCP_KEEPCNT = 0x102
# Parsed representation of a Keep block locator string:
# "<md5sum>[+<size>][+<hint>...]".  Hints beginning with "A" are
# permission-signature hints; other hints are kept verbatim in
# self.hints.  NOTE(review): this excerpt is missing some original
# lines, so several bodies below are incomplete as shown.
58 class KeepLocator(object):
59 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
60 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
# Parse locator_str into md5sum, size, and hint fields; raises
# ValueError on a hint that fails HINT_RE validation.
62 def __init__(self, locator_str):
65 self._perm_expiry = None
66 pieces = iter(locator_str.split('+'))
67 self.md5sum = next(pieces)
69 self.size = int(next(pieces))
73 if self.HINT_RE.match(hint) is None:
74 raise ValueError("invalid hint format: {}".format(hint))
75 elif hint.startswith('A'):
76 self.parse_permission_hint(hint)
78 self.hints.append(hint)
# (string form) Reassemble the full locator from its parsed fields.
83 for s in [self.md5sum, self.size,
84 self.permission_hint()] + self.hints
# Stripped form: md5sum plus size only, no hints.
88 if self.size is not None:
89 return "%s+%i" % (self.md5sum, self.size)
# Factory building a validated property stored in '_' + name; the
# value must be a hex string of exactly `length` digits.  Used for
# md5sum and perm_sig below.  NOTE(review): the `def getter` line is
# not visible in this excerpt.
93 def _make_hex_prop(name, length):
94 # Build and return a new property with the given name that
95 # must be a hex string of the given length.
96 data_name = '_{}'.format(name)
98 return getattr(self, data_name)
99 def setter(self, hex_str):
100 if not arvados.util.is_hex(hex_str, length):
101 raise ValueError("{} is not a {}-digit hex string: {!r}".
102 format(name, length, hex_str))
103 setattr(self, data_name, hex_str)
104 return property(getter, setter)
106 md5sum = _make_hex_prop('md5sum', 32)
107 perm_sig = _make_hex_prop('perm_sig', 40)
# perm_expiry property: stored as a datetime, assigned from a 1-8
# digit hex Unix timestamp (validated via arvados.util.is_hex).
110 def perm_expiry(self):
111 return self._perm_expiry
114 def perm_expiry(self, value):
115 if not arvados.util.is_hex(value, 1, 8):
117 "permission timestamp must be a hex Unix timestamp: {}".
119 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
# Render the "A<sig>@<expiry>" hint; the expiry is converted to
# seconds since epoch and formatted as 8 hex digits.
121 def permission_hint(self):
122 data = [self.perm_sig, self.perm_expiry]
125 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
126 return "A{}@{:08x}".format(*data)
# Split an "A<sig>@<expiry>" hint back into perm_sig / perm_expiry;
# raises ValueError (line 132) when the hint cannot be parsed.
128 def parse_permission_hint(self, s):
130 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
132 raise ValueError("bad permission hint {}".format(s))
# True when the permission signature has expired as of as_of_dt
# (default: current time).  NOTE(review): the expiry is derived via
# utcfromtimestamp but compared against naive datetime.now() —
# confirm the intended timezone semantics upstream.
134 def permission_expired(self, as_of_dt=None):
135 if self.perm_expiry is None:
137 elif as_of_dt is None:
138 as_of_dt = datetime.datetime.now()
139 return self.perm_expiry <= as_of_dt
143 """Simple interface to a global KeepClient object.
145 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
146 own API client. The global KeepClient will build an API client from the
147 current Arvados configuration, which may not match the one you built.
# Return the process-wide KeepClient singleton, rebuilding it when
# the relevant configuration fingerprint has changed since the last
# call.  NOTE(review): the decorator line (presumably @classmethod,
# given the `cls` parameter) is not visible in this excerpt.
152 def global_client_object(cls):
153 global global_client_object
154 # Previously, KeepClient would change its behavior at runtime based
155 # on these configuration settings. We simulate that behavior here
156 # by checking the values and returning a new KeepClient if any of
# Configuration fingerprint: a fresh client is built when any of
# these values differs from the key recorded on the last build.
158 key = (config.get('ARVADOS_API_HOST'),
159 config.get('ARVADOS_API_TOKEN'),
160 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
161 config.get('ARVADOS_KEEP_PROXY'),
162 os.environ.get('KEEP_LOCAL_STORE'))
163 if (global_client_object is None) or (cls._last_key != key):
164 global_client_object = KeepClient()
166 return global_client_object
# Module-level convenience wrapper: fetch a block through the shared
# global KeepClient.  Deprecated along with the enclosing Keep class.
169 def get(locator, **kwargs):
170 return Keep.global_client_object().get(locator, **kwargs)
# Module-level convenience wrapper: store a block through the shared
# global KeepClient.  Deprecated along with the enclosing Keep class.
173 def put(data, **kwargs):
174 return Keep.global_client_object().put(data, **kwargs)
# In-RAM block cache keyed by locator md5.  Most-recently-used slots
# live at the front of the internal list; eviction trims ready slots
# from the back until total size fits under cache_max.
176 class KeepBlockCache(object):
177 # Default RAM cache is 256MiB
178 def __init__(self, cache_max=(256 * 1024 * 1024)):
179 self.cache_max = cache_max
181 self._cache_lock = threading.Lock()
# One cache entry: `ready` is set once content (or a read failure,
# recorded as content None) is available.
183 class CacheSlot(object):
184 __slots__ = ("locator", "ready", "content")
186 def __init__(self, locator):
187 self.locator = locator
188 self.ready = threading.Event()
# Store the block body and wake any waiters on `ready`.
195 def set(self, value):
# Size of the cached content in bytes (0-equivalent when content is
# None, per the missing branch).
200 if self.content is None:
203 return len(self.content)
'''Cap the cache size to self.cache_max'''
207 with self._cache_lock:
208 # Select all slots except those where ready.is_set() and content is
209 # None (that means there was an error reading the block).
210 self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
211 sm = sum([slot.size() for slot in self._cache])
# Evict from the back (least recently used) until under the cap;
# only slots whose content is ready are eligible for eviction.
212 while len(self._cache) > 0 and sm > self.cache_max:
213 for i in range(len(self._cache)-1, -1, -1):
214 if self._cache[i].ready.is_set():
217 sm = sum([slot.size() for slot in self._cache])
# Linear scan for the locator; on a hit the slot is moved to the
# front of the list to record recency.  Caller must hold _cache_lock.
219 def _get(self, locator):
220 # Test if the locator is already in the cache
221 for i in range(0, len(self._cache)):
222 if self._cache[i].locator == locator:
225 # move it to the front
227 self._cache.insert(0, n)
231 def get(self, locator):
232 with self._cache_lock:
233 return self._get(locator)
235 def reserve_cache(self, locator):
236 '''Reserve a cache slot for the specified locator,
237 or return the existing slot.'''
238 with self._cache_lock:
239 n = self._get(locator)
243 # Add a new cache slot for the locator
244 n = KeepBlockCache.CacheSlot(locator)
245 self._cache.insert(0, n)
# Lock-protected counter used for the client's transfer/hit/miss
# statistics.  NOTE(review): the remainder of this class (the methods
# using self._lk) is not visible in this excerpt.
248 class Counter(object):
249 def __init__(self, v=0):
250 self._lk = threading.Lock()
262 class KeepClient(object):
264 # Default Keep server connection timeout: 2 seconds
265 # Default Keep server read timeout: 256 seconds
266 # Default Keep server bandwidth minimum: 32768 bytes per second
267 # Default Keep proxy connection timeout: 20 seconds
268 # Default Keep proxy read timeout: 256 seconds
269 # Default Keep proxy bandwidth minimum: 32768 bytes per second
270 DEFAULT_TIMEOUT = (2, 256, 32768)
271 DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
# Per-transaction pycurl client for a single Keep server.  Records
# the outcome of the latest request in self._result and tracks
# whether the service is still worth contacting via self._usable.
274 class KeepService(object):
275 """Make requests to a single Keep service, and track results.
277 A KeepService is intended to last long enough to perform one
278 transaction (GET or PUT) against one Keep service. This can
279 involve calling either get() or put() multiple times in order
280 to retry after transient failures. However, calling both get()
281 and put() on a single instance -- or using the same instance
282 to access two different Keep services -- will not produce
289 arvados.errors.HttpError,
# NOTE(review): the default queue.LifoQueue() is evaluated once at
# class-definition time, so every instance constructed without an
# explicit pool shares the same queue — confirm that sharing is
# intentional before changing.
292 def __init__(self, root, user_agent_pool=queue.LifoQueue(),
294 download_counter=None,
298 self._user_agent_pool = user_agent_pool
299 self._result = {'error': None}
303 self.get_headers = {'Accept': 'application/octet-stream'}
304 self.get_headers.update(headers)
305 self.put_headers = headers
306 self.upload_counter = upload_counter
307 self.download_counter = download_counter
308 self.insecure = insecure
"""Is it worth attempting a request?"""
# finished(): True when a success was recorded (error == False) or
# the service is permanently unusable.
"""Did the request succeed or encounter permanent failure?"""
316 return self._result['error'] == False or not self._usable
318 def last_result(self):
# Borrow a pooled pycurl handle without blocking; the queue.Empty
# fallback (creating a fresh handle) is in lines missing from this
# excerpt.
321 def _get_user_agent(self):
323 return self._user_agent_pool.get(block=False)
327 def _put_user_agent(self, ua):
330 self._user_agent_pool.put(ua, block=False)
# Dispatch on the OPENSOCKETFUNCTION callback arity, which changed
# between pycurl 7.19.3 and 7.21.5.
334 def _socket_open(self, *args, **kwargs):
335 if len(args) + len(kwargs) == 2:
336 return self._socket_open_pycurl_7_21_5(*args, **kwargs)
338 return self._socket_open_pycurl_7_19_3(*args, **kwargs)
# Adapt the old four-argument callback signature to the new one by
# packing the arguments into an Address namedtuple.
340 def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
341 return self._socket_open_pycurl_7_21_5(
343 address=collections.namedtuple(
344 'Address', ['family', 'socktype', 'protocol', 'addr'],
345 )(family, socktype, protocol, address))
347 def _socket_open_pycurl_7_21_5(self, purpose, address):
348 """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
349 s = socket.socket(address.family, address.socktype, address.protocol)
350 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
351 # Will throw invalid protocol error on mac. This test prevents that.
352 if hasattr(socket, 'TCP_KEEPIDLE'):
353 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
354 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
# GET or HEAD one block.  Returns the verified body (GET), or, for a
# HEAD answering a remote block-copy request, the local locator from
# the x-keep-locator header.  Outcome is recorded in self._result.
358 def get(self, locator, method="GET", timeout=None):
359 # locator is a KeepLocator object.
360 url = self.root + str(locator)
361 _logger.debug("Request: %s %s", method, url)
362 curl = self._get_user_agent()
365 with timer.Timer() as t:
367 response_body = BytesIO()
368 curl.setopt(pycurl.NOSIGNAL, 1)
369 curl.setopt(pycurl.OPENSOCKETFUNCTION,
370 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
371 curl.setopt(pycurl.URL, url.encode('utf-8'))
372 curl.setopt(pycurl.HTTPHEADER, [
373 '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
374 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
375 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
# Insecure mode disables TLS verification entirely (self.insecure).
377 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
378 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
380 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
# HEAD requests skip the body and the bandwidth-floor timeout.
382 curl.setopt(pycurl.NOBODY, True)
383 self._setcurltimeouts(curl, timeout, method=="HEAD")
387 except Exception as e:
388 raise arvados.errors.HttpError(0, str(e))
394 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
395 'body': response_body.getvalue(),
396 'headers': self._headers,
400 ok = retry.check_http_response_success(self._result['status_code'])
402 self._result['error'] = arvados.errors.HttpError(
403 self._result['status_code'],
404 self._headers.get('x-status-line', 'Error'))
405 except self.HTTP_ERRORS as e:
# ok may be True/False/None; None (undecided) still leaves the
# service usable for retries.
409 self._usable = ok != False
410 if self._result.get('status_code', None):
411 # The client worked well enough to get an HTTP status
412 # code, so presumably any problems are just on the
413 # server side and it's OK to reuse the client.
414 self._put_user_agent(curl)
416 # Don't return this client to the pool, in case it's
420 _logger.debug("Request fail: GET %s => %s: %s",
421 url, type(self._result['error']), str(self._result['error']))
424 _logger.info("HEAD %s: %s bytes",
425 self._result['status_code'],
426 self._result.get('content-length'))
427 if self._result['headers'].get('x-keep-locator'):
428 # This is a response to a remote block copy request, return
429 # the local copy block locator.
430 return self._result['headers'].get('x-keep-locator')
433 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
434 self._result['status_code'],
435 len(self._result['body']),
437 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
439 if self.download_counter:
440 self.download_counter.add(len(self._result['body']))
# Integrity check: the body must hash to the locator's md5sum.
441 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
442 if resp_md5 != locator.md5sum:
443 _logger.warning("Checksum fail: md5(%s) = %s",
445 self._result['error'] = arvados.errors.HttpError(
448 return self._result['body']
# Upload one block via HTTP PUT; mirrors get() in structure, result
# recording, and handle-pooling behavior.
450 def put(self, hash_s, body, timeout=None, headers={}):
451 put_headers = copy.copy(self.put_headers)
452 put_headers.update(headers)
453 url = self.root + hash_s
454 _logger.debug("Request: PUT %s", url)
455 curl = self._get_user_agent()
458 with timer.Timer() as t:
460 body_reader = BytesIO(body)
461 response_body = BytesIO()
462 curl.setopt(pycurl.NOSIGNAL, 1)
463 curl.setopt(pycurl.OPENSOCKETFUNCTION,
464 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
465 curl.setopt(pycurl.URL, url.encode('utf-8'))
466 # Using UPLOAD tells cURL to wait for a "go ahead" from the
467 # Keep server (in the form of a HTTP/1.1 "100 Continue"
468 # response) instead of sending the request body immediately.
469 # This allows the server to reject the request if the request
470 # is invalid or the server is read-only, without waiting for
471 # the client to send the entire block.
472 curl.setopt(pycurl.UPLOAD, True)
473 curl.setopt(pycurl.INFILESIZE, len(body))
474 curl.setopt(pycurl.READFUNCTION, body_reader.read)
475 curl.setopt(pycurl.HTTPHEADER, [
476 '{}: {}'.format(k,v) for k,v in put_headers.items()])
477 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
478 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
480 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
481 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
483 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
484 self._setcurltimeouts(curl, timeout)
487 except Exception as e:
488 raise arvados.errors.HttpError(0, str(e))
494 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
495 'body': response_body.getvalue().decode('utf-8'),
496 'headers': self._headers,
499 ok = retry.check_http_response_success(self._result['status_code'])
501 self._result['error'] = arvados.errors.HttpError(
502 self._result['status_code'],
503 self._headers.get('x-status-line', 'Error'))
504 except self.HTTP_ERRORS as e:
508 self._usable = ok != False # still usable if ok is True or None
509 if self._result.get('status_code', None):
510 # Client is functional. See comment in get().
511 self._put_user_agent(curl)
515 _logger.debug("Request fail: PUT %s => %s: %s",
516 url, type(self._result['error']), str(self._result['error']))
518 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
519 self._result['status_code'],
522 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
523 if self.upload_counter:
524 self.upload_counter.add(len(body))
# Translate `timeouts` (scalar, (conn, xfer) pair, or
# (conn, xfer, bandwidth) triple) into pycurl CONNECTTIMEOUT_MS plus
# the LOW_SPEED_* bandwidth-floor options.
527 def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
530 elif isinstance(timeouts, tuple):
531 if len(timeouts) == 2:
532 conn_t, xfer_t = timeouts
533 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
535 conn_t, xfer_t, bandwidth_bps = timeouts
537 conn_t, xfer_t = (timeouts, timeouts)
538 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
539 curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
540 if not ignore_bandwidth:
541 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
542 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
# pycurl header callback: collects response headers into
# self._headers with lower-cased names, folding continuation lines
# onto the previous header and storing the status line under
# 'x-status-line'.
544 def _headerfunction(self, header_line):
545 if isinstance(header_line, bytes):
546 header_line = header_line.decode('iso-8859-1')
547 if ':' in header_line:
548 name, value = header_line.split(':', 1)
549 name = name.strip().lower()
550 value = value.strip()
552 name = self._lastheadername
553 value = self._headers[name] + ' ' + header_line.strip()
554 elif header_line.startswith('HTTP/'):
555 name = 'x-status-line'
558 _logger.error("Unexpected header line: %s", header_line)
560 self._lastheadername = name
561 self._headers[name] = value
562 # Returning None implies all bytes were written
# Work queue shared by KeepWriterThreads.  Tracks how many replicas
# (and which storage classes) have been confirmed so far, and hands
# out (service, service_root) tasks until the write goals are met.
565 class KeepWriterQueue(queue.Queue):
566 def __init__(self, copies, classes=[]):
567 queue.Queue.__init__(self) # Old-style superclass
568 self.wanted_copies = copies
569 self.wanted_storage_classes = classes
570 self.successful_copies = 0
571 self.confirmed_storage_classes = {}
573 self.storage_classes_tracking = True
574 self.queue_data_lock = threading.RLock()
575 self.pending_tries = max(copies, len(classes))
576 self.pending_tries_notification = threading.Condition()
# Record a successful PUT: bump replica count, fold in any
# confirmed storage classes (classes_confirmed None disables class
# tracking), recompute pending tries, and wake waiting workers.
578 def write_success(self, response, replicas_nr, classes_confirmed):
579 with self.queue_data_lock:
580 self.successful_copies += replicas_nr
581 if classes_confirmed is None:
582 self.storage_classes_tracking = False
583 elif self.storage_classes_tracking:
584 for st_class, st_copies in classes_confirmed.items():
586 self.confirmed_storage_classes[st_class] += st_copies
588 self.confirmed_storage_classes[st_class] = st_copies
589 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
590 self.response = response
591 with self.pending_tries_notification:
592 self.pending_tries_notification.notify_all()
# A failed PUT returns its try to the pool and wakes one worker.
594 def write_fail(self, ks):
595 with self.pending_tries_notification:
596 self.pending_tries += 1
597 self.pending_tries_notification.notify()
598 def pending_copies(self):
600 with self.queue_data_lock:
601 return self.wanted_copies - self.successful_copies
# Storage classes already satisfied (wanted minus still-pending).
603 def satisfied_classes(self):
604 with self.queue_data_lock:
605 if not self.storage_classes_tracking:
606 # Notifies disabled storage classes expectation to
609 return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
# Storage classes still short of wanted_copies confirmations.
611 def pending_classes(self):
612 with self.queue_data_lock:
613 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
615 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
616 for st_class, st_copies in self.confirmed_storage_classes.items():
617 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
618 unsatisfied_classes.remove(st_class)
619 return unsatisfied_classes
# Block until a task is available, all goals are met (drain and
# raise Empty, per the missing lines), or no tries remain.
621 def get_next_task(self):
622 with self.pending_tries_notification:
624 if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
625 # This notify_all() is unnecessary --
626 # write_success() already called notify_all()
627 # when pending<1 became true, so it's not
628 # possible for any other thread to be in
629 # wait() now -- but it's cheap insurance
630 # against deadlock so we do it anyway:
631 self.pending_tries_notification.notify_all()
632 # Drain the queue and then raise Queue.Empty
636 elif self.pending_tries > 0:
637 service, service_root = self.get_nowait()
638 if service.finished():
# Consume one try and hand the task to the caller.
641 self.pending_tries -= 1
642 return service, service_root
644 self.pending_tries_notification.notify_all()
647 self.pending_tries_notification.wait()
# Fan a single block write out over enough KeepWriterThreads to
# reach the requested replica count, given how many replicas a
# single service can store per request.
650 class KeepWriterThreadPool(object):
651 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
652 self.total_task_nr = 0
# One thread suffices when a single service can store all copies;
# otherwise scale thread count to ceil(copies / max_service_replicas).
653 if (not max_service_replicas) or (max_service_replicas >= copies):
656 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
657 _logger.debug("Pool max threads is %d", num_threads)
659 self.queue = KeepClient.KeepWriterQueue(copies, classes)
661 for _ in range(num_threads):
662 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
663 self.workers.append(w)
# Enqueue one (service, service_root) write attempt.
665 def add_task(self, ks, service_root):
666 self.queue.put((ks, service_root))
667 self.total_task_nr += 1
# (copies_done) Report replicas stored and classes satisfied so far.
670 return self.queue.successful_copies, self.queue.satisfied_classes()
# (join) Start workers and wait for the queue to drain, then return
# the response recorded by the last successful write.
674 for worker in self.workers:
676 # Wait for finished work
680 return self.queue.response
# Worker that repeatedly pulls (service, service_root) tasks from a
# KeepWriterQueue and attempts the PUT, reporting success or failure
# back to the queue.
683 class KeepWriterThread(threading.Thread):
# Raised internally when a PUT attempt fails; distinguishes expected
# write failures from unexpected exceptions (which are logged).
684 class TaskFailed(RuntimeError): pass
686 def __init__(self, queue, data, data_hash, timeout=None):
687 super(KeepClient.KeepWriterThread, self).__init__()
688 self.timeout = timeout
691 self.data_hash = data_hash
# (run loop) Fetch tasks until the queue signals completion.
697 service, service_root = self.queue.get_next_task()
701 locator, copies, classes = self.do_task(service, service_root)
702 except Exception as e:
703 if not isinstance(e, self.TaskFailed):
704 _logger.exception("Exception in KeepWriterThread")
705 self.queue.write_fail(service)
707 self.queue.write_success(locator, copies, classes)
709 self.queue.task_done()
# Perform one PUT against `service`; returns (locator,
# replicas_stored, classes_confirmed) or raises TaskFailed.
711 def do_task(self, service, service_root):
712 classes = self.queue.pending_classes()
# Request storage in the classes still pending via the
# X-Keep-Storage-Classes header.
716 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
717 success = bool(service.put(self.data_hash,
719 timeout=self.timeout,
721 result = service.last_result()
724 if result.get('status_code'):
725 _logger.debug("Request fail: PUT %s => %s %s",
727 result.get('status_code'),
729 raise self.TaskFailed()
731 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
732 str(threading.current_thread()),
# Parse replica count from the x-keep-replicas-stored header; the
# fallback on parse failure is in lines missing from this excerpt.
737 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
738 except (KeyError, ValueError):
# Parse the "class=count,..." confirmations header; None signals the
# server did not report storage classes.
741 classes_confirmed = {}
743 scch = result['headers']['x-keep-storage-classes-confirmed']
744 for confirmation in scch.replace(' ', '').split(','):
745 if '=' in confirmation:
746 stored_class, stored_copies = confirmation.split('=')[:2]
747 classes_confirmed[stored_class] = int(stored_copies)
748 except (KeyError, ValueError):
749 # Storage classes confirmed header missing or corrupt
750 classes_confirmed = None
752 return result['body'].strip(), replicas_stored, classes_confirmed
755 def __init__(self, api_client=None, proxy=None,
756 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
757 api_token=None, local_store=None, block_cache=None,
758 num_retries=0, session=None):
759 """Initialize a new KeepClient.
763 The API client to use to find Keep services. If not
764 provided, KeepClient will build one from available Arvados
768 If specified, this KeepClient will send requests to this Keep
769 proxy. Otherwise, KeepClient will fall back to the setting of the
770 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
771 If you want to ensure KeepClient does not use a proxy, pass in an empty
775 The initial timeout (in seconds) for HTTP requests to Keep
776 non-proxy servers. A tuple of three floats is interpreted as
777 (connection_timeout, read_timeout, minimum_bandwidth). A connection
778 will be aborted if the average traffic rate falls below
779 minimum_bandwidth bytes per second over an interval of read_timeout
780 seconds. Because timeouts are often a result of transient server
781 load, the actual connection timeout will be increased by a factor
782 of two on each retry.
783 Default: (2, 256, 32768).
786 The initial timeout (in seconds) for HTTP requests to
787 Keep proxies. A tuple of three floats is interpreted as
788 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
789 described above for adjusting connection timeouts on retry also
791 Default: (20, 256, 32768).
794 If you're not using an API client, but only talking
795 directly to a Keep proxy, this parameter specifies an API token
796 to authenticate Keep requests. It is an error to specify both
797 api_client and api_token. If you specify neither, KeepClient
798 will use one available from the Arvados configuration.
801 If specified, this KeepClient will bypass Keep
802 services, and save data to the named directory. If unspecified,
803 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
804 environment variable. If you want to ensure KeepClient does not
805 use local storage, pass in an empty string. This is primarily
806 intended to mock a server for testing.
809 The default number of times to retry failed requests.
810 This will be used as the default num_retries value when get() and
811 put() are called. Default 0.
813 self.lock = threading.Lock()
# ARVADOS_KEEP_SERVICES takes precedence over ARVADOS_KEEP_PROXY
# when no explicit proxy argument was given.
815 if config.get('ARVADOS_KEEP_SERVICES'):
816 proxy = config.get('ARVADOS_KEEP_SERVICES')
818 proxy = config.get('ARVADOS_KEEP_PROXY')
# Resolve the API token: explicit argument, the api_client's token,
# or the configuration; supplying both client and token is an error.
819 if api_token is None:
820 if api_client is None:
821 api_token = config.get('ARVADOS_API_TOKEN')
823 api_token = api_client.api_token
824 elif api_client is not None:
826 "can't build KeepClient with both API client and token")
827 if local_store is None:
828 local_store = os.environ.get('KEEP_LOCAL_STORE')
830 if api_client is None:
831 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
833 self.insecure = api_client.insecure
835 self.block_cache = block_cache if block_cache else KeepBlockCache()
836 self.timeout = timeout
837 self.proxy_timeout = proxy_timeout
838 self._user_agent_pool = queue.LifoQueue()
# Transfer statistics shared across requests made by this client.
839 self.upload_counter = Counter()
840 self.download_counter = Counter()
841 self.put_counter = Counter()
842 self.get_counter = Counter()
843 self.hits_counter = Counter()
844 self.misses_counter = Counter()
845 self._storage_classes_unsupported_warning = False
846 self._default_classes = []
# Local-store mode: replace the network entry points with the
# local_store_* implementations (primarily for testing).
849 self.local_store = local_store
850 self.head = self.local_store_head
851 self.get = self.local_store_get
852 self.put = self.local_store_put
854 self.num_retries = num_retries
855 self.max_replicas_per_service = None
# Proxy mode: build a static service list from the whitespace-
# separated proxy URIs, validating each URI first.
857 proxy_uris = proxy.split()
858 for i in range(len(proxy_uris)):
859 if not proxy_uris[i].endswith('/'):
862 url = urllib.parse.urlparse(proxy_uris[i])
863 if not (url.scheme and url.netloc):
864 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
865 self.api_token = api_token
866 self._gateway_services = {}
867 self._keep_services = [{
868 'uuid': "00000-bi6l4-%015d" % idx,
869 'service_type': 'proxy',
870 '_service_root': uri,
871 } for idx, uri in enumerate(proxy_uris)]
872 self._writable_services = self._keep_services
873 self.using_proxy = True
874 self._static_services_list = True
876 # It's important to avoid instantiating an API client
877 # unless we actually need one, for testing's sake.
878 if api_client is None:
879 api_client = arvados.api('v1')
880 self.api_client = api_client
881 self.api_token = api_client.api_token
882 self._gateway_services = {}
883 self._keep_services = None
884 self._writable_services = None
885 self.using_proxy = None
886 self._static_services_list = False
# Discover the cluster's default storage classes when the API
# exposes them; older clusters lack this config (see line 891).
888 self._default_classes = [
889 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
891 # We're talking to an old cluster
894 def current_timeout(self, attempt_number):
895 """Return the appropriate timeout to use for this client.
897 The proxy timeout setting if the backend service is currently a proxy,
898 the regular timeout setting otherwise. The `attempt_number` indicates
899 how many times the operation has been tried already (starting from 0
900 for the first try), and scales the connection timeout portion of the
901 return value accordingly.
904 # TODO(twp): the timeout should be a property of a
905 # KeepService, not a KeepClient. See #4488.
906 t = self.proxy_timeout if self.using_proxy else self.timeout
# Two-tuple timeouts lack the bandwidth floor; only the connection
# timeout (t[0]) is doubled per attempt in either shape.
908 return (t[0] * (1 << attempt_number), t[1])
910 return (t[0] * (1 << attempt_number), t[1], t[2])
# True when any listed service is not a plain disk service (missing
# service_type defaults to 'disk').
911 def _any_nondisk_services(self, service_list):
912 return any(ks.get('service_type', 'disk') != 'disk'
913 for ks in service_list)
# Populate _keep_services / _writable_services / _gateway_services
# from the API server, unless a static (proxy) list is in use or a
# cached list exists and force_rebuild is False.
915 def build_services_list(self, force_rebuild=False):
916 if (self._static_services_list or
917 (self._keep_services and not force_rebuild)):
921 keep_services = self.api_client.keep_services().accessible()
922 except Exception: # API server predates Keep services.
923 keep_services = self.api_client.keep_disks().list()
925 # Gateway services are only used when specified by UUID,
926 # so there's nothing to gain by filtering them by
928 self._gateway_services = {ks['uuid']: ks for ks in
929 keep_services.execute()['items']}
930 if not self._gateway_services:
931 raise arvados.errors.NoKeepServersError()
933 # Precompute the base URI for each service.
934 for r in self._gateway_services.values():
935 host = r['service_host']
936 if not host.startswith('[') and host.find(':') >= 0:
937 # IPv6 URIs must be formatted like http://[::1]:80/...
938 host = '[' + host + ']'
939 r['_service_root'] = "{}://{}:{:d}/".format(
940 'https' if r['service_ssl_flag'] else 'http',
944 _logger.debug(str(self._gateway_services))
# Gateway-typed services are excluded from the general-purpose
# service lists; read-only services are excluded from writable.
945 self._keep_services = [
946 ks for ks in self._gateway_services.values()
947 if not ks.get('service_type', '').startswith('gateway:')]
948 self._writable_services = [ks for ks in self._keep_services
949 if not ks.get('read_only')]
951 # For disk type services, max_replicas_per_service is 1
952 # It is unknown (unlimited) for other service types.
953 if self._any_nondisk_services(self._writable_services):
954 self.max_replicas_per_service = None
956 self.max_replicas_per_service = 1
958 def _service_weight(self, data_hash, service_uuid):
959 """Compute the weight of a Keep service endpoint for a data
960 block with a known hash.
962 The weight is md5(h + u) where u is the last 15 characters of
963 the service endpoint's UUID.
# Returned as a hex digest so weights sort lexicographically.
965 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
967 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
968 """Return an array of Keep service endpoints, in the order in
969 which they should be probed when reading or writing data with
970 the given hash+hints.
972 self.build_services_list(force_rebuild)
975 # Use the services indicated by the given +K@... remote
976 # service hints, if any are present and can be resolved to a
978 for hint in locator.hints:
979 if hint.startswith('K@'):
# Short K@ hints name a remote cluster (by its 5-character id);
# 29-character hints name a local gateway service by UUID.
982 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
983 elif len(hint) == 29:
984 svc = self._gateway_services.get(hint[2:])
986 sorted_roots.append(svc['_service_root'])
988 # Sort the available local services by weight (heaviest first)
989 # for this locator, and return their service_roots (base URIs)
991 use_services = self._keep_services
993 use_services = self._writable_services
994 self.using_proxy = self._any_nondisk_services(use_services)
995 sorted_roots.extend([
996 svc['_service_root'] for svc in sorted(
# Rendezvous ordering: sort by per-locator service weight.
999 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1000 _logger.debug("{}: {}".format(locator, sorted_roots))
1003 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1004 # roots_map is a dictionary, mapping Keep service root strings
1005 # to KeepService objects. Poll for Keep services, and add any
1006 # new ones to roots_map. Return the current list of local
# Every request carries the client's API token unless the caller
# already supplied an Authorization header.
1008 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1009 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1010 for root in local_roots:
1011 if root not in roots_map:
1012 roots_map[root] = self.KeepService(
1013 root, self._user_agent_pool,
1014 upload_counter=self.upload_counter,
1015 download_counter=self.download_counter,
1017 insecure=self.insecure)
# NOTE(review): the @staticmethod decorator line is not visible in
# this excerpt, but the function takes no self/cls parameter.
1021 def _check_loop_result(result):
1022 # KeepClient RetryLoops should save results as a 2-tuple: the
1023 # actual result of the request, and the number of servers available
1024 # to receive the request this round.
1025 # This method returns True if there's a real result, False if
1026 # there are no more servers available, otherwise None.
1027 if isinstance(result, Exception):
1029 result, tried_server_count = result
1030 if (result is not None) and (result is not False):
1032 elif tried_server_count < 1:
1033 _logger.info("No more Keep services to try; giving up")
1038 def get_from_cache(self, loc_s):
1039 """Fetch a block only if is in the cache, otherwise return None."""
1040 locator = KeepLocator(loc_s)
# Only a slot whose content is already ready counts as a hit; an
# in-flight fetch is not waited on.
1041 slot = self.block_cache.get(locator.md5sum)
1042 if slot is not None and slot.ready.is_set():
1047 def refresh_signature(self, loc):
1048 """Ask Keep to get the remote block and return its local signature"""
# X-Keep-Signature requests a fresh local signature valid from `now`.
1049 now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1050 return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
# HEAD a block: thin wrapper over _get_or_head.
1053 def head(self, loc_s, **kwargs):
1054 return self._get_or_head(loc_s, method="HEAD", **kwargs)
# GET a block: thin wrapper over _get_or_head.
1057 def get(self, loc_s, **kwargs):
1058 return self._get_or_head(loc_s, method="GET", **kwargs)
1060 def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1061 """Get data from Keep.
1063 This method fetches one or more blocks of data from Keep. It
1064 sends a request each Keep service registered with the API
1065 server (or the proxy provided when this client was
1066 instantiated), then each service named in location hints, in
1067 sequence. As soon as one service provides the data, it's
1071 * loc_s: A string of one or more comma-separated locators to fetch.
1072 This method returns the concatenation of these blocks.
1073 * num_retries: The number of times to retry GET requests to
1074 *each* Keep server if it returns temporary failures, with
1075 exponential backoff. Note that, in each loop, the method may try
1076 to fetch data from every available Keep service, along with any
1077 that are named in location hints in the locator. The default value
1078 is set when the KeepClient is initialized.
# NOTE(review): this listing has gaps (the embedded original line numbers
# jump), so some guards and scaffolding between the statements below are
# not visible here; comments describe only the visible code.
# Multiple comma-separated locators: satisfy the request by recursing
# once per locator and concatenating the results.
1081 return ''.join(self.get(x) for x in loc_s.split(','))
1083 self.get_counter.add(1)
# Use the caller-supplied request id when given; otherwise fall back to
# the API client's request id, and finally mint a fresh one.
1085 request_id = (request_id or
1086 (hasattr(self, 'api_client') and self.api_client.request_id) or
1087 arvados.util.new_request_id())
1090 headers['X-Request-Id'] = request_id
1095 locator = KeepLocator(loc_s)
# Reserve a cache slot keyed on the block's md5; 'first' presumably
# indicates whether this call is the one responsible for filling the
# slot -- TODO(review): confirm against the block cache's reserve_cache().
1097 slot, first = self.block_cache.reserve_cache(locator.md5sum)
1100 # this is request for a prefetch, if it is
1101 # already in flight, return immediately.
1102 # clear 'slot' to prevent finally block from
1103 # calling slot.set()
1106 self.hits_counter.add(1)
# NOTE(review): the guard for this raise is not visible in this listing.
1109 raise arvados.errors.KeepReadError(
1110 "failed to read {}".format(loc_s))
1113 self.misses_counter.add(1)
1115 # If the locator has hints specifying a prefix (indicating a
1116 # remote keepproxy) or the UUID of a local gateway service,
1117 # read data from the indicated service(s) instead of the usual
1118 # list of local disk services.
1119 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1120 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1121 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1122 for hint in locator.hints if (
1123 hint.startswith('K@') and
1125 self._gateway_services.get(hint[2:])
1127 # Map root URLs to their KeepService objects.
1129 root: self.KeepService(root, self._user_agent_pool,
1130 upload_counter=self.upload_counter,
1131 download_counter=self.download_counter,
1133 insecure=self.insecure)
1134 for root in hint_roots
1137 # See #3147 for a discussion of the loop implementation. Highlights:
1138 # * Refresh the list of Keep services after each failure, in case
1139 # it's being updated.
1140 # * Retry until we succeed, we're out of retries, or every available
1141 # service has returned permanent failure.
1144 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1146 for tries_left in loop:
# Recompute the service list on every attempt; force a rebuild after
# the first pass in case the registered services changed.
1148 sorted_roots = self.map_new_services(
1150 force_rebuild=(tries_left < num_retries),
1151 need_writable=False,
1153 except Exception as error:
1154 loop.save_result(error)
1157 # Query KeepService objects that haven't returned
1158 # permanent failure, in our specified shuffle order.
1159 services_to_try = [roots_map[root]
1160 for root in sorted_roots
1161 if roots_map[root].usable()]
1162 for keep_service in services_to_try:
# Per-request timeout shrinks as retries are consumed.
1163 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1164 if blob is not None:
1166 loop.save_result((blob, len(services_to_try)))
1168 # Always cache the result, then return it if we succeeded.
1172 if slot is not None:
# Trim the cache back under its size limit after inserting.
1174 self.block_cache.cap_cache()
# The raises below follow the retry loop; they appear to pick the most
# specific error once no attempt succeeded.
1176 # Q: Including 403 is necessary for the Keep tests to continue
1177 # passing, but maybe they should expect KeepReadError instead?
1178 not_founds = sum(1 for key in sorted_roots
1179 if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1180 service_errors = ((key, roots_map[key].last_result()['error'])
1181 for key in sorted_roots)
1183 raise arvados.errors.KeepReadError(
1184 "[{}] failed to read {}: no Keep services available ({})".format(
1185 request_id, loc_s, loop.last_result()))
# Every service answered 403/404/410: report the block as not found.
1186 elif not_founds == len(sorted_roots):
1187 raise arvados.errors.NotFoundError(
1188 "[{}] {} not found".format(request_id, loc_s), service_errors)
1190 raise arvados.errors.KeepReadError(
1191 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1194 def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1195 """Save data in Keep.
1197 This method will get a list of Keep services from the API server, and
1198 send the data to each one simultaneously in a new thread. Once the
1199 uploads are finished, if enough copies are saved, this method returns
1200 the most recent HTTP response body. If requests fail to upload
1201 enough copies, this method raises KeepWriteError.
1204 * data: The string of data to upload.
1205 * copies: The number of copies that the user requires be saved.
1207 * num_retries: The number of times to retry PUT requests to
1208 *each* Keep server if it returns temporary failures, with
1209 exponential backoff. The default value is set when the
1210 KeepClient is initialized.
1211 * classes: An optional list of storage class names where copies should
# NOTE(review): this listing has gaps (the embedded original line numbers
# jump), so some guards and scaffolding between the statements below are
# not visible here; comments describe only the visible code.
1215 classes = classes or self._default_classes
# Keep stores bytes; encode text input before hashing and uploading.
1217 if not isinstance(data, bytes):
1218 data = data.encode()
1220 self.put_counter.add(1)
# The block's locator is "<md5 hex digest>+<size in bytes>".
1222 data_hash = hashlib.md5(data).hexdigest()
1223 loc_s = data_hash + '+' + str(len(data))
1226 locator = KeepLocator(loc_s)
# Use the caller's request id, else the API client's, else a new one.
1228 request_id = (request_id or
1229 (hasattr(self, 'api_client') and self.api_client.request_id) or
1230 arvados.util.new_request_id())
1232 'X-Request-Id': request_id,
1233 'X-Keep-Desired-Replicas': str(copies),
1236 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1240 for tries_left in loop:
# Recompute the writable service list on every attempt; force a
# rebuild after the first pass.
1242 sorted_roots = self.map_new_services(
1244 force_rebuild=(tries_left < num_retries),
1247 except Exception as error:
1248 loop.save_result(error)
# Only the storage classes not yet satisfied are requested this pass.
1251 pending_classes = []
1252 if done_classes is not None:
1253 pending_classes = list(set(classes) - set(done_classes))
# One writer thread pool per attempt; it is asked for only the copies
# (and classes) still outstanding from earlier passes.
1254 writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1255 data_hash=data_hash,
1256 copies=copies - done_copies,
1257 max_service_replicas=self.max_replicas_per_service,
1258 timeout=self.current_timeout(num_retries - tries_left),
1259 classes=pending_classes)
1260 for service_root, ks in [(root, roots_map[root])
1261 for root in sorted_roots]:
1264 writer_pool.add_task(ks, service_root)
# Accumulate progress across retry passes.
1266 pool_copies, pool_classes = writer_pool.done()
1267 done_copies += pool_copies
1268 if (done_classes is not None) and (pool_classes is not None):
1269 done_classes += pool_classes
1271 (done_copies >= copies and set(done_classes) == set(classes),
1272 writer_pool.total_task_nr))
1274 # Old keepstore contacted without storage classes support:
1275 # success is determined only by successful copies.
1277 # Disable storage classes tracking from this point forward.
1278 if not self._storage_classes_unsupported_warning:
# Warn only once per client instance.
1279 self._storage_classes_unsupported_warning = True
1280 _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1283 (done_copies >= copies, writer_pool.total_task_nr))
1286 return writer_pool.response()
# The raises below follow the retry loop; they appear to pick the most
# specific error once no attempt achieved the requested replication.
1288 raise arvados.errors.KeepWriteError(
1289 "[{}] failed to write {}: no Keep services available ({})".format(
1290 request_id, data_hash, loop.last_result()))
1292 service_errors = ((key, roots_map[key].last_result()['error'])
1293 for key in sorted_roots
1294 if roots_map[key].last_result()['error'])
1295 raise arvados.errors.KeepWriteError(
1296 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1297 request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
    """A stub for put().

    This method is used in place of the real put() method when
    using local storage (see constructor's local_store argument).

    copies and num_retries arguments are ignored: they are here
    only for the sake of offering the same call signature as
    put().  (classes is likewise unused; the mutable default is
    kept for signature compatibility and is never mutated.)

    Data stored this way can be retrieved via local_store_get().
    Returns the locator string "<md5>+<size>" for the stored block.
    """
    md5 = hashlib.md5(data).hexdigest()
    locator = '%s+%d' % (md5, len(data))
    # Write to a temporary file, then rename it into place, so a
    # concurrent reader never observes a partially written block.
    with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
        f.write(data)
    os.rename(os.path.join(self.local_store, md5 + '.tmp'),
              os.path.join(self.local_store, md5))
    return locator
def local_store_get(self, loc_s, num_retries=None):
    """Companion to local_store_put().

    Read back a block previously stored with local_store_put() from
    the local_store directory.

    Arguments:
    * loc_s: locator string of the block to read.
    * num_retries: ignored; present only to match get()'s signature.

    Returns the block content as bytes.  Raises
    arvados.errors.NotFoundError if the locator cannot be parsed.
    """
    try:
        locator = KeepLocator(loc_s)
    except ValueError:
        raise arvados.errors.NotFoundError(
            "Invalid data locator: '%s'" % loc_s)
    # The well-known empty block is never written to disk; answer it
    # directly (same short-circuit as local_store_head()).
    if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
        return b''
    with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
        return f.read()
1331 def local_store_head(self, loc_s, num_retries=None):
1332 """Companion to local_store_put()."""
1334 locator = KeepLocator(loc_s)
1336 raise arvados.errors.NotFoundError(
1337 "Invalid data locator: '%s'" % loc_s)
1338 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1340 if os.path.exists(os.path.join(self.local_store, locator.md5sum)):