1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
35 if sys.version_info >= (3, 0):
36 from io import BytesIO
38 from cStringIO import StringIO as BytesIO
41 import arvados.config as config
43 import arvados.retry as retry
45 import arvados.diskcache
47 _logger = logging.getLogger('arvados.keep')
48 global_client_object = None
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    for _tcp_name, _tcp_value in (('TCP_KEEPALIVE', 0x010),
                                  ('TCP_KEEPINTVL', 0x101),
                                  ('TCP_KEEPCNT', 0x102)):
        # Only fill in constants the platform's socket module lacks.
        if not hasattr(socket, _tcp_name):
            setattr(socket, _tcp_name, _tcp_value)
62 class KeepLocator(object):
63 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
64 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
66 def __init__(self, locator_str):
69 self._perm_expiry = None
70 pieces = iter(locator_str.split('+'))
71 self.md5sum = next(pieces)
73 self.size = int(next(pieces))
77 if self.HINT_RE.match(hint) is None:
78 raise ValueError("invalid hint format: {}".format(hint))
79 elif hint.startswith('A'):
80 self.parse_permission_hint(hint)
82 self.hints.append(hint)
87 for s in [self.md5sum, self.size,
88 self.permission_hint()] + self.hints
92 if self.size is not None:
93 return "%s+%i" % (self.md5sum, self.size)
97 def _make_hex_prop(name, length):
98 # Build and return a new property with the given name that
99 # must be a hex string of the given length.
100 data_name = '_{}'.format(name)
102 return getattr(self, data_name)
103 def setter(self, hex_str):
104 if not arvados.util.is_hex(hex_str, length):
105 raise ValueError("{} is not a {}-digit hex string: {!r}".
106 format(name, length, hex_str))
107 setattr(self, data_name, hex_str)
108 return property(getter, setter)
110 md5sum = _make_hex_prop('md5sum', 32)
111 perm_sig = _make_hex_prop('perm_sig', 40)
114 def perm_expiry(self):
115 return self._perm_expiry
118 def perm_expiry(self, value):
119 if not arvados.util.is_hex(value, 1, 8):
121 "permission timestamp must be a hex Unix timestamp: {}".
123 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
125 def permission_hint(self):
126 data = [self.perm_sig, self.perm_expiry]
129 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
130 return "A{}@{:08x}".format(*data)
132 def parse_permission_hint(self, s):
134 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
136 raise ValueError("bad permission hint {}".format(s))
138 def permission_expired(self, as_of_dt=None):
139 if self.perm_expiry is None:
141 elif as_of_dt is None:
142 as_of_dt = datetime.datetime.now()
143 return self.perm_expiry <= as_of_dt
147 """Simple interface to a global KeepClient object.
149 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
150 own API client. The global KeepClient will build an API client from the
151 current Arvados configuration, which may not match the one you built.
156 def global_client_object(cls):
157 global global_client_object
158 # Previously, KeepClient would change its behavior at runtime based
159 # on these configuration settings. We simulate that behavior here
160 # by checking the values and returning a new KeepClient if any of
162 key = (config.get('ARVADOS_API_HOST'),
163 config.get('ARVADOS_API_TOKEN'),
164 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
165 config.get('ARVADOS_KEEP_PROXY'),
166 os.environ.get('KEEP_LOCAL_STORE'))
167 if (global_client_object is None) or (cls._last_key != key):
168 global_client_object = KeepClient()
170 return global_client_object
def get(locator, **kwargs):
    """Fetch a block through the shared global KeepClient.

    Part of the deprecated Keep facade; prefer instantiating your own
    KeepClient (see the class docstring).
    """
    client = Keep.global_client_object()
    return client.get(locator, **kwargs)
def put(data, **kwargs):
    """Store a block through the shared global KeepClient.

    Part of the deprecated Keep facade; prefer instantiating your own
    KeepClient (see the class docstring).
    """
    client = Keep.global_client_object()
    return client.put(data, **kwargs)
180 class KeepBlockCache(object):
181 def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
182 self.cache_max = cache_max
184 self._cache_lock = threading.Lock()
185 self._max_slots = max_slots
186 self._disk_cache = disk_cache
187 self._disk_cache_dir = disk_cache_dir
189 if self._disk_cache and self._disk_cache_dir is None:
190 self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
191 os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
193 if self._max_slots == 0:
195 # default max slots to half of maximum file handles
196 # NOFILE typically defaults to 1024 on Linux so this
198 self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2
201 self._max_slots = 512
203 if self.cache_max == 0:
205 fs = os.statvfs(self._disk_cache_dir)
206 # Calculation of available space incorporates existing cache usage
207 existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
208 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
209 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
211 # 10% of total disk size
212 # 25% of available space
214 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
217 self.cache_max = (256 * 1024 * 1024)
219 self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
222 self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
226 class CacheSlot(object):
227 __slots__ = ("locator", "ready", "content")
229 def __init__(self, locator):
230 self.locator = locator
231 self.ready = threading.Event()
238 def set(self, value):
243 if self.content is None:
246 return len(self.content)
252 '''Cap the cache size to self.cache_max'''
253 with self._cache_lock:
254 # Select all slots except those where ready.is_set() and content is
255 # None (that means there was an error reading the block).
256 self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
257 sm = sum([slot.size() for slot in self._cache])
258 while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
259 for i in range(len(self._cache)-1, -1, -1):
260 # start from the back, find a slot that is a candidate to evict
261 if self._cache[i].ready.is_set():
262 sz = self._cache[i].size()
264 # If evict returns false it means the
265 # underlying disk cache couldn't lock the file
266 # for deletion because another process was using
267 # it. Don't count it as reducing the amount
268 # of data in the cache, find something else to
270 if self._cache[i].evict():
273 # either way we forget about it. either the
274 # other process will delete it, or if we need
275 # it again and it is still there, we'll find
280 def _get(self, locator):
281 # Test if the locator is already in the cache
282 for i in range(0, len(self._cache)):
283 if self._cache[i].locator == locator:
286 # move it to the front
288 self._cache.insert(0, n)
291 # see if it exists on disk
292 n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
294 self._cache.insert(0, n)
298 def get(self, locator):
299 with self._cache_lock:
300 return self._get(locator)
302 def reserve_cache(self, locator):
303 '''Reserve a cache slot for the specified locator,
304 or return the existing slot.'''
305 with self._cache_lock:
306 n = self._get(locator)
310 # Add a new cache slot for the locator
312 n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
314 n = KeepBlockCache.CacheSlot(locator)
315 self._cache.insert(0, n)
318 def set(self, slot, blob):
325 if e.errno == errno.ENOMEM:
326 # Reduce max slots to current - 4, cap cache and retry
327 with self._cache_lock:
328 self._max_slots = max(4, len(self._cache) - 4)
329 elif e.errno == errno.ENOSPC:
330 # Reduce disk max space to current - 256 MiB, cap cache and retry
331 with self._cache_lock:
332 sm = sum([st.size() for st in self._cache])
333 self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
334 elif e.errno == errno.ENODEV:
335 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
336 except Exception as e:
339 # Check if we should evict things from the cache. Either
340 # because we added a new thing or we adjusted the limits down,
341 # so we might need to push something out.
349 # There was an error, we ran cap_cache so try one more time.
351 except Exception as e:
352 # It failed again. Give up.
353 raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
355 # Set the notice that we are done with the cache
356 # slot one way or another.
360 class Counter(object):
361 def __init__(self, v=0):
362 self._lk = threading.Lock()
374 class KeepClient(object):
376 # Default Keep server connection timeout: 2 seconds
377 # Default Keep server read timeout: 256 seconds
378 # Default Keep server bandwidth minimum: 32768 bytes per second
379 # Default Keep proxy connection timeout: 20 seconds
380 # Default Keep proxy read timeout: 256 seconds
381 # Default Keep proxy bandwidth minimum: 32768 bytes per second
382 DEFAULT_TIMEOUT = (2, 256, 32768)
383 DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
386 class KeepService(object):
387 """Make requests to a single Keep service, and track results.
389 A KeepService is intended to last long enough to perform one
390 transaction (GET or PUT) against one Keep service. This can
391 involve calling either get() or put() multiple times in order
392 to retry after transient failures. However, calling both get()
393 and put() on a single instance -- or using the same instance
394 to access two different Keep services -- will not produce
401 arvados.errors.HttpError,
404 def __init__(self, root, user_agent_pool=queue.LifoQueue(),
406 download_counter=None,
410 self._user_agent_pool = user_agent_pool
411 self._result = {'error': None}
415 self.get_headers = {'Accept': 'application/octet-stream'}
416 self.get_headers.update(headers)
417 self.put_headers = headers
418 self.upload_counter = upload_counter
419 self.download_counter = download_counter
420 self.insecure = insecure
423 """Is it worth attempting a request?"""
427 """Did the request succeed or encounter permanent failure?"""
428 return self._result['error'] == False or not self._usable
430 def last_result(self):
433 def _get_user_agent(self):
435 return self._user_agent_pool.get(block=False)
439 def _put_user_agent(self, ua):
442 self._user_agent_pool.put(ua, block=False)
446 def _socket_open(self, *args, **kwargs):
447 if len(args) + len(kwargs) == 2:
448 return self._socket_open_pycurl_7_21_5(*args, **kwargs)
450 return self._socket_open_pycurl_7_19_3(*args, **kwargs)
452 def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
453 return self._socket_open_pycurl_7_21_5(
455 address=collections.namedtuple(
456 'Address', ['family', 'socktype', 'protocol', 'addr'],
457 )(family, socktype, protocol, address))
459 def _socket_open_pycurl_7_21_5(self, purpose, address):
460 """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
461 s = socket.socket(address.family, address.socktype, address.protocol)
462 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
463 # Will throw invalid protocol error on mac. This test prevents that.
464 if hasattr(socket, 'TCP_KEEPIDLE'):
465 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
466 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
470 def get(self, locator, method="GET", timeout=None):
471 # locator is a KeepLocator object.
472 url = self.root + str(locator)
473 _logger.debug("Request: %s %s", method, url)
474 curl = self._get_user_agent()
477 with timer.Timer() as t:
479 response_body = BytesIO()
480 curl.setopt(pycurl.NOSIGNAL, 1)
481 curl.setopt(pycurl.OPENSOCKETFUNCTION,
482 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
483 curl.setopt(pycurl.URL, url.encode('utf-8'))
484 curl.setopt(pycurl.HTTPHEADER, [
485 '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
486 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
487 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
489 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
490 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
492 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
494 curl.setopt(pycurl.NOBODY, True)
495 self._setcurltimeouts(curl, timeout, method=="HEAD")
499 except Exception as e:
500 raise arvados.errors.HttpError(0, str(e))
506 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
507 'body': response_body.getvalue(),
508 'headers': self._headers,
512 ok = retry.check_http_response_success(self._result['status_code'])
514 self._result['error'] = arvados.errors.HttpError(
515 self._result['status_code'],
516 self._headers.get('x-status-line', 'Error'))
517 except self.HTTP_ERRORS as e:
521 self._usable = ok != False
522 if self._result.get('status_code', None):
523 # The client worked well enough to get an HTTP status
524 # code, so presumably any problems are just on the
525 # server side and it's OK to reuse the client.
526 self._put_user_agent(curl)
528 # Don't return this client to the pool, in case it's
532 _logger.debug("Request fail: GET %s => %s: %s",
533 url, type(self._result['error']), str(self._result['error']))
536 _logger.info("HEAD %s: %s bytes",
537 self._result['status_code'],
538 self._result.get('content-length'))
539 if self._result['headers'].get('x-keep-locator'):
540 # This is a response to a remote block copy request, return
541 # the local copy block locator.
542 return self._result['headers'].get('x-keep-locator')
545 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
546 self._result['status_code'],
547 len(self._result['body']),
549 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
551 if self.download_counter:
552 self.download_counter.add(len(self._result['body']))
553 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
554 if resp_md5 != locator.md5sum:
555 _logger.warning("Checksum fail: md5(%s) = %s",
557 self._result['error'] = arvados.errors.HttpError(
560 return self._result['body']
562 def put(self, hash_s, body, timeout=None, headers={}):
563 put_headers = copy.copy(self.put_headers)
564 put_headers.update(headers)
565 url = self.root + hash_s
566 _logger.debug("Request: PUT %s", url)
567 curl = self._get_user_agent()
570 with timer.Timer() as t:
572 body_reader = BytesIO(body)
573 response_body = BytesIO()
574 curl.setopt(pycurl.NOSIGNAL, 1)
575 curl.setopt(pycurl.OPENSOCKETFUNCTION,
576 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
577 curl.setopt(pycurl.URL, url.encode('utf-8'))
578 # Using UPLOAD tells cURL to wait for a "go ahead" from the
579 # Keep server (in the form of a HTTP/1.1 "100 Continue"
580 # response) instead of sending the request body immediately.
581 # This allows the server to reject the request if the request
582 # is invalid or the server is read-only, without waiting for
583 # the client to send the entire block.
584 curl.setopt(pycurl.UPLOAD, True)
585 curl.setopt(pycurl.INFILESIZE, len(body))
586 curl.setopt(pycurl.READFUNCTION, body_reader.read)
587 curl.setopt(pycurl.HTTPHEADER, [
588 '{}: {}'.format(k,v) for k,v in put_headers.items()])
589 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
590 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
592 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
593 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
595 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
596 self._setcurltimeouts(curl, timeout)
599 except Exception as e:
600 raise arvados.errors.HttpError(0, str(e))
606 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
607 'body': response_body.getvalue().decode('utf-8'),
608 'headers': self._headers,
611 ok = retry.check_http_response_success(self._result['status_code'])
613 self._result['error'] = arvados.errors.HttpError(
614 self._result['status_code'],
615 self._headers.get('x-status-line', 'Error'))
616 except self.HTTP_ERRORS as e:
620 self._usable = ok != False # still usable if ok is True or None
621 if self._result.get('status_code', None):
622 # Client is functional. See comment in get().
623 self._put_user_agent(curl)
627 _logger.debug("Request fail: PUT %s => %s: %s",
628 url, type(self._result['error']), str(self._result['error']))
630 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
631 self._result['status_code'],
634 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
635 if self.upload_counter:
636 self.upload_counter.add(len(body))
639 def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
642 elif isinstance(timeouts, tuple):
643 if len(timeouts) == 2:
644 conn_t, xfer_t = timeouts
645 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
647 conn_t, xfer_t, bandwidth_bps = timeouts
649 conn_t, xfer_t = (timeouts, timeouts)
650 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
651 curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
652 if not ignore_bandwidth:
653 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
654 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
656 def _headerfunction(self, header_line):
657 if isinstance(header_line, bytes):
658 header_line = header_line.decode('iso-8859-1')
659 if ':' in header_line:
660 name, value = header_line.split(':', 1)
661 name = name.strip().lower()
662 value = value.strip()
664 name = self._lastheadername
665 value = self._headers[name] + ' ' + header_line.strip()
666 elif header_line.startswith('HTTP/'):
667 name = 'x-status-line'
670 _logger.error("Unexpected header line: %s", header_line)
672 self._lastheadername = name
673 self._headers[name] = value
674 # Returning None implies all bytes were written
677 class KeepWriterQueue(queue.Queue):
678 def __init__(self, copies, classes=[]):
679 queue.Queue.__init__(self) # Old-style superclass
680 self.wanted_copies = copies
681 self.wanted_storage_classes = classes
682 self.successful_copies = 0
683 self.confirmed_storage_classes = {}
685 self.storage_classes_tracking = True
686 self.queue_data_lock = threading.RLock()
687 self.pending_tries = max(copies, len(classes))
688 self.pending_tries_notification = threading.Condition()
690 def write_success(self, response, replicas_nr, classes_confirmed):
691 with self.queue_data_lock:
692 self.successful_copies += replicas_nr
693 if classes_confirmed is None:
694 self.storage_classes_tracking = False
695 elif self.storage_classes_tracking:
696 for st_class, st_copies in classes_confirmed.items():
698 self.confirmed_storage_classes[st_class] += st_copies
700 self.confirmed_storage_classes[st_class] = st_copies
701 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
702 self.response = response
703 with self.pending_tries_notification:
704 self.pending_tries_notification.notify_all()
706 def write_fail(self, ks):
707 with self.pending_tries_notification:
708 self.pending_tries += 1
709 self.pending_tries_notification.notify()
711 def pending_copies(self):
712 with self.queue_data_lock:
713 return self.wanted_copies - self.successful_copies
715 def satisfied_classes(self):
716 with self.queue_data_lock:
717 if not self.storage_classes_tracking:
718 # Notifies disabled storage classes expectation to
721 return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
723 def pending_classes(self):
724 with self.queue_data_lock:
725 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
727 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
728 for st_class, st_copies in self.confirmed_storage_classes.items():
729 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
730 unsatisfied_classes.remove(st_class)
731 return unsatisfied_classes
733 def get_next_task(self):
734 with self.pending_tries_notification:
736 if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
737 # This notify_all() is unnecessary --
738 # write_success() already called notify_all()
739 # when pending<1 became true, so it's not
740 # possible for any other thread to be in
741 # wait() now -- but it's cheap insurance
742 # against deadlock so we do it anyway:
743 self.pending_tries_notification.notify_all()
744 # Drain the queue and then raise Queue.Empty
748 elif self.pending_tries > 0:
749 service, service_root = self.get_nowait()
750 if service.finished():
753 self.pending_tries -= 1
754 return service, service_root
756 self.pending_tries_notification.notify_all()
759 self.pending_tries_notification.wait()
762 class KeepWriterThreadPool(object):
763 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
764 self.total_task_nr = 0
765 if (not max_service_replicas) or (max_service_replicas >= copies):
768 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
769 _logger.debug("Pool max threads is %d", num_threads)
771 self.queue = KeepClient.KeepWriterQueue(copies, classes)
773 for _ in range(num_threads):
774 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
775 self.workers.append(w)
777 def add_task(self, ks, service_root):
778 self.queue.put((ks, service_root))
779 self.total_task_nr += 1
782 return self.queue.successful_copies, self.queue.satisfied_classes()
786 for worker in self.workers:
788 # Wait for finished work
792 return self.queue.response
795 class KeepWriterThread(threading.Thread):
796 class TaskFailed(RuntimeError): pass
798 def __init__(self, queue, data, data_hash, timeout=None):
799 super(KeepClient.KeepWriterThread, self).__init__()
800 self.timeout = timeout
803 self.data_hash = data_hash
809 service, service_root = self.queue.get_next_task()
813 locator, copies, classes = self.do_task(service, service_root)
814 except Exception as e:
815 if not isinstance(e, self.TaskFailed):
816 _logger.exception("Exception in KeepWriterThread")
817 self.queue.write_fail(service)
819 self.queue.write_success(locator, copies, classes)
821 self.queue.task_done()
823 def do_task(self, service, service_root):
824 classes = self.queue.pending_classes()
828 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
829 success = bool(service.put(self.data_hash,
831 timeout=self.timeout,
833 result = service.last_result()
836 if result.get('status_code'):
837 _logger.debug("Request fail: PUT %s => %s %s",
839 result.get('status_code'),
841 raise self.TaskFailed()
843 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
844 str(threading.current_thread()),
849 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
850 except (KeyError, ValueError):
853 classes_confirmed = {}
855 scch = result['headers']['x-keep-storage-classes-confirmed']
856 for confirmation in scch.replace(' ', '').split(','):
857 if '=' in confirmation:
858 stored_class, stored_copies = confirmation.split('=')[:2]
859 classes_confirmed[stored_class] = int(stored_copies)
860 except (KeyError, ValueError):
861 # Storage classes confirmed header missing or corrupt
862 classes_confirmed = None
864 return result['body'].strip(), replicas_stored, classes_confirmed
867 def __init__(self, api_client=None, proxy=None,
868 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
869 api_token=None, local_store=None, block_cache=None,
870 num_retries=0, session=None):
871 """Initialize a new KeepClient.
875 The API client to use to find Keep services. If not
876 provided, KeepClient will build one from available Arvados
880 If specified, this KeepClient will send requests to this Keep
881 proxy. Otherwise, KeepClient will fall back to the setting of the
882 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
883 If you want to ensure KeepClient does not use a proxy, pass in an empty
887 The initial timeout (in seconds) for HTTP requests to Keep
888 non-proxy servers. A tuple of three floats is interpreted as
889 (connection_timeout, read_timeout, minimum_bandwidth). A connection
890 will be aborted if the average traffic rate falls below
891 minimum_bandwidth bytes per second over an interval of read_timeout
892 seconds. Because timeouts are often a result of transient server
893 load, the actual connection timeout will be increased by a factor
894 of two on each retry.
895 Default: (2, 256, 32768).
898 The initial timeout (in seconds) for HTTP requests to
899 Keep proxies. A tuple of three floats is interpreted as
900 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
901 described above for adjusting connection timeouts on retry also
903 Default: (20, 256, 32768).
906 If you're not using an API client, but only talking
907 directly to a Keep proxy, this parameter specifies an API token
908 to authenticate Keep requests. It is an error to specify both
909 api_client and api_token. If you specify neither, KeepClient
910 will use one available from the Arvados configuration.
913 If specified, this KeepClient will bypass Keep
914 services, and save data to the named directory. If unspecified,
915 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
916 environment variable. If you want to ensure KeepClient does not
917 use local storage, pass in an empty string. This is primarily
918 intended to mock a server for testing.
921 The default number of times to retry failed requests.
922 This will be used as the default num_retries value when get() and
923 put() are called. Default 0.
925 self.lock = threading.Lock()
927 if config.get('ARVADOS_KEEP_SERVICES'):
928 proxy = config.get('ARVADOS_KEEP_SERVICES')
930 proxy = config.get('ARVADOS_KEEP_PROXY')
931 if api_token is None:
932 if api_client is None:
933 api_token = config.get('ARVADOS_API_TOKEN')
935 api_token = api_client.api_token
936 elif api_client is not None:
938 "can't build KeepClient with both API client and token")
939 if local_store is None:
940 local_store = os.environ.get('KEEP_LOCAL_STORE')
942 if api_client is None:
943 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
945 self.insecure = api_client.insecure
947 self.block_cache = block_cache if block_cache else KeepBlockCache()
948 self.timeout = timeout
949 self.proxy_timeout = proxy_timeout
950 self._user_agent_pool = queue.LifoQueue()
951 self.upload_counter = Counter()
952 self.download_counter = Counter()
953 self.put_counter = Counter()
954 self.get_counter = Counter()
955 self.hits_counter = Counter()
956 self.misses_counter = Counter()
957 self._storage_classes_unsupported_warning = False
958 self._default_classes = []
961 self.local_store = local_store
962 self.head = self.local_store_head
963 self.get = self.local_store_get
964 self.put = self.local_store_put
966 self.num_retries = num_retries
967 self.max_replicas_per_service = None
969 proxy_uris = proxy.split()
970 for i in range(len(proxy_uris)):
971 if not proxy_uris[i].endswith('/'):
974 url = urllib.parse.urlparse(proxy_uris[i])
975 if not (url.scheme and url.netloc):
976 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
977 self.api_token = api_token
978 self._gateway_services = {}
979 self._keep_services = [{
980 'uuid': "00000-bi6l4-%015d" % idx,
981 'service_type': 'proxy',
982 '_service_root': uri,
983 } for idx, uri in enumerate(proxy_uris)]
984 self._writable_services = self._keep_services
985 self.using_proxy = True
986 self._static_services_list = True
988 # It's important to avoid instantiating an API client
989 # unless we actually need one, for testing's sake.
990 if api_client is None:
991 api_client = arvados.api('v1')
992 self.api_client = api_client
993 self.api_token = api_client.api_token
994 self._gateway_services = {}
995 self._keep_services = None
996 self._writable_services = None
997 self.using_proxy = None
998 self._static_services_list = False
1000 self._default_classes = [
1001 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
1003 # We're talking to an old cluster
1006 def current_timeout(self, attempt_number):
1007 """Return the appropriate timeout to use for this client.
1009 The proxy timeout setting if the backend service is currently a proxy,
1010 the regular timeout setting otherwise. The `attempt_number` indicates
1011 how many times the operation has been tried already (starting from 0
1012 for the first try), and scales the connection timeout portion of the
1013 return value accordingly.
1016 # TODO(twp): the timeout should be a property of a
1017 # KeepService, not a KeepClient. See #4488.
1018 t = self.proxy_timeout if self.using_proxy else self.timeout
1020 return (t[0] * (1 << attempt_number), t[1])
1022 return (t[0] * (1 << attempt_number), t[1], t[2])
1023 def _any_nondisk_services(self, service_list):
1024 return any(ks.get('service_type', 'disk') != 'disk'
1025 for ks in service_list)
1027 def build_services_list(self, force_rebuild=False):
1028 if (self._static_services_list or
1029 (self._keep_services and not force_rebuild)):
1033 keep_services = self.api_client.keep_services().accessible()
1034 except Exception: # API server predates Keep services.
1035 keep_services = self.api_client.keep_disks().list()
1037 # Gateway services are only used when specified by UUID,
1038 # so there's nothing to gain by filtering them by
1040 self._gateway_services = {ks['uuid']: ks for ks in
1041 keep_services.execute()['items']}
1042 if not self._gateway_services:
1043 raise arvados.errors.NoKeepServersError()
1045 # Precompute the base URI for each service.
1046 for r in self._gateway_services.values():
1047 host = r['service_host']
1048 if not host.startswith('[') and host.find(':') >= 0:
1049 # IPv6 URIs must be formatted like http://[::1]:80/...
1050 host = '[' + host + ']'
1051 r['_service_root'] = "{}://{}:{:d}/".format(
1052 'https' if r['service_ssl_flag'] else 'http',
1056 _logger.debug(str(self._gateway_services))
1057 self._keep_services = [
1058 ks for ks in self._gateway_services.values()
1059 if not ks.get('service_type', '').startswith('gateway:')]
1060 self._writable_services = [ks for ks in self._keep_services
1061 if not ks.get('read_only')]
1063 # For disk type services, max_replicas_per_service is 1
1064 # It is unknown (unlimited) for other service types.
1065 if self._any_nondisk_services(self._writable_services):
1066 self.max_replicas_per_service = None
1068 self.max_replicas_per_service = 1
1070 def _service_weight(self, data_hash, service_uuid):
1071 """Compute the weight of a Keep service endpoint for a data
1072 block with a known hash.
1074 The weight is md5(h + u) where u is the last 15 characters of
1075 the service endpoint's UUID.
1077 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1079 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1080 """Return an array of Keep service endpoints, in the order in
1081 which they should be probed when reading or writing data with
1082 the given hash+hints.
1084 self.build_services_list(force_rebuild)
1087 # Use the services indicated by the given +K@... remote
1088 # service hints, if any are present and can be resolved to a
1090 for hint in locator.hints:
1091 if hint.startswith('K@'):
1093 sorted_roots.append(
1094 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1095 elif len(hint) == 29:
1096 svc = self._gateway_services.get(hint[2:])
1098 sorted_roots.append(svc['_service_root'])
1100 # Sort the available local services by weight (heaviest first)
1101 # for this locator, and return their service_roots (base URIs)
1103 use_services = self._keep_services
1105 use_services = self._writable_services
1106 self.using_proxy = self._any_nondisk_services(use_services)
1107 sorted_roots.extend([
1108 svc['_service_root'] for svc in sorted(
1111 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1112 _logger.debug("{}: {}".format(locator, sorted_roots))
    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # service roots, in probe order.
        #
        # Existing entries in roots_map are kept as-is so their
        # connection state and failure history survive across calls.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                # Lazily create a per-root KeepService client, sharing
                # the user-agent pool and transfer counters.
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    insecure=self.insecure)
1133 def _check_loop_result(result):
1134 # KeepClient RetryLoops should save results as a 2-tuple: the
1135 # actual result of the request, and the number of servers available
1136 # to receive the request this round.
1137 # This method returns True if there's a real result, False if
1138 # there are no more servers available, otherwise None.
1139 if isinstance(result, Exception):
1141 result, tried_server_count = result
1142 if (result is not None) and (result is not False):
1144 elif tried_server_count < 1:
1145 _logger.info("No more Keep services to try; giving up")
1150 def get_from_cache(self, loc_s):
1151 """Fetch a block only if is in the cache, otherwise return None."""
1152 locator = KeepLocator(loc_s)
1153 slot = self.block_cache.get(locator.md5sum)
1154 if slot is not None and slot.ready.is_set():
1159 def refresh_signature(self, loc):
1160 """Ask Keep to get the remote block and return its local signature"""
1161 now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1162 return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1165 def head(self, loc_s, **kwargs):
1166 return self._get_or_head(loc_s, method="HEAD", **kwargs)
1169 def get(self, loc_s, **kwargs):
1170 return self._get_or_head(loc_s, method="GET", **kwargs)
    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        # Multi-locator requests: fetch each block separately and
        # concatenate the results.
        return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        # Reuse the caller's request ID if given, else the API client's,
        # else a fresh one, so all probes of this block can be correlated
        # in service logs.
        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        # NOTE(review): assumes headers is a dict by this point — confirm
        # it is initialized when the caller passes None.
        headers['X-Request-Id'] = request_id

        locator = KeepLocator(loc_s)
        # Reserve a cache slot; 'first' indicates whether this caller is
        # responsible for filling the slot.
        slot, first = self.block_cache.reserve_cache(locator.md5sum)
        # this is request for a prefetch, if it is
        # already in flight, return immediately.
        # clear 'slot' to prevent finally block from
        # calling slot.set()
        self.hits_counter.add(1)
        raise arvados.errors.KeepReadError(
            "failed to read {}".format(loc_s))
        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               self._gateway_services.get(hint[2:])
        # Map root URLs to their KeepService objects.
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter,
                                   insecure=self.insecure)
            for root in hint_roots

        # See #3147 for a discussion of the loop implementation. Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
        for tries_left in loop:
            # Rebuild the service list on every retry after the first, in
            # case the cluster configuration changed.
            sorted_roots = self.map_new_services(
                force_rebuild=(tries_left < num_retries),
                need_writable=False,
            except Exception as error:
                loop.save_result(error)

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        if slot is not None:
            self.block_cache.set(slot, blob)

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        # No result at all: either no services were reachable, every
        # service said not-found, or some other per-service failure.
        raise arvados.errors.KeepReadError(
            "[{}] failed to read {}: no Keep services available ({})".format(
                request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        raise arvados.errors.KeepReadError(
            "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        """
        # Fall back to the client's configured default storage classes.
        classes = classes or self._default_classes

        # Keep stores bytes; transparently encode text input.
        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        # The locator is the content hash plus the block size.
        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        locator = KeepLocator(loc_s)
        # Correlate all uploads of this block in service logs.
        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        # Headers sent with each PUT request.
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),

        # Retry until enough copies are written, retries run out, or all
        # services report permanent failure.
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
        for tries_left in loop:
            sorted_roots = self.map_new_services(
                force_rebuild=(tries_left < num_retries),
            except Exception as error:
                loop.save_result(error)

            # Only ask for the storage classes not yet satisfied by
            # earlier iterations.
            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done_copies,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left),
                                                          classes=pending_classes)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                writer_pool.add_task(ks, service_root)
            # Accumulate copy and storage-class progress across retries.
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
            (done_copies >= copies and set(done_classes) == set(classes),
             writer_pool.total_task_nr))
            # Old keepstore contacted without storage classes support:
            # success is determined only by successful copies.
            # Disable storage classes tracking from this point forward.
            if not self._storage_classes_unsupported_warning:
                self._storage_classes_unsupported_warning = True
                _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
            (done_copies >= copies, writer_pool.total_task_nr))
        return writer_pool.response()
        # Write failed: report either total unavailability or the
        # per-service errors collected during the attempts.
        raise arvados.errors.KeepWriteError(
            "[{}] failed to write {}: no Keep services available ({})".format(
                request_id, data_hash, loop.last_result()))
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots
                          if roots_map[key].last_result()['error'])
        raise arvados.errors.KeepWriteError(
            "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1410 def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1411 """A stub for put().
1413 This method is used in place of the real put() method when
1414 using local storage (see constructor's local_store argument).
1416 copies and num_retries arguments are ignored: they are here
1417 only for the sake of offering the same call signature as
1420 Data stored this way can be retrieved via local_store_get().
1422 md5 = hashlib.md5(data).hexdigest()
1423 locator = '%s+%d' % (md5, len(data))
1424 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1426 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1427 os.path.join(self.local_store, md5))
1430 def local_store_get(self, loc_s, num_retries=None):
1431 """Companion to local_store_put()."""
1433 locator = KeepLocator(loc_s)
1435 raise arvados.errors.NotFoundError(
1436 "Invalid data locator: '%s'" % loc_s)
1437 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1439 with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1442 def local_store_head(self, loc_s, num_retries=None):
1443 """Companion to local_store_put()."""
1445 locator = KeepLocator(loc_s)
1447 raise arvados.errors.NotFoundError(
1448 "Invalid data locator: '%s'" % loc_s)
1449 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1451 if os.path.exists(os.path.join(self.local_store, locator.md5sum)):