# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
35 if sys.version_info >= (3, 0):
36 from io import BytesIO
38 from cStringIO import StringIO as BytesIO
41 import arvados.config as config
43 import arvados.retry as retry
45 import arvados.diskcache
47 _logger = logging.getLogger('arvados.keep')
48 global_client_object = None
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102
class KeepLocator(object):
    """Parsed representation of a Keep block locator.

    A locator string looks like ``md5sum[+size][+hints...]``; an ``A...``
    hint carries a permission signature and hex expiry timestamp.
    """
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # Locator had no size part.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        # Reassemble the locator, skipping absent parts.
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator without any hints (md5sum[+size])."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the ``A<sig>@<hex expiry>`` hint, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except IndexError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission signature has expired.

        Unsigned locators never expire.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
    own API client. The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        # Return the process-wide KeepClient, rebuilding it whenever the
        # relevant configuration values have changed since the last call.
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings. We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        # Convenience wrapper: fetch a block via the global client.
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        # Convenience wrapper: store a block via the global client.
        return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        """LRU cache of Keep blocks, kept in RAM or (optionally) on disk.

        :cache_max: maximum cache size in bytes; 0 selects a default
          (256 MiB in RAM, or a disk-size-based heuristic for disk cache).
        :max_slots: maximum number of cached blocks; 0 selects a default.
        :disk_cache: if True, persist blocks under disk_cache_dir.
        :disk_cache_dir: cache directory; defaults to ~/.cache/arvados/keep.
        """
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            if self._disk_cache:
                # default max slots to half of maximum file handles
                # NOFILE typically defaults to 1024 on Linux so this
                # is a default of 512 slots.
                self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # pick the smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        # Never allow the cache budget below 64 MiB.
        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        if self._disk_cache:
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            # NOTE(review): trim any pre-existing on-disk cache to the new
            # limits -- the call falls in a sampling gap; confirm upstream.
            self.cap_cache()
class CacheSlot(object):
    """Holder for one cached block: readers wait on `ready` until a
    writer stores the block content with set()."""
    __slots__ = ("locator", "ready", "content")

    def __init__(self, locator):
        self.locator = locator
        self.ready = threading.Event()
        self.content = None

    def get(self):
        # Block until set() has stored the content (possibly None on error).
        self.ready.wait()
        return self.content

    def set(self, value):
        self.content = value
        self.ready.set()

    def size(self):
        if self.content is None:
            return 0
        else:
            return len(self.content)

    def evict(self):
        # RAM slots can always be evicted immediately.
        self.content = None
        return True
def cap_cache(self):
    '''Cap the cache size to self.cache_max'''
    with self._cache_lock:
        # Select all slots except those where ready.is_set() and content is
        # None (that means there was an error reading the block).
        self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
        sm = sum([slot.size() for slot in self._cache])
        while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
            for i in range(len(self._cache)-1, -1, -1):
                # start from the back, find a slot that is a candidate to evict
                if self._cache[i].ready.is_set():
                    sz = self._cache[i].size()

                    # If evict returns false it means the
                    # underlying disk cache couldn't lock the file
                    # for deletion because another process was using
                    # it. Don't count it as reducing the amount
                    # of data in the cache, find something else to
                    # throw out.
                    if self._cache[i].evict():
                        sm -= sz

                    # either way we forget about it. either the
                    # other process will delete it, or if we need
                    # it again and it is still there, we'll find
                    # it on disk.
                    del self._cache[i]
                    break
def _get(self, locator):
    # Test if the locator is already in the cache
    for i in range(0, len(self._cache)):
        if self._cache[i].locator == locator:
            n = self._cache[i]
            if i != 0:
                # move it to the front
                del self._cache[i]
                self._cache.insert(0, n)
            return n
    if self._disk_cache:
        # see if it exists on disk
        n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
        if n is not None:
            self._cache.insert(0, n)
            return n
    return None
def get(self, locator):
    """Return the cache slot for `locator`, or None. Thread-safe wrapper
    around _get()."""
    with self._cache_lock:
        return self._get(locator)
