# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
from future import standard_library
from future.utils import native_str
standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import range
from builtins import object

import collections
import copy
import datetime
import hashlib
import logging
import math
import os
import pycurl
import queue
import re
import resource
import socket
import ssl
import sys
import threading
import urllib.parse

from . import timer

if sys.version_info >= (3, 0):
    from io import BytesIO
else:
    from cStringIO import StringIO as BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
import arvados.diskcache

_logger = logging.getLogger('arvados.keep')
global_client_object = None

# Monkey patch TCP constants when not available (macOS). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)
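
    # Illustrative sketch (not from the original source): the class-body
    # calls above build validating properties, so on a parsed locator
    #
    #   loc.md5sum = 'd41d8cd98f00b204e9800998ecf8427e'  # accepted: 32 hex digits
    #   loc.md5sum = 'xyz'                               # raises ValueError
    #
    # with the value kept on the underscore-prefixed backing attribute.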

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
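
# Usage sketch for KeepLocator (illustrative; the hash, signature, and
# timestamp below are made-up example values):
#
#   loc = KeepLocator(
#       'd41d8cd98f00b204e9800998ecf8427e+0'
#       '+Aa1b2c3d4e5f60718293a4b5c6d7e8f901234abcd@565f8a2b')
#   loc.md5sum                 # 'd41d8cd98f00b204e9800998ecf8427e'
#   loc.size                   # 0
#   loc.permission_expired()   # False until the hint's timestamp passes
#   str(loc)                   # reassembles 'hash+size+Asig@stamp'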
145 """Simple interface to a global KeepClient object.
147 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
148 own API client. The global KeepClient will build an API client from the
149 current Arvados configuration, which may not match the one you built.
154 def global_client_object(cls):
155 global global_client_object
156 # Previously, KeepClient would change its behavior at runtime based
157 # on these configuration settings. We simulate that behavior here
158 # by checking the values and returning a new KeepClient if any of
160 key = (config.get('ARVADOS_API_HOST'),
161 config.get('ARVADOS_API_TOKEN'),
162 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
163 config.get('ARVADOS_KEEP_PROXY'),
164 os.environ.get('KEEP_LOCAL_STORE'))
165 if (global_client_object is None) or (cls._last_key != key):
166 global_client_object = KeepClient()
168 return global_client_object
171 def get(locator, **kwargs):
172 return Keep.global_client_object().get(locator, **kwargs)
175 def put(data, **kwargs):
176 return Keep.global_client_object().put(data, **kwargs)
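
# Migration sketch (illustrative): rather than the deprecated global wrapper
# above, bind a client to an API client object you control:
#
#   api = arvados.api('v1')           # reads ARVADOS_API_HOST/ARVADOS_API_TOKEN
#   kc = KeepClient(api_client=api)
#   loc = kc.put(b'example data')
#   assert kc.get(loc) == b'example data'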


class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            if self._disk_cache:
                # By default, set max slots to half of the maximum number
                # of file handles.
                self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2
            else:
                self._max_slots = 1024

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize) / 2
                # Half the available space or max_slots * 64 MiB
                self.cache_max = min(avail, (self._max_slots * 64 * 1024 * 1024))
            else:
                self.cache_max = (256 * 1024 * 1024)

        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
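
    # Sizing sketch (illustrative numbers): with disk_cache=True, an
    # RLIMIT_NOFILE soft limit of 1024, and 10 GiB free in the cache
    # directory, the defaults above resolve to
    #   _max_slots = 1024 / 2 = 512 slots
    #   cache_max  = min(5 GiB, 512 * 64 MiB) = 5 GiB
    # and the final line enforces a 64 MiB floor in every configuration.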

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            # Block until some thread has stored content (or None on error).
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            # In-memory slots are evicted by simply dropping the reference.
            return True

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
                for i in range(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        # Evict the oldest ready slot, then re-check the total.
                        self._cache[i].evict()
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        if self._disk_cache:
            # see if it exists on disk
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
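
    # Caller pattern (sketch, not from the original source): reserve_cache()
    # returns (slot, first); `first` is True when this caller must fill the
    # slot:
    #
    #   slot, first = cache.reserve_cache(locator_md5)
    #   if first:
    #       slot.set(fetch_block())   # fetch_block() is hypothetical
    #       cache.cap_cache()
    #   else:
    #       content = slot.get()      # blocks until the filler calls set()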


class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)

    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible results.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure
351 """Is it worth attempting a request?"""
355 """Did the request succeed or encounter permanent failure?"""
356 return self._result['error'] == False or not self._usable
358 def last_result(self):
361 def _get_user_agent(self):
363 return self._user_agent_pool.get(block=False)
367 def _put_user_agent(self, ua):
370 self._user_agent_pool.put(ua, block=False)

        def _socket_open(self, *args, **kwargs):
            if len(args) + len(kwargs) == 2:
                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
            else:
                return self._socket_open_pycurl_7_19_3(*args, **kwargs)

        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
            return self._socket_open_pycurl_7_21_5(
                purpose=None,
                address=collections.namedtuple(
                    'Address', ['family', 'socktype', 'protocol', 'addr'],
                )(family, socktype, protocol, address))

        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            self._socket = s
            return s
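
        # Keepalive sketch (illustrative, not from the original source): the
        # options set above are the plain-socket equivalent of
        #
        #   s = socket.create_connection(('keep.example', 25107))  # placeholder host/port
        #   s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        #   s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
        #
        # which probes an otherwise idle connection every 75 seconds.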

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result.get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request, return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            if not ignore_bandwidth:
                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
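
        # Interpretation sketch (illustrative): the three accepted forms of
        # `timeouts` resolve as
        #
        #   self._setcurltimeouts(curl, (2, 256, 32768))  # conn 2s; abort under 32768 B/s for 256s
        #   self._setcurltimeouts(curl, (2, 256))         # bandwidth floor defaults to DEFAULT_TIMEOUT[2]
        #   self._setcurltimeouts(curl, 20)               # scalar applies to both phases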

        def _headerfunction(self, header_line):
            if isinstance(header_line, bytes):
                header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
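
        # Folding sketch (illustrative): pycurl feeds this callback one raw
        # line at a time, so the sequence
        #   'HTTP/1.1 200 OK', 'X-Foo: bar', ' baz'
        # produces {'x-status-line': 'HTTP/1.1 200 OK', 'x-foo': 'bar baz'},
        # the continuation line being folded into the previous header.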

    class KeepWriterQueue(queue.Queue):
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        if st_class in self.confirmed_storage_classes:
                            self.confirmed_storage_classes[st_class] += st_copies
                        else:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Tell the outer loop that storage class tracking
                    # is disabled.
                    return None
                return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes
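
        # Worked example (illustrative): with wanted_copies=2, wanted classes
        # ['hot', 'archive'], and confirmed_storage_classes {'hot': 2,
        # 'archive': 1}, pending_classes() returns ['archive'], since only
        # 'hot' has reached two confirmed copies.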

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()

    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies, classes)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response

    class KeepWriterThread(threading.Thread):
        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                       self.data,
                                       timeout=self.timeout,
                                       headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed
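
        # Header sketch (illustrative): a confirmation header such as
        #   X-Keep-Storage-Classes-Confirmed: hot=1, archive=1
        # parses to {'hot': 1, 'archive': 1} in the loop above; a missing or
        # malformed header yields classes_confirmed=None, which makes the
        # queue fall back to counting plain replicas.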

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services. If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy. Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests. It is an error to specify both
          api_client and api_token. If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory. If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable. If you want to ensure KeepClient does not
          use local storage, pass in an empty string. This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called. Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise. The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
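
    # Backoff sketch (illustrative): with the default non-proxy timeout
    # (2, 256, 32768), attempts 0, 1, 2 use connection timeouts of 2s, 4s,
    # and 8s respectively, while the read timeout and bandwidth floor stay
    # fixed.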

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
                (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]
            # For disk type services, max_replicas_per_service is 1.
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
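
    # Rendezvous-hashing sketch (illustrative, with made-up identifiers):
    # every client computes md5(data_hash + uuid[-15:]) for each service and
    # probes services in descending weight order, so independent clients
    # agree on the probe order for a given block without coordinating.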

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc_s):
        """Fetch a block only if it is in the cache, otherwise return None."""
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature"""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                slot, first = self.block_cache.reserve_cache(locator.md5sum)
                if not first:
                    if prefetch:
                        # this is a request for a prefetch; if it is
                        # already in flight, return immediately.
                        # clear 'slot' to prevent the finally block from
                        # calling slot.set()
                        slot = None
                        return None
                    self.hits_counter.add(1)
                    blob = slot.get()
                    if blob is None:
                        raise arvados.errors.KeepReadError(
                            "failed to read {}".format(loc_s))
                    return blob

            self.misses_counter.add(1)

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                   hint.startswith('K@') and
                                   len(hint) == 29 and
                                   self._gateway_services.get(hint[2:]))])
            # Map root URLs to their KeepService objects.
            roots_map = {
                root: self.KeepService(root, self._user_agent_pool,
                                       upload_counter=self.upload_counter,
                                       download_counter=self.download_counter,
                                       headers=headers,
                                       insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation. Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            sorted_roots = []
            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
            for tries_left in loop:
                try:
                    sorted_roots = self.map_new_services(
                        roots_map, locator,
                        force_rebuild=(tries_left < num_retries),
                        need_writable=False,
                        headers=headers)
                except Exception as error:
                    loop.save_result(error)
                    continue

                # Query KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                    if blob is not None:
                        break
                loop.save_result((blob, len(services_to_try)))

            # Always cache the result, then return it if we succeeded.
            if loop.success():
                return blob
        finally:
            if slot is not None:
                slot.set(blob)
                self.block_cache.cap_cache()

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        """

        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done_copies,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left),
                                                          classes=pending_classes)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                     writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
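
    # Write sketch (illustrative; 'archive' is a placeholder class that must
    # exist in the cluster's configuration):
    #
    #   loc = kc.put(b'block bytes', copies=2, classes=['archive'])
    #
    # On success `loc` is a signed locator such as '<md5>+11+A<sig>@<stamp>';
    # on failure, KeepWriteError reports each service's last error.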

    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator
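
    # Local-store sketch (illustrative; /tmp/keep is a placeholder directory
    # that must already exist):
    #
    #   kc = KeepClient(local_store='/tmp/keep')
    #   loc = kc.put(b'spam')         # writes /tmp/keep/<md5 of the data>
    #   assert kc.get(loc) == b'spam'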

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return b''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()

    def local_store_head(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return True
        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
            return True