1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
35 if sys.version_info >= (3, 0):
36 from io import BytesIO
38 from cStringIO import StringIO as BytesIO
41 import arvados.config as config
43 import arvados.retry as retry
45 import arvados.diskcache
# Module-level logger shared by all Keep client machinery in this file.
_logger = logging.getLogger('arvados.keep')
# Process-wide KeepClient singleton used by the deprecated Keep.* helpers.
global_client_object = None


# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102
class KeepLocator(object):
    """Parse, validate, and reconstruct one Keep locator string.

    A locator has the form md5sum+size[+hint...]; 'A' hints carry a
    permission signature and a hex expiry timestamp.
    """

    # Epoch reference used to convert permission expiry datetimes
    # to/from hex Unix timestamps.
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    # Valid hints: a capital letter followed by alphanumerics, @, _ or -.
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        # No permission expiry until an 'A' hint is parsed below.
        self._perm_expiry = None
        # Fields are '+'-separated: md5sum, size, then zero or more hints.
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        # [excerpt: intervening lines elided]
        self.size = int(next(pieces))
        # [excerpt: loop over the remaining hint pieces elided]
        if self.HINT_RE.match(hint) is None:
            raise ValueError("invalid hint format: {}".format(hint))
        elif hint.startswith('A'):
            # 'A' hints hold the permission signature + expiry.
            self.parse_permission_hint(hint)
        self.hints.append(hint)

    # [excerpt: __str__ definition line elided; the following lines
    # rebuild the locator text from its parsed fields]
        for s in [self.md5sum, self.size,
                  self.permission_hint()] + self.hints

    # [excerpt: stripped-locator method header elided]
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        # [excerpt: getter definition line elided]
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    # Hex-validated attributes: 32-digit content hash, 40-digit signature.
    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    # [excerpt: @property decorator elided]
    def perm_expiry(self):
        return self._perm_expiry

    # [excerpt: @perm_expiry.setter decorator elided]
    def perm_expiry(self, value):
        # Expiry must arrive as a 1-8 digit hex Unix timestamp.
        if not arvados.util.is_hex(value, 1, 8):
            # [excerpt: raise statement start elided]
            "permission timestamp must be a hex Unix timestamp: {}".
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        # [excerpt: missing-field guard elided]
        # Convert the expiry datetime back to integer seconds since epoch.
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        # Hint format: 'A<signature>@<hex timestamp>'.
        # [excerpt: try header elided]
        self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        # [excerpt: except clause elided]
        raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission hint expired as of as_of_dt (default: now)."""
        if self.perm_expiry is None:
            # [excerpt: return for the unsigned case elided]
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
147 """Simple interface to a global KeepClient object.
149 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
150 own API client. The global KeepClient will build an API client from the
151 current Arvados configuration, which may not match the one you built.
    # [excerpt: @classmethod decorator elided]
    def global_client_object(cls):
        """Return the shared KeepClient, rebuilding it if relevant config changed."""
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings. We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them changed since the last call.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            # [excerpt: line recording cls._last_key = key elided]
        return global_client_object