def reserve_cache(self, locator):
    '''Reserve a cache slot for the specified locator,
    or return the existing slot.'''
    with self._cache_lock:
        n = self._get(locator)
        if n:
            # Existing slot: caller should not re-fetch the block.
            return n, False
        else:
            # Add a new cache slot for the locator
            if self._disk_cache:
                n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
            else:
                n = KeepBlockCache.CacheSlot(locator)
            self._cache.insert(0, n)
            return n, True
318 def set(self, slot, blob):
# NOTE(review): this region of the listing is missing interior lines
# (the embedded numbering jumps), so only the error-recovery scaffolding
# is visible.  Presumably slot.set(blob) is attempted first -- confirm
# against the complete source.
323 if e.errno == errno.ENOMEM:
324 # Reduce max slots to current - 4, cap cache and retry
325 with self._cache_lock:
326 self._max_slots = max(4, len(self._cache) - 4)
327 elif e.errno == errno.ENOSPC:
328 # Reduce disk max space to current - 256 MiB, cap cache and retry
329 with self._cache_lock:
330 sm = sum([st.size() for st in self._cache])
331 self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
332 elif e.errno == errno.ENODEV:
333 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
334 except Exception as e:
337 # Check if we should evict things from the cache. Either
338 # because we added a new thing or there was an error and
339 # we possibly adjusted the limits down, so we might need
340 # to push something out.
# NOTE(review): a second store attempt appears to follow here (lines not
# visible); on a second failure the error is escalated:
344 # Only gets here if there was an error the first time. The
345 # exception handler adjusts limits downward in some cases
346 # to free up resources, which would make the operation
350 except Exception as e:
351 # It failed again. Give up.
352 raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
354 # Set the notice that that we are done with the cache
355 # slot one way or another.
class Counter(object):
    """Thread-safe integer counter used for transfer statistics."""

    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        # Atomically increment the counter by v.
        with self._lk:
            self._val += v

    def get(self):
        # Atomically read the current value.
        with self._lk:
            return self._val
class KeepClient(object):

    # Default Keep server connection timeout: 2 seconds
    # Default Keep server read timeout: 256 seconds
    # Default Keep server bandwidth minimum: 32768 bytes per second
    # Default Keep proxy connection timeout: 20 seconds
    # Default Keep proxy read timeout: 256 seconds
    # Default Keep proxy bandwidth minimum: 32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