173 def get(locator, **kwargs):
174 return Keep.global_client_object().get(locator, **kwargs)
177 def put(data, **kwargs):
178 return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
    """LRU cache of Keep block contents, held in memory or on disk."""

    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        # Maximum total cache size in bytes; 0 means "pick a default below".
        self.cache_max = cache_max
        self._cache_lock = threading.Lock()
        # Maximum number of cache slots; 0 means "pick a default below".
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir

        if self._disk_cache and self._disk_cache_dir is None:
            # Default on-disk location, created private to the user.
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            # [excerpt: disk/memory branch header elided]
            # default max slots to half of maximum file handles
            # NOFILE typically defaults to 1024 on Linux so this
            # usually yields 512.
            self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2
            # [excerpt: else branch elided]
            self._max_slots = 512

        if self.cache_max == 0:
            # [excerpt: disk-cache branch header elided]
            fs = os.statvfs(self._disk_cache_dir)
            # Calculation of available space incorporates existing cache usage
            existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
            avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
            maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
            # Pick the smallest of:
            # 10% of total disk size
            # 25% of available space
            # max_slots * 64 MiB
            self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            # [excerpt: else branch (in-memory default) elided]
            self.cache_max = (256 * 1024 * 1024)

        # Never shrink the budget below 64 MiB.
        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        # [excerpt: disk-cache condition elided]
        self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)

    class CacheSlot(object):
        # Slim per-block slot: suppress the per-instance __dict__.
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            # Signalled once content is available (or the fetch failed).
            self.ready = threading.Event()
            # [excerpt: content initialization elided]

        # [excerpt: get() method elided]

        def set(self, value):
            # [excerpt: body elided]

        # [excerpt: size() method header elided]
            if self.content is None:
                # [excerpt: zero-size return elided]
            return len(self.content)

    # [excerpt: cap_cache() method header elided]
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
                for i in range(len(self._cache)-1, -1, -1):
                    # start from the back, find a slot that is a candidate to evict
                    if self._cache[i].ready.is_set():
                        sz = self._cache[i].size()
                        # If evict returns false it means the
                        # underlying disk cache couldn't lock the file
                        # for deletion because another process was using
                        # it. Don't count it as reducing the amount
                        # of data in the cache, find something else to
                        # evict instead.
                        if self._cache[i].evict():
                            # [excerpt: size bookkeeping elided]
                        # either way we forget about it. either the
                        # other process will delete it, or if we need
                        # it again and it is still there, we'll find
                        # it again.
                        # [excerpt: del/break lines elided]

    def _get(self, locator):
        # Caller must hold self._cache_lock.
        # Test if the locator is already in the cache
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                # [excerpt: slot capture / del elided]
                # move it to the front
                self._cache.insert(0, n)
                # [excerpt: return elided]
        # [excerpt: disk-cache condition elided]
        # see if it exists on disk
        n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
        # [excerpt: None check elided]
        self._cache.insert(0, n)
        # [excerpt: return statements elided]

    def get(self, locator):
        """Thread-safe lookup; returns the cache slot for locator, if any."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            # [excerpt: existing-slot early return elided]
            # Add a new cache slot for the locator
            # [excerpt: disk-cache condition elided]
            n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
            # [excerpt: else branch elided]
            n = KeepBlockCache.CacheSlot(locator)
            self._cache.insert(0, n)
            # [excerpt: return elided]

    def set(self, slot, blob):
        """Store blob into slot, shrinking cache limits and retrying on ENOMEM/ENOSPC."""
        # [excerpt: try / slot.set lines elided]
        if e.errno == errno.ENOMEM:
            # Reduce max slots to current - 4, cap cache and retry
            with self._cache_lock:
                self._max_slots = max(4, len(self._cache) - 4)
        elif e.errno == errno.ENOSPC:
            # Reduce disk max space to current - 256 MiB, cap cache and retry
            with self._cache_lock:
                sm = sum([st.size() for st in self._cache])
                self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
        elif e.errno == errno.ENODEV:
            _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception as e:
            # [excerpt: lines elided]
            # There was an error. Evict some slots and try again.
            # [excerpt: retry lines elided]
        except Exception as e:
            # It failed again. Give up.
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
        # [excerpt: finally clause elided]
        # Set the notice that we are done with the cache
        # slot one way or another.
class Counter(object):
    """Thread-safe counter.

    [excerpt: only __init__ is visible here; the update/read methods are elided]
    """

    def __init__(self, v=0):
        # Lock protecting the stored value against concurrent updates.
        self._lk = threading.Lock()
        # [excerpt: initialization of the stored value from v elided]
368 class KeepClient(object):
    # Timeouts are (connection_timeout_sec, read_timeout_sec,
    # minimum_bandwidth_bytes_per_sec) triples; see current_timeout().
    # Default Keep server connection timeout: 2 seconds
    # Default Keep server read timeout: 256 seconds
    # Default Keep server bandwidth minimum: 32768 bytes per second
    # Default Keep proxy connection timeout: 20 seconds
    # Default Keep proxy read timeout: 256 seconds
    # Default Keep proxy bandwidth minimum: 32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        reliable results.
        """

        # [excerpt: HTTP_ERRORS tuple definition partially elided]
            arvados.errors.HttpError,

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     download_counter=None,
            # [excerpt: remaining parameters and first assignments elided]
            # NOTE(review): the default LifoQueue() above is created once at
            # class-definition time and shared by every instance that does
            # not pass its own pool -- confirm this sharing is intentional.
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            # [excerpt: lines elided]
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure

        # [excerpt: usable() method header elided]
            """Is it worth attempting a request?"""

        # [excerpt: finished() method header elided]
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            # [excerpt: body elided]

        def _get_user_agent(self):
            # Reuse a pooled curl handle when one is available.
            # [excerpt: try header elided]
            return self._user_agent_pool.get(block=False)
            # [excerpt: fallback path elided]

        def _put_user_agent(self, ua):
            # Return the curl handle to the pool for reuse.
            # [excerpt: try/reset lines elided]
            self._user_agent_pool.put(ua, block=False)

        def _socket_open(self, *args, **kwargs):
            # Dispatch to the callback signature the installed pycurl uses:
            # >= 7.21.5 passes (purpose, address); older versions pass four args.
            if len(args) + len(kwargs) == 2:
                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
            # [excerpt: else header elided]
            return self._socket_open_pycurl_7_19_3(*args, **kwargs)

        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
            # Adapt the old four-argument callback to the new signature.
            return self._socket_open_pycurl_7_21_5(
                # [excerpt: purpose argument elided]
                address=collections.namedtuple(
                    'Address', ['family', 'socktype', 'protocol', 'addr'],
                )(family, socktype, protocol, address))

        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            # [excerpt: TCP_KEEPCNT setting and return elided]

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            # [excerpt: ok flag / try header elided]
            with timer.Timer() as t:
                # [excerpt: lines elided]
                response_body = BytesIO()
                curl.setopt(pycurl.NOSIGNAL, 1)
                curl.setopt(pycurl.OPENSOCKETFUNCTION,
                            lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                curl.setopt(pycurl.URL, url.encode('utf-8'))
                curl.setopt(pycurl.HTTPHEADER, [
                    '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                # [excerpt: insecure condition elided]
                curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                # [excerpt: else branch elided]
                curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                # [excerpt: HEAD condition elided]
                curl.setopt(pycurl.NOBODY, True)
                self._setcurltimeouts(curl, timeout, method=="HEAD")

                # [excerpt: curl.perform() try elided]
                except Exception as e:
                    raise arvados.errors.HttpError(0, str(e))
                # [excerpt: finally / result-dict assembly elided]
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                # [excerpt: lines elided]

            ok = retry.check_http_response_success(self._result['status_code'])
            # [excerpt: not-ok condition elided]
            self._result['error'] = arvados.errors.HttpError(
                self._result['status_code'],
                self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                # [excerpt: error recording elided]
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            # [excerpt: else branch elided]
                # Don't return this client to the pool, in case it's
                # broken.
            # [excerpt: failure branch header elided]
            _logger.debug("Request fail: GET %s => %s: %s",
                          url, type(self._result['error']), str(self._result['error']))
            # [excerpt: return / HEAD branch header elided]
            _logger.info("HEAD %s: %s bytes",
                         self._result['status_code'],
                         self._result.get('content-length'))
            if self._result['headers'].get('x-keep-locator'):
                # This is a response to a remote block copy request, return
                # the local copy block locator.
                return self._result['headers'].get('x-keep-locator')
            # [excerpt: return elided]

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         # [excerpt: elapsed-time argument elided]
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            # Verify the payload against the locator's md5 before returning.
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                # [excerpt: arguments elided]
                self._result['error'] = arvados.errors.HttpError(
                    # [excerpt: arguments / early return elided]
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            # NOTE(review): mutable default for `headers` -- appears safe
            # because it is only read (merged into a copy below); confirm.
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            # [excerpt: ok flag / try header elided]
            with timer.Timer() as t:
                # [excerpt: line elided]
                body_reader = BytesIO(body)
                response_body = BytesIO()
                curl.setopt(pycurl.NOSIGNAL, 1)
                curl.setopt(pycurl.OPENSOCKETFUNCTION,
                            lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                curl.setopt(pycurl.URL, url.encode('utf-8'))
                # Using UPLOAD tells cURL to wait for a "go ahead" from the
                # Keep server (in the form of a HTTP/1.1 "100 Continue"
                # response) instead of sending the request body immediately.
                # This allows the server to reject the request if the request
                # is invalid or the server is read-only, without waiting for
                # the client to send the entire block.
                curl.setopt(pycurl.UPLOAD, True)
                curl.setopt(pycurl.INFILESIZE, len(body))
                curl.setopt(pycurl.READFUNCTION, body_reader.read)
                curl.setopt(pycurl.HTTPHEADER, [
                    '{}: {}'.format(k,v) for k,v in put_headers.items()])
                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                # [excerpt: insecure condition elided]
                curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                # [excerpt: else branch elided]
                curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                self._setcurltimeouts(curl, timeout)
                # [excerpt: curl.perform() try elided]
                except Exception as e:
                    raise arvados.errors.HttpError(0, str(e))
                # [excerpt: finally / result-dict assembly elided]
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue().decode('utf-8'),
                    'headers': self._headers,
                # [excerpt: lines elided]
            ok = retry.check_http_response_success(self._result['status_code'])
            # [excerpt: not-ok condition elided]
            self._result['error'] = arvados.errors.HttpError(
                self._result['status_code'],
                self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                # [excerpt: error recording elided]
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            # [excerpt: else / failure branch header elided]
            _logger.debug("Request fail: PUT %s => %s: %s",
                          url, type(self._result['error']), str(self._result['error']))
            # [excerpt: return elided]
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         # [excerpt: arguments elided]
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            # [excerpt: return elided]

        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
            # `timeouts` may be None, a scalar, a (conn, xfer) pair, or a
            # (conn, xfer, bandwidth_bps) triple.
            # [excerpt: None check elided]
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                # [excerpt: else branch elided]
                    conn_t, xfer_t, bandwidth_bps = timeouts
            # [excerpt: scalar branch header elided]
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            if not ignore_bandwidth:
                # Abort if throughput stays below bandwidth_bps for xfer_t seconds.
                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))

        def _headerfunction(self, header_line):
            # pycurl header callback: accumulate response headers in self._headers.
            if isinstance(header_line, bytes):
                header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                # Normal 'Name: value' header line.
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            # [excerpt: continuation-line branch header elided]
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                # Status line: record it under a synthetic header name.
                name = 'x-status-line'
                # [excerpt: value assignment / else header elided]
                _logger.error("Unexpected header line: %s", header_line)
                # [excerpt: return elided]
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
    class KeepWriterQueue(queue.Queue):
        """Task queue coordinating writer threads until enough copies
        (and storage classes) of one block are confirmed written."""

        def __init__(self, copies, classes=[]):
            # NOTE(review): mutable default for `classes` -- appears safe
            # because it is only read; confirm.
            queue.Queue.__init__(self) # Old-style superclass
            # Desired replica count and storage classes for this block.
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            # [excerpt: response initialization elided]
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            """Record a successful PUT and wake all waiting workers."""
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    # Server doesn't report storage classes; stop tracking them.
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        # [excerpt: membership check elided]
                        self.confirmed_storage_classes[st_class] += st_copies
                        # [excerpt: else branch elided]
                        self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            # A failed attempt frees one more retry slot for another worker.
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Notifies disabled storage classes expectation to
                    # the caller.
                    # [excerpt: return elided]
                return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    # [excerpt: empty-list return elided]
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

        def get_next_task(self):
            """Block until a writer slot is available; return (service, root)."""
            with self.pending_tries_notification:
                # [excerpt: loop header elided]
                if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                    # This notify_all() is unnecessary --
                    # write_success() already called notify_all()
                    # when pending<1 became true, so it's not
                    # possible for any other thread to be in
                    # wait() now -- but it's cheap insurance
                    # against deadlock so we do it anyway:
                    self.pending_tries_notification.notify_all()
                    # Drain the queue and then raise Queue.Empty
                    # [excerpt: drain loop elided]
                elif self.pending_tries > 0:
                    service, service_root = self.get_nowait()
                    if service.finished():
                        # [excerpt: task_done/continue elided]
                    self.pending_tries -= 1
                    return service, service_root
                # [excerpt: empty-queue branch header elided]
                self.pending_tries_notification.notify_all()
                # [excerpt: raise elided]
                self.pending_tries_notification.wait()
    class KeepWriterThreadPool(object):
        """Fan one block PUT out across enough writer threads for `copies`."""

        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                # [excerpt: single-thread assignment elided]
            # [excerpt: else header elided]
                # Enough threads so combined per-service replicas cover `copies`.
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            # [excerpt: workers list initialization elided]
            self.queue = KeepClient.KeepWriterQueue(copies, classes)
            # Create the worker threads up front.
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        # [excerpt: done() method header elided]
            return self.queue.successful_copies, self.queue.satisfied_classes()

        # [excerpt: join() method header and worker-start lines elided]
            for worker in self.workers:
                # [excerpt: worker start call elided]
            # Wait for finished work
            # [excerpt: queue join elided]

        # [excerpt: response() method header elided]
            return self.queue.response
    class KeepWriterThread(threading.Thread):
        """Worker thread: pull (service, root) tasks and PUT the block."""

        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            # [excerpt: queue/data assignments elided]
            self.data_hash = data_hash
            # [excerpt: daemon flag / start elided]

        # [excerpt: run() header and loop header elided]
                service, service_root = self.queue.get_next_task()
                # [excerpt: queue.Empty handling / try header elided]
                locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    # TaskFailed is the expected "this server said no" signal;
                    # anything else is unexpected and gets a traceback.
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                # [excerpt: else branch header elided]
                self.queue.write_success(locator, copies, classes)
                # [excerpt: finally header elided]
                self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            # [excerpt: headers setup / sort elided]
            headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                       # [excerpt: data argument elided]
                                       timeout=self.timeout,
                                       # [excerpt: headers argument elided]
            result = service.last_result()
            # [excerpt: failure condition elided]
            if result.get('status_code'):
                _logger.debug("Request fail: PUT %s => %s %s",
                              # [excerpt: url argument elided]
                              result.get('status_code'),
                              # [excerpt: body argument elided]
            raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          # [excerpt: remaining arguments elided]
            # [excerpt: try header elided]
            replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                # [excerpt: fallback value elided]
            classes_confirmed = {}
            # [excerpt: try header elided]
            scch = result['headers']['x-keep-storage-classes-confirmed']
            # Header format: "class1=2, class2=1".
            for confirmation in scch.replace(' ', '').split(','):
                if '=' in confirmation:
                    stored_class, stored_copies = confirmation.split('=')[:2]
                    classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services. If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy. Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you do not want KeepClient to use a proxy, pass in an empty
          string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests. It is an error to specify both
          api_client and api_token. If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory. If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable. If you want to ensure KeepClient does not
          use local storage, pass in an empty string. This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called. Default 0.
        """
        self.lock = threading.Lock()
        if config.get('ARVADOS_KEEP_SERVICES'):
            proxy = config.get('ARVADOS_KEEP_SERVICES')
        # [excerpt: elif/else header elided]
        proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            # [excerpt: else header elided]
            api_token = api_client.api_token
        elif api_client is not None:
            # [excerpt: raise statement start elided]
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        # [excerpt: else header elided]
        self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        # Pool of reusable curl handles shared across KeepService instances.
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []

        # [excerpt: local_store condition elided]
        self.local_store = local_store
        # Local-store mode replaces the network paths entirely.
        self.head = self.local_store_head
        self.get = self.local_store_get
        self.put = self.local_store_put
        # [excerpt: else branch header elided]
        self.num_retries = num_retries
        self.max_replicas_per_service = None
        # [excerpt: proxy condition elided]
        proxy_uris = proxy.split()
        for i in range(len(proxy_uris)):
            if not proxy_uris[i].endswith('/'):
                # [excerpt: trailing-slash append elided]
            url = urllib.parse.urlparse(proxy_uris[i])
            if not (url.scheme and url.netloc):
                raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
        self.api_token = api_token
        self._gateway_services = {}
        # Fabricate one static service-list entry per proxy URI.
        self._keep_services = [{
            'uuid': "00000-bi6l4-%015d" % idx,
            'service_type': 'proxy',
            '_service_root': uri,
        } for idx, uri in enumerate(proxy_uris)]
        self._writable_services = self._keep_services
        self.using_proxy = True
        self._static_services_list = True
        # [excerpt: else branch header elided]
        # It's important to avoid instantiating an API client
        # unless we actually need one, for testing's sake.
        if api_client is None:
            api_client = arvados.api('v1')
        self.api_client = api_client
        self.api_token = api_client.api_token
        self._gateway_services = {}
        self._keep_services = None
        self._writable_services = None
        self.using_proxy = None
        self._static_services_list = False
        # [excerpt: try header elided]
        self._default_classes = [
            k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
        # [excerpt: except clause elided]
        # We're talking to an old cluster
    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise. The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        # [excerpt: 2-tuple condition elided]
        # Connection timeout doubles on each successive attempt.
        return (t[0] * (1 << attempt_number), t[1])
        # [excerpt: else header elided]
        return (t[0] * (1 << attempt_number), t[1], t[2])
1017 def _any_nondisk_services(self, service_list):
1018 return any(ks.get('service_type', 'disk') != 'disk'
1019 for ks in service_list)
    def build_services_list(self, force_rebuild=False):
        """Populate the cached Keep service lists from the API server."""
        # Nothing to do when the list is static (proxy mode) or already cached.
        if (self._static_services_list or
            (self._keep_services and not force_rebuild)):
            # [excerpt: return elided]
        # [excerpt: lock / try header elided]
        keep_services = self.api_client.keep_services().accessible()
        except Exception:  # API server predates Keep services.
            keep_services = self.api_client.keep_disks().list()

        # Gateway services are only used when specified by UUID,
        # so there's nothing to gain by filtering them out of
        # this lookup table.
        self._gateway_services = {ks['uuid']: ks for ks in
                                  keep_services.execute()['items']}
        if not self._gateway_services:
            raise arvados.errors.NoKeepServersError()

        # Precompute the base URI for each service.
        for r in self._gateway_services.values():
            host = r['service_host']
            if not host.startswith('[') and host.find(':') >= 0:
                # IPv6 URIs must be formatted like http://[::1]:80/...
                host = '[' + host + ']'
            r['_service_root'] = "{}://{}:{:d}/".format(
                'https' if r['service_ssl_flag'] else 'http',
                # [excerpt: host/port arguments elided]

        _logger.debug(str(self._gateway_services))
        # Local Keep services are everything except 'gateway:' entries.
        self._keep_services = [
            ks for ks in self._gateway_services.values()
            if not ks.get('service_type', '').startswith('gateway:')]
        self._writable_services = [ks for ks in self._keep_services
                                   if not ks.get('read_only')]

        # For disk type services, max_replicas_per_service is 1
        # It is unknown (unlimited) for other service types.
        if self._any_nondisk_services(self._writable_services):
            self.max_replicas_per_service = None
        # [excerpt: else header elided]
        self.max_replicas_per_service = 1
1064 def _service_weight(self, data_hash, service_uuid):
1065 """Compute the weight of a Keep service endpoint for a data
1066 block with a known hash.
1068 The weight is md5(h + u) where u is the last 15 characters of
1069 the service endpoint's UUID.
1071 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        # [excerpt: sorted_roots initialization elided]
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                # [excerpt: hint-length condition elided]
                # Short form names a remote cluster by its 5-char id.
                sorted_roots.append(
                    "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    # Long form names a gateway service by UUID.
                    svc = self._gateway_services.get(hint[2:])
                    # [excerpt: svc presence check elided]
                    sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # [excerpt: need_writable condition elided]
        use_services = self._keep_services
        # [excerpt: writable branch elided]
        use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                # [excerpt: use_services / reverse arguments elided]
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        # [excerpt: return elided]
    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                # Wrap each previously-unseen root in a fresh KeepService
                # that shares this client's agent pool and counters.
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    # [excerpt: headers argument elided]
                    insecure=self.insecure)
        # [excerpt: return local_roots elided]
@staticmethod
def _check_loop_result(result):
    # KeepClient RetryLoops should save results as a 2-tuple: the
    # actual result of the request, and the number of servers available
    # to receive the request this round.
    # This method returns True if there's a real result, False if
    # there are no more servers available, otherwise None.
    if isinstance(result, Exception):
        return None
    result, tried_server_count = result
    if (result is not None) and (result is not False):
        return True
    elif tried_server_count < 1:
        _logger.info("No more Keep services to try; giving up")
        return False
    else:
        return None
def get_from_cache(self, loc_s):
    """Fetch a block only if is in the cache, otherwise return None."""
    locator = KeepLocator(loc_s)
    slot = self.block_cache.get(locator.md5sum)
    # Only return data for slots that have finished loading; never
    # block waiting on an in-flight fetch.
    if slot is not None and slot.ready.is_set():
        return slot.get()
    else:
        return None
def refresh_signature(self, loc):
    """Ask Keep to get the remote block and return its local signature"""
    # A fresh UTC timestamp asks the server to re-sign the block for
    # local use starting now.
    now = datetime.datetime.utcnow().isoformat("T") + 'Z'
    return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
@retry.retry_method
def head(self, loc_s, **kwargs):
    """Fetch a block header from Keep; see _get_or_head() for arguments."""
    return self._get_or_head(loc_s, method="HEAD", **kwargs)
@retry.retry_method
def get(self, loc_s, **kwargs):
    """Fetch a block's data from Keep; see _get_or_head() for arguments."""
    return self._get_or_head(loc_s, method="GET", **kwargs)
def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
    """Get data from Keep.

    This method fetches one or more blocks of data from Keep.  It
    sends a request each Keep service registered with the API
    server (or the proxy provided when this client was
    instantiated), then each service named in location hints, in
    sequence.  As soon as one service provides the data, it's
    returned.

    Arguments:
    * loc_s: A string of one or more comma-separated locators to fetch.
      This method returns the concatenation of these blocks.
    * num_retries: The number of times to retry GET requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff.  Note that, in each loop, the method may try
      to fetch data from every available Keep service, along with any
      that are named in location hints in the locator.  The default value
      is set when the KeepClient is initialized.
    """
    if ',' in loc_s:
        # Multiple locators: fetch each recursively and concatenate.
        return ''.join(self.get(x) for x in loc_s.split(','))

    self.get_counter.add(1)

    request_id = (request_id or
                  (hasattr(self, 'api_client') and self.api_client.request_id) or
                  arvados.util.new_request_id())
    if headers is None:
        headers = {}
    headers['X-Request-Id'] = request_id

    slot = None
    blob = None
    try:
        locator = KeepLocator(loc_s)
        if method == "GET":
            # Reserve a cache slot; first is False when another
            # caller is already fetching this block.
            slot, first = self.block_cache.reserve_cache(locator.md5sum)
            if not first:
                if prefetch:
                    # this is request for a prefetch, if it is
                    # already in flight, return immediately.
                    # clear 'slot' to prevent finally block from
                    # calling slot.set()
                    slot = None
                    return None
                self.hits_counter.add(1)
                blob = slot.get()
                if blob is None:
                    raise arvados.errors.KeepReadError(
                        "failed to read {}".format(loc_s))
                return blob

        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               len(hint) == 29 and
                               self._gateway_services.get(hint[2:]))])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter,
                                   headers=headers,
                                   insecure=self.insecure)
            for root in hint_roots
        }

        # See #3147 for a discussion of the loop implementation.  Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        sorted_roots = []
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        if loop.success():
            return blob
    finally:
        if slot is not None:
            self.block_cache.set(slot, blob)

    # No servers fulfilled the request.  Count how many services had
    # the block "not found" to pick the most descriptive exception.
    # Q: Including 403 is necessary for the Keep tests to continue
    # passing, but maybe they should expect KeepReadError instead?
    not_founds = sum(1 for key in sorted_roots
                     if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
    service_errors = ((key, roots_map[key].last_result()['error'])
                      for key in sorted_roots)
    if not roots_map:
        raise arvados.errors.KeepReadError(
            "[{}] failed to read {}: no Keep services available ({})".format(
                request_id, loc_s, loop.last_result()))
    elif not_founds == len(sorted_roots):
        raise arvados.errors.NotFoundError(
            "[{}] {} not found".format(request_id, loc_s), service_errors)
    else:
        raise arvados.errors.KeepReadError(
            "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
@retry.retry_method
def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
    """Save data in Keep.

    This method will get a list of Keep services from the API server, and
    send the data to each one simultaneously in a new thread.  Once the
    uploads are finished, if enough copies are saved, this method returns
    the most recent HTTP response body.  If requests fail to upload
    enough copies, this method raises KeepWriteError.

    Arguments:
    * data: The string of data to upload.
    * copies: The number of copies that the user requires be saved.
      Default 2.
    * num_retries: The number of times to retry PUT requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff.  The default value is set when the
      KeepClient is initialized.
    * classes: An optional list of storage class names where copies should
      be written.
    """

    classes = classes or self._default_classes

    if not isinstance(data, bytes):
        data = data.encode()

    self.put_counter.add(1)

    data_hash = hashlib.md5(data).hexdigest()
    loc_s = data_hash + '+' + str(len(data))
    if copies < 1:
        # Zero copies requested: nothing to upload.
        return loc_s
    locator = KeepLocator(loc_s)

    request_id = (request_id or
                  (hasattr(self, 'api_client') and self.api_client.request_id) or
                  arvados.util.new_request_id())
    headers = {
        'X-Request-Id': request_id,
        'X-Keep-Desired-Replicas': str(copies),
    }
    roots_map = {}
    loop = retry.RetryLoop(num_retries, self._check_loop_result,
                           backoff_start=2)
    done_copies = 0
    # done_classes tracks storage classes confirmed so far; it becomes
    # None if the cluster doesn't support storage classes.
    done_classes = []
    for tries_left in loop:
        try:
            sorted_roots = self.map_new_services(
                roots_map, locator,
                force_rebuild=(tries_left < num_retries),
                need_writable=True,
                headers=headers)
        except Exception as error:
            loop.save_result(error)
            continue

        pending_classes = []
        if done_classes is not None:
            pending_classes = list(set(classes) - set(done_classes))
        writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                      data_hash=data_hash,
                                                      copies=copies - done_copies,
                                                      max_service_replicas=self.max_replicas_per_service,
                                                      timeout=self.current_timeout(num_retries - tries_left),
                                                      classes=pending_classes)
        for service_root, ks in [(root, roots_map[root])
                                 for root in sorted_roots]:
            if ks.finished():
                continue
            writer_pool.add_task(ks, service_root)
        writer_pool.join()
        pool_copies, pool_classes = writer_pool.done()
        done_copies += pool_copies
        if (done_classes is not None) and (pool_classes is not None):
            done_classes += pool_classes
            loop.save_result(
                (done_copies >= copies and set(done_classes) == set(classes),
                 writer_pool.total_task_nr))
        else:
            # Old keepstore contacted without storage classes support:
            # success is determined only by successful copies.
            #
            # Disable storage classes tracking from this point forward.
            if not self._storage_classes_unsupported_warning:
                self._storage_classes_unsupported_warning = True
                _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
            done_classes = None
            loop.save_result(
                (done_copies >= copies, writer_pool.total_task_nr))

    if loop.success():
        return writer_pool.response()
    if not roots_map:
        raise arvados.errors.KeepWriteError(
            "[{}] failed to write {}: no Keep services available ({})".format(
                request_id, data_hash, loop.last_result()))
    else:
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots
                          if roots_map[key].last_result()['error'])
        raise arvados.errors.KeepWriteError(
            "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
    """A stub for put().

    This method is used in place of the real put() method when
    using local storage (see constructor's local_store argument).

    copies and num_retries arguments are ignored: they are here
    only for the sake of offering the same call signature as
    put().

    Data stored this way can be retrieved via local_store_get().
    """
    md5 = hashlib.md5(data).hexdigest()
    locator = '%s+%d' % (md5, len(data))
    # Write to a temporary file first, then rename into place, so a
    # reader never observes a partially-written block.
    with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
        f.write(data)
    os.rename(os.path.join(self.local_store, md5 + '.tmp'),
              os.path.join(self.local_store, md5))
    return locator
def local_store_get(self, loc_s, num_retries=None):
    """Companion to local_store_put()."""
    try:
        locator = KeepLocator(loc_s)
    except ValueError:
        raise arvados.errors.NotFoundError(
            "Invalid data locator: '%s'" % loc_s)
    # The empty block is a well-known constant; don't touch the disk.
    if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
        return b''
    with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
        return f.read()
1436 def local_store_head(self, loc_s, num_retries=None):
1437 """Companion to local_store_put()."""
1439 locator = KeepLocator(loc_s)
1441 raise arvados.errors.NotFoundError(
1442 "Invalid data locator: '%s'" % loc_s)
1443 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1445 if os.path.exists(os.path.join(self.local_store, locator.md5sum)):