385 class KeepService(object):
386 """Make requests to a single Keep service, and track results.
388 A KeepService is intended to last long enough to perform one
389 transaction (GET or PUT) against one Keep service. This can
390 involve calling either get() or put() multiple times in order
391 to retry after transient failures. However, calling both get()
392 and put() on a single instance -- or using the same instance
393 to access two different Keep services -- will not produce
400 arvados.errors.HttpError,
403 def __init__(self, root, user_agent_pool=queue.LifoQueue(),
405 download_counter=None,
409 self._user_agent_pool = user_agent_pool
410 self._result = {'error': None}
414 self.get_headers = {'Accept': 'application/octet-stream'}
415 self.get_headers.update(headers)
416 self.put_headers = headers
417 self.upload_counter = upload_counter
418 self.download_counter = download_counter
419 self.insecure = insecure
422 """Is it worth attempting a request?"""
426 """Did the request succeed or encounter permanent failure?"""
427 return self._result['error'] == False or not self._usable
429 def last_result(self):
432 def _get_user_agent(self):
434 return self._user_agent_pool.get(block=False)
438 def _put_user_agent(self, ua):
441 self._user_agent_pool.put(ua, block=False)
445 def _socket_open(self, *args, **kwargs):
446 if len(args) + len(kwargs) == 2:
447 return self._socket_open_pycurl_7_21_5(*args, **kwargs)
449 return self._socket_open_pycurl_7_19_3(*args, **kwargs)
451 def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
452 return self._socket_open_pycurl_7_21_5(
454 address=collections.namedtuple(
455 'Address', ['family', 'socktype', 'protocol', 'addr'],
456 )(family, socktype, protocol, address))
458 def _socket_open_pycurl_7_21_5(self, purpose, address):
459 """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
460 s = socket.socket(address.family, address.socktype, address.protocol)
461 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
462 # Will throw invalid protocol error on mac. This test prevents that.
463 if hasattr(socket, 'TCP_KEEPIDLE'):
464 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
465 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
469 def get(self, locator, method="GET", timeout=None):
470 # locator is a KeepLocator object.
471 url = self.root + str(locator)
472 _logger.debug("Request: %s %s", method, url)
473 curl = self._get_user_agent()
476 with timer.Timer() as t:
478 response_body = BytesIO()
479 curl.setopt(pycurl.NOSIGNAL, 1)
480 curl.setopt(pycurl.OPENSOCKETFUNCTION,
481 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
482 curl.setopt(pycurl.URL, url.encode('utf-8'))
483 curl.setopt(pycurl.HTTPHEADER, [
484 '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
485 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
486 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
488 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
489 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
491 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
493 curl.setopt(pycurl.NOBODY, True)
494 self._setcurltimeouts(curl, timeout, method=="HEAD")
498 except Exception as e:
499 raise arvados.errors.HttpError(0, str(e))
505 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
506 'body': response_body.getvalue(),
507 'headers': self._headers,
511 ok = retry.check_http_response_success(self._result['status_code'])
513 self._result['error'] = arvados.errors.HttpError(
514 self._result['status_code'],
515 self._headers.get('x-status-line', 'Error'))
516 except self.HTTP_ERRORS as e:
520 self._usable = ok != False
521 if self._result.get('status_code', None):
522 # The client worked well enough to get an HTTP status
523 # code, so presumably any problems are just on the
524 # server side and it's OK to reuse the client.
525 self._put_user_agent(curl)
527 # Don't return this client to the pool, in case it's
531 _logger.debug("Request fail: GET %s => %s: %s",
532 url, type(self._result['error']), str(self._result['error']))
535 _logger.info("HEAD %s: %s bytes",
536 self._result['status_code'],
537 self._result.get('content-length'))
538 if self._result['headers'].get('x-keep-locator'):
539 # This is a response to a remote block copy request, return
540 # the local copy block locator.
541 return self._result['headers'].get('x-keep-locator')
544 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
545 self._result['status_code'],
546 len(self._result['body']),
548 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
550 if self.download_counter:
551 self.download_counter.add(len(self._result['body']))
552 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
553 if resp_md5 != locator.md5sum:
554 _logger.warning("Checksum fail: md5(%s) = %s",
556 self._result['error'] = arvados.errors.HttpError(
559 return self._result['body']
561 def put(self, hash_s, body, timeout=None, headers={}):
562 put_headers = copy.copy(self.put_headers)
563 put_headers.update(headers)
564 url = self.root + hash_s
565 _logger.debug("Request: PUT %s", url)
566 curl = self._get_user_agent()
569 with timer.Timer() as t:
571 body_reader = BytesIO(body)
572 response_body = BytesIO()
573 curl.setopt(pycurl.NOSIGNAL, 1)
574 curl.setopt(pycurl.OPENSOCKETFUNCTION,
575 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
576 curl.setopt(pycurl.URL, url.encode('utf-8'))
577 # Using UPLOAD tells cURL to wait for a "go ahead" from the
578 # Keep server (in the form of a HTTP/1.1 "100 Continue"
579 # response) instead of sending the request body immediately.
580 # This allows the server to reject the request if the request
581 # is invalid or the server is read-only, without waiting for
582 # the client to send the entire block.
583 curl.setopt(pycurl.UPLOAD, True)
584 curl.setopt(pycurl.INFILESIZE, len(body))
585 curl.setopt(pycurl.READFUNCTION, body_reader.read)
586 curl.setopt(pycurl.HTTPHEADER, [
587 '{}: {}'.format(k,v) for k,v in put_headers.items()])
588 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
589 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
591 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
592 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
594 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
595 self._setcurltimeouts(curl, timeout)
598 except Exception as e:
599 raise arvados.errors.HttpError(0, str(e))
605 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
606 'body': response_body.getvalue().decode('utf-8'),
607 'headers': self._headers,
610 ok = retry.check_http_response_success(self._result['status_code'])
612 self._result['error'] = arvados.errors.HttpError(
613 self._result['status_code'],
614 self._headers.get('x-status-line', 'Error'))
615 except self.HTTP_ERRORS as e:
619 self._usable = ok != False # still usable if ok is True or None
620 if self._result.get('status_code', None):
621 # Client is functional. See comment in get().
622 self._put_user_agent(curl)
626 _logger.debug("Request fail: PUT %s => %s: %s",
627 url, type(self._result['error']), str(self._result['error']))
629 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
630 self._result['status_code'],
633 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
634 if self.upload_counter:
635 self.upload_counter.add(len(body))
638 def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
641 elif isinstance(timeouts, tuple):
642 if len(timeouts) == 2:
643 conn_t, xfer_t = timeouts
644 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
646 conn_t, xfer_t, bandwidth_bps = timeouts
648 conn_t, xfer_t = (timeouts, timeouts)
649 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
650 curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
651 if not ignore_bandwidth:
652 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
653 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
655 def _headerfunction(self, header_line):
656 if isinstance(header_line, bytes):
657 header_line = header_line.decode('iso-8859-1')
658 if ':' in header_line:
659 name, value = header_line.split(':', 1)
660 name = name.strip().lower()
661 value = value.strip()
663 name = self._lastheadername
664 value = self._headers[name] + ' ' + header_line.strip()
665 elif header_line.startswith('HTTP/'):
666 name = 'x-status-line'
669 _logger.error("Unexpected header line: %s", header_line)
671 self._lastheadername = name
672 self._headers[name] = value
673 # Returning None implies all bytes were written
class KeepWriterQueue(queue.Queue):
    """Work queue of (KeepService, service_root) PUT tasks, tracking how
    many replicas and which storage classes have been confirmed so far."""

    def __init__(self, copies, classes=[]):
        queue.Queue.__init__(self) # Old-style superclass
        self.wanted_copies = copies
        self.wanted_storage_classes = classes
        self.successful_copies = 0
        self.confirmed_storage_classes = {}
        self.response = None
        self.storage_classes_tracking = True
        self.queue_data_lock = threading.RLock()
        # Enough concurrent attempts to satisfy either the replica count
        # or the storage-class count, whichever needs more.
        self.pending_tries = max(copies, len(classes))
        self.pending_tries_notification = threading.Condition()
689 def write_success(self, response, replicas_nr, classes_confirmed):
690 with self.queue_data_lock:
691 self.successful_copies += replicas_nr
692 if classes_confirmed is None:
693 self.storage_classes_tracking = False
694 elif self.storage_classes_tracking:
695 for st_class, st_copies in classes_confirmed.items():
697 self.confirmed_storage_classes[st_class] += st_copies
699 self.confirmed_storage_classes[st_class] = st_copies
700 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
701 self.response = response
702 with self.pending_tries_notification:
703 self.pending_tries_notification.notify_all()
def write_fail(self, ks):
    # A write attempt failed: allow one more try and wake a waiting
    # worker thread so it can pick up the retry.
    with self.pending_tries_notification:
        self.pending_tries += 1
        self.pending_tries_notification.notify()
def pending_copies(self):
    """Return how many replicas are still needed to satisfy the request."""
    with self.queue_data_lock:
        return self.wanted_copies - self.successful_copies
714 def satisfied_classes(self):
715 with self.queue_data_lock:
716 if not self.storage_classes_tracking:
717 # Notifies disabled storage classes expectation to
720 return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
722 def pending_classes(self):
723 with self.queue_data_lock:
724 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
726 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
727 for st_class, st_copies in self.confirmed_storage_classes.items():
728 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
729 unsatisfied_classes.remove(st_class)
730 return unsatisfied_classes
732 def get_next_task(self):
733 with self.pending_tries_notification:
735 if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
736 # This notify_all() is unnecessary --
737 # write_success() already called notify_all()
738 # when pending<1 became true, so it's not
739 # possible for any other thread to be in
740 # wait() now -- but it's cheap insurance
741 # against deadlock so we do it anyway:
742 self.pending_tries_notification.notify_all()
743 # Drain the queue and then raise Queue.Empty
747 elif self.pending_tries > 0:
748 service, service_root = self.get_nowait()
749 if service.finished():
752 self.pending_tries -= 1
753 return service, service_root
755 self.pending_tries_notification.notify_all()
758 self.pending_tries_notification.wait()
761 class KeepWriterThreadPool(object):
762 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
763 self.total_task_nr = 0
764 if (not max_service_replicas) or (max_service_replicas >= copies):
767 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
768 _logger.debug("Pool max threads is %d", num_threads)
770 self.queue = KeepClient.KeepWriterQueue(copies, classes)
772 for _ in range(num_threads):
773 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
774 self.workers.append(w)
776 def add_task(self, ks, service_root):
777 self.queue.put((ks, service_root))
778 self.total_task_nr += 1
781 return self.queue.successful_copies, self.queue.satisfied_classes()
785 for worker in self.workers:
787 # Wait for finished work
791 return self.queue.response
794 class KeepWriterThread(threading.Thread):
795 class TaskFailed(RuntimeError): pass
797 def __init__(self, queue, data, data_hash, timeout=None):
798 super(KeepClient.KeepWriterThread, self).__init__()
799 self.timeout = timeout
802 self.data_hash = data_hash
808 service, service_root = self.queue.get_next_task()
812 locator, copies, classes = self.do_task(service, service_root)
813 except Exception as e:
814 if not isinstance(e, self.TaskFailed):
815 _logger.exception("Exception in KeepWriterThread")
816 self.queue.write_fail(service)
818 self.queue.write_success(locator, copies, classes)
820 self.queue.task_done()
822 def do_task(self, service, service_root):
823 classes = self.queue.pending_classes()
827 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
828 success = bool(service.put(self.data_hash,
830 timeout=self.timeout,
832 result = service.last_result()
835 if result.get('status_code'):
836 _logger.debug("Request fail: PUT %s => %s %s",
838 result.get('status_code'),
840 raise self.TaskFailed()
842 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
843 str(threading.current_thread()),
848 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
849 except (KeyError, ValueError):
852 classes_confirmed = {}
854 scch = result['headers']['x-keep-storage-classes-confirmed']
855 for confirmation in scch.replace(' ', '').split(','):
856 if '=' in confirmation:
857 stored_class, stored_copies = confirmation.split('=')[:2]
858 classes_confirmed[stored_class] = int(stored_copies)
859 except (KeyError, ValueError):
860 # Storage classes confirmed header missing or corrupt
861 classes_confirmed = None
863 return result['body'].strip(), replicas_stored, classes_confirmed
866 def __init__(self, api_client=None, proxy=None,
867 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
868 api_token=None, local_store=None, block_cache=None,
869 num_retries=0, session=None):
870 """Initialize a new KeepClient.
874 The API client to use to find Keep services. If not
875 provided, KeepClient will build one from available Arvados
879 If specified, this KeepClient will send requests to this Keep
880 proxy. Otherwise, KeepClient will fall back to the setting of the
881 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
882 If you want to KeepClient does not use a proxy, pass in an empty
886 The initial timeout (in seconds) for HTTP requests to Keep
887 non-proxy servers. A tuple of three floats is interpreted as
888 (connection_timeout, read_timeout, minimum_bandwidth). A connection
889 will be aborted if the average traffic rate falls below
890 minimum_bandwidth bytes per second over an interval of read_timeout
891 seconds. Because timeouts are often a result of transient server
892 load, the actual connection timeout will be increased by a factor
893 of two on each retry.
894 Default: (2, 256, 32768).
897 The initial timeout (in seconds) for HTTP requests to
898 Keep proxies. A tuple of three floats is interpreted as
899 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
900 described above for adjusting connection timeouts on retry also
902 Default: (20, 256, 32768).
905 If you're not using an API client, but only talking
906 directly to a Keep proxy, this parameter specifies an API token
907 to authenticate Keep requests. It is an error to specify both
908 api_client and api_token. If you specify neither, KeepClient
909 will use one available from the Arvados configuration.
912 If specified, this KeepClient will bypass Keep
913 services, and save data to the named directory. If unspecified,
914 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
915 environment variable. If you want to ensure KeepClient does not
916 use local storage, pass in an empty string. This is primarily
917 intended to mock a server for testing.
920 The default number of times to retry failed requests.
921 This will be used as the default num_retries value when get() and
922 put() are called. Default 0.
924 self.lock = threading.Lock()
926 if config.get('ARVADOS_KEEP_SERVICES'):
927 proxy = config.get('ARVADOS_KEEP_SERVICES')
929 proxy = config.get('ARVADOS_KEEP_PROXY')
930 if api_token is None:
931 if api_client is None:
932 api_token = config.get('ARVADOS_API_TOKEN')
934 api_token = api_client.api_token
935 elif api_client is not None:
937 "can't build KeepClient with both API client and token")
938 if local_store is None:
939 local_store = os.environ.get('KEEP_LOCAL_STORE')
941 if api_client is None:
942 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
944 self.insecure = api_client.insecure
946 self.block_cache = block_cache if block_cache else KeepBlockCache()
947 self.timeout = timeout
948 self.proxy_timeout = proxy_timeout
949 self._user_agent_pool = queue.LifoQueue()
950 self.upload_counter = Counter()
951 self.download_counter = Counter()
952 self.put_counter = Counter()
953 self.get_counter = Counter()
954 self.hits_counter = Counter()
955 self.misses_counter = Counter()
956 self._storage_classes_unsupported_warning = False
957 self._default_classes = []
960 self.local_store = local_store
961 self.head = self.local_store_head
962 self.get = self.local_store_get
963 self.put = self.local_store_put
965 self.num_retries = num_retries
966 self.max_replicas_per_service = None
968 proxy_uris = proxy.split()
969 for i in range(len(proxy_uris)):
970 if not proxy_uris[i].endswith('/'):
973 url = urllib.parse.urlparse(proxy_uris[i])
974 if not (url.scheme and url.netloc):
975 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
976 self.api_token = api_token
977 self._gateway_services = {}
978 self._keep_services = [{
979 'uuid': "00000-bi6l4-%015d" % idx,
980 'service_type': 'proxy',
981 '_service_root': uri,
982 } for idx, uri in enumerate(proxy_uris)]
983 self._writable_services = self._keep_services
984 self.using_proxy = True
985 self._static_services_list = True
987 # It's important to avoid instantiating an API client
988 # unless we actually need one, for testing's sake.
989 if api_client is None:
990 api_client = arvados.api('v1')
991 self.api_client = api_client
992 self.api_token = api_client.api_token
993 self._gateway_services = {}
994 self._keep_services = None
995 self._writable_services = None
996 self.using_proxy = None
997 self._static_services_list = False
999 self._default_classes = [
1000 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
1002 # We're talking to an old cluster
1005 def current_timeout(self, attempt_number):
1006 """Return the appropriate timeout to use for this client.
1008 The proxy timeout setting if the backend service is currently a proxy,
1009 the regular timeout setting otherwise. The `attempt_number` indicates
1010 how many times the operation has been tried already (starting from 0
1011 for the first try), and scales the connection timeout portion of the
1012 return value accordingly.
1015 # TODO(twp): the timeout should be a property of a
1016 # KeepService, not a KeepClient. See #4488.
1017 t = self.proxy_timeout if self.using_proxy else self.timeout
1019 return (t[0] * (1 << attempt_number), t[1])
1021 return (t[0] * (1 << attempt_number), t[1], t[2])
def _any_nondisk_services(self, service_list):
    """Return True if any service in service_list is not a plain
    'disk' type service (e.g. a proxy or blob service)."""
    return any(ks.get('service_type', 'disk') != 'disk'
               for ks in service_list)
1026 def build_services_list(self, force_rebuild=False):
1027 if (self._static_services_list or
1028 (self._keep_services and not force_rebuild)):
1032 keep_services = self.api_client.keep_services().accessible()
1033 except Exception: # API server predates Keep services.
1034 keep_services = self.api_client.keep_disks().list()
1036 # Gateway services are only used when specified by UUID,
1037 # so there's nothing to gain by filtering them by
1039 self._gateway_services = {ks['uuid']: ks for ks in
1040 keep_services.execute()['items']}
1041 if not self._gateway_services:
1042 raise arvados.errors.NoKeepServersError()
1044 # Precompute the base URI for each service.
1045 for r in self._gateway_services.values():
1046 host = r['service_host']
1047 if not host.startswith('[') and host.find(':') >= 0:
1048 # IPv6 URIs must be formatted like http://[::1]:80/...
1049 host = '[' + host + ']'
1050 r['_service_root'] = "{}://{}:{:d}/".format(
1051 'https' if r['service_ssl_flag'] else 'http',
1055 _logger.debug(str(self._gateway_services))
1056 self._keep_services = [
1057 ks for ks in self._gateway_services.values()
1058 if not ks.get('service_type', '').startswith('gateway:')]
1059 self._writable_services = [ks for ks in self._keep_services
1060 if not ks.get('read_only')]
1062 # For disk type services, max_replicas_per_service is 1
1063 # It is unknown (unlimited) for other service types.
1064 if self._any_nondisk_services(self._writable_services):
1065 self.max_replicas_per_service = None
1067 self.max_replicas_per_service = 1
def _service_weight(self, data_hash, service_uuid):
    """Compute the weight of a Keep service endpoint for a data
    block with a known hash.

    The weight is md5(h + u) where u is the last 15 characters of
    the service endpoint's UUID.
    """
    return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URL.  NOTE(review): sorted_roots is presumably initialized
        # to [] just above -- that line is elided from this excerpt.
        for hint in locator.hints:
            if hint.startswith('K@'):
                # Bare cluster-id hint: probe that cluster's public
                # keepproxy endpoint over https.
                # NOTE(review): a `len(hint) == 7` guard appears to be
                # elided here; the elif below suggests it.
                sorted_roots.append(
                    "https://keep.{}.arvadosapi.com/".format(hint[2:]))
            elif len(hint) == 29:
                # K@ plus a 27-character UUID: look up a gateway
                # service cached by build_services_list().
                svc = self._gateway_services.get(hint[2:])
                sorted_roots.append(svc['_service_root'])
        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs).
        use_services = self._keep_services
        # NOTE(review): presumably guarded by `if need_writable:` in
        # the full source -- confirm.
        use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # service roots.
        #
        # Default to token auth unless the caller already supplied an
        # Authorization header.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                # Lazily create one KeepService per newly seen root;
                # existing entries keep their accumulated state.
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    insecure=self.insecure)
        # NOTE(review): presumably `return local_roots` follows in the
        # full source; the tail is elided from this excerpt.
    # NOTE(review): no `self` parameter -- presumably decorated as a
    # @staticmethod in the full source; confirm.
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            # NOTE(review): presumably `return None` here (exception is
            # retryable); the return statement is elided in this excerpt.
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            # Real result saved: presumably `return True` (elided).
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            # Presumably `return False` / `return None` tails are elided.
    def get_from_cache(self, loc_s):
        """Fetch a block only if it is in the cache; otherwise return None."""
        # Only the md5 part of the locator keys the cache; size and
        # hints are ignored for lookup.
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        # slot.ready is set once the block's content has been fully
        # stored in the slot (a cache entry may exist but still be in
        # flight).
        if slot is not None and slot.ready.is_set():
            # NOTE(review): presumably returns the slot's content here;
            # the tail of this method is elided from this excerpt.
1158 def refresh_signature(self, loc):
1159 """Ask Keep to get the remote block and return its local signature"""
1160 now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1161 return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1164 def head(self, loc_s, **kwargs):
1165 return self._get_or_head(loc_s, method="HEAD", **kwargs)
1168 def get(self, loc_s, **kwargs):
1169 return self._get_or_head(loc_s, method="GET", **kwargs)
    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        # NOTE(review): this excerpt elides several guard/try/return
        # lines; the statements below are preserved exactly as-is.
        # Multi-locator request: recurse on each piece and concatenate
        # (presumably guarded by a comma check on loc_s).
        return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        # Reuse the API client's request id when available so server
        # logs correlate with this client session.
        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers['X-Request-Id'] = request_id

        locator = KeepLocator(loc_s)
        # Reserve a cache slot; `first` presumably tells whether this
        # caller is responsible for filling it -- confirm.
        slot, first = self.block_cache.reserve_cache(locator.md5sum)
        # this is request for a prefetch, if it is
        # already in flight, return immediately.
        # clear 'slot' to prevent finally block from
        # calling slot.set()
        self.hits_counter.add(1)
        raise arvados.errors.KeepReadError(
            "failed to read {}".format(loc_s))
        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               self._gateway_services.get(hint[2:])
        # Map root URLs to their KeepService objects.
        root: self.KeepService(root, self._user_agent_pool,
                               upload_counter=self.upload_counter,
                               download_counter=self.download_counter,
                               insecure=self.insecure)
        for root in hint_roots

        # See #3147 for a discussion of the loop implementation.  Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
        for tries_left in loop:
            sorted_roots = self.map_new_services(
                force_rebuild=(tries_left < num_retries),
                need_writable=False,
            except Exception as error:
                loop.save_result(error)

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        if slot is not None:
            self.block_cache.set(slot, blob)

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        raise arvados.errors.KeepReadError(
            "[{}] failed to read {}: no Keep services available ({})".format(
                request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        raise arvados.errors.KeepReadError(
            "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        """
        # NOTE(review): this excerpt elides several guard/try lines
        # (headers dict literal, loop.save_result calls, etc.); the
        # statements below are preserved exactly as-is.
        classes = classes or self._default_classes

        # Keep stores bytes; transparently encode text input.
        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        # The locator is content-addressed: md5 of the data plus size.
        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        'X-Request-Id': request_id,
        'X-Keep-Desired-Replicas': str(copies),
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
        for tries_left in loop:
            sorted_roots = self.map_new_services(
                force_rebuild=(tries_left < num_retries),
            except Exception as error:
                loop.save_result(error)

            # Classes still needing copies this round: requested minus
            # those already satisfied.
            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done_copies,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left),
                                                          classes=pending_classes)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                writer_pool.add_task(ks, service_root)
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                (done_copies >= copies and set(done_classes) == set(classes),
                 writer_pool.total_task_nr))
            # Old keepstore contacted without storage classes support:
            # success is determined only by successful copies.
            # Disable storage classes tracking from this point forward.
            if not self._storage_classes_unsupported_warning:
                self._storage_classes_unsupported_warning = True
                _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                (done_copies >= copies, writer_pool.total_task_nr))
        return writer_pool.response()
        raise arvados.errors.KeepWriteError(
            "[{}] failed to write {}: no Keep services available ({})".format(
                request_id, data_hash, loop.last_result()))
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots
                          if roots_map[key].last_result()['error'])
        raise arvados.errors.KeepWriteError(
            "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1409 def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1410 """A stub for put().
1412 This method is used in place of the real put() method when
1413 using local storage (see constructor's local_store argument).
1415 copies and num_retries arguments are ignored: they are here
1416 only for the sake of offering the same call signature as
1419 Data stored this way can be retrieved via local_store_get().
1421 md5 = hashlib.md5(data).hexdigest()
1422 locator = '%s+%d' % (md5, len(data))
1423 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1425 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1426 os.path.join(self.local_store, md5))
    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        # NOTE(review): the raise below is presumably inside an
        # `except ValueError:` around this KeepLocator parse; the
        # try/except lines are elided from this excerpt.
        locator = KeepLocator(loc_s)
        raise arvados.errors.NotFoundError(
            "Invalid data locator: '%s'" % loc_s)
        # The canonical empty block is never written to disk;
        # presumably returns b'' here (body elided).
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
        # Blocks are stored as files named by their md5 hash; the read
        # of the file contents is elided from this excerpt.
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1441 def local_store_head(self, loc_s, num_retries=None):
1442 """Companion to local_store_put()."""
1444 locator = KeepLocator(loc_s)
1446 raise arvados.errors.NotFoundError(
1447 "Invalid data locator: '%s'" % loc_s)
1448 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1450 if os.path.exists(os.path.join(self.local_store, locator.md5sum)):