1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
36 if sys.version_info >= (3, 0):
37 from io import BytesIO
39 from cStringIO import StringIO as BytesIO
42 import arvados.config as config
44 import arvados.retry as retry
46 import arvados.diskcache
48 _logger = logging.getLogger('arvados.keep')
49 global_client_object = None
52 # Monkey patch TCP constants when not available (apple). Values sourced from:
53 # http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    # Backfill TCP keepalive socket constants that some macOS Python
    # builds omit; values come from Apple's xnu tcp.h header.
    socket.TCP_KEEPALIVE = getattr(socket, 'TCP_KEEPALIVE', 0x010)
    socket.TCP_KEEPINTVL = getattr(socket, 'TCP_KEEPINTVL', 0x101)
    socket.TCP_KEEPCNT = getattr(socket, 'TCP_KEEPCNT', 0x102)
63 class KeepLocator(object):
64 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
65 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
67 def __init__(self, locator_str):
70 self._perm_expiry = None
71 pieces = iter(locator_str.split('+'))
72 self.md5sum = next(pieces)
74 self.size = int(next(pieces))
78 if self.HINT_RE.match(hint) is None:
79 raise ValueError("invalid hint format: {}".format(hint))
80 elif hint.startswith('A'):
81 self.parse_permission_hint(hint)
83 self.hints.append(hint)
88 for s in [self.md5sum, self.size,
89 self.permission_hint()] + self.hints
93 if self.size is not None:
94 return "%s+%i" % (self.md5sum, self.size)
98 def _make_hex_prop(name, length):
99 # Build and return a new property with the given name that
100 # must be a hex string of the given length.
101 data_name = '_{}'.format(name)
103 return getattr(self, data_name)
104 def setter(self, hex_str):
105 if not arvados.util.is_hex(hex_str, length):
106 raise ValueError("{} is not a {}-digit hex string: {!r}".
107 format(name, length, hex_str))
108 setattr(self, data_name, hex_str)
109 return property(getter, setter)
111 md5sum = _make_hex_prop('md5sum', 32)
112 perm_sig = _make_hex_prop('perm_sig', 40)
    def perm_expiry(self):
        # Expiry timestamp of the signed permission hint: a datetime
        # (assigned by the perm_expiry setter from the hex Unix
        # timestamp), or None when the locator carried no A-hint.
        return self._perm_expiry
119 def perm_expiry(self, value):
120 if not arvados.util.is_hex(value, 1, 8):
122 "permission timestamp must be a hex Unix timestamp: {}".
124 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
126 def permission_hint(self):
127 data = [self.perm_sig, self.perm_expiry]
130 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
131 return "A{}@{:08x}".format(*data)
133 def parse_permission_hint(self, s):
135 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
137 raise ValueError("bad permission hint {}".format(s))
139 def permission_expired(self, as_of_dt=None):
140 if self.perm_expiry is None:
142 elif as_of_dt is None:
143 as_of_dt = datetime.datetime.now()
144 return self.perm_expiry <= as_of_dt
148 """Simple interface to a global KeepClient object.
150 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
151 own API client. The global KeepClient will build an API client from the
152 current Arvados configuration, which may not match the one you built.
157 def global_client_object(cls):
158 global global_client_object
159 # Previously, KeepClient would change its behavior at runtime based
160 # on these configuration settings. We simulate that behavior here
161 # by checking the values and returning a new KeepClient if any of
163 key = (config.get('ARVADOS_API_HOST'),
164 config.get('ARVADOS_API_TOKEN'),
165 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
166 config.get('ARVADOS_KEEP_PROXY'),
167 os.environ.get('KEEP_LOCAL_STORE'))
168 if (global_client_object is None) or (cls._last_key != key):
169 global_client_object = KeepClient()
171 return global_client_object
174 def get(locator, **kwargs):
175 return Keep.global_client_object().get(locator, **kwargs)
178 def put(data, **kwargs):
179 return Keep.global_client_object().put(data, **kwargs)
181 class KeepBlockCache(object):
182 def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
183 self.cache_max = cache_max
185 self._cache_lock = threading.Lock()
186 self._max_slots = max_slots
187 self._disk_cache = disk_cache
188 self._disk_cache_dir = disk_cache_dir
189 self._cache_updating = threading.Condition(self._cache_lock)
191 if self._disk_cache and self._disk_cache_dir is None:
192 self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
193 os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
195 if self._max_slots == 0:
197 # Each block uses two file descriptors, one used to
198 # open it initially and hold the flock(), and a second
199 # hidden one used by mmap().
201 # Set max slots to 1/8 of maximum file handles. This
202 # means we'll use at most 1/4 of total file handles.
204 # NOFILE typically defaults to 1024 on Linux so this
205 # is 128 slots (256 file handles), which means we can
206 # cache up to 8 GiB of 64 MiB blocks. This leaves
207 # 768 file handles for sockets and other stuff.
209 # When we want the ability to have more cache (e.g. in
210 # arv-mount) we'll increase rlimit before calling
212 self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
215 self._max_slots = 512
217 if self.cache_max == 0:
219 fs = os.statvfs(self._disk_cache_dir)
220 # Calculation of available space incorporates existing cache usage
221 existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
222 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
223 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
225 # 10% of total disk size
226 # 25% of available space
228 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
231 self.cache_max = (256 * 1024 * 1024)
233 self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
236 self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
240 class CacheSlot(object):
241 __slots__ = ("locator", "ready", "content")
243 def __init__(self, locator):
244 self.locator = locator
245 self.ready = threading.Event()
252 def set(self, value):
257 if self.content is None:
260 return len(self.content)
267 return (self.content is None)
269 def _resize_cache(self, cache_max, max_slots):
270 # Try and make sure the contents of the cache do not exceed
271 # the supplied maximums.
273 # Select all slots except those where ready.is_set() and content is
274 # None (that means there was an error reading the block).
275 self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
276 sm = sum([slot.size() for slot in self._cache])
277 while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
278 for i in range(len(self._cache)-1, -1, -1):
279 # start from the back, find a slot that is a candidate to evict
280 if self._cache[i].ready.is_set():
281 sz = self._cache[i].size()
283 # If evict returns false it means the
284 # underlying disk cache couldn't lock the file
285 # for deletion because another process was using
286 # it. Don't count it as reducing the amount
287 # of data in the cache, find something else to
289 if self._cache[i].evict():
292 # check to make sure the underlying data is gone
293 if self._cache[i].gone():
294 # either way we forget about it. either the
295 # other process will delete it, or if we need
296 # it again and it is still there, we'll find
303 '''Cap the cache size to self.cache_max'''
304 with self._cache_updating:
305 self._resize_cache(self.cache_max, self._max_slots)
306 self._cache_updating.notify_all()
308 def _get(self, locator):
309 # Test if the locator is already in the cache
310 for i in range(0, len(self._cache)):
311 if self._cache[i].locator == locator:
314 # move it to the front
316 self._cache.insert(0, n)
319 # see if it exists on disk
320 n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
322 self._cache.insert(0, n)
326 def get(self, locator):
327 with self._cache_lock:
328 return self._get(locator)
330 def reserve_cache(self, locator):
331 '''Reserve a cache slot for the specified locator,
332 or return the existing slot.'''
333 with self._cache_updating:
334 n = self._get(locator)
338 # Add a new cache slot for the locator
339 self._resize_cache(self.cache_max, self._max_slots-1)
340 while len(self._cache) >= self._max_slots:
341 # If there isn't a slot available, need to wait
342 # for something to happen that releases one of the
343 # cache slots. Idle for 200 ms or woken up by
345 self._cache_updating.wait(timeout=0.2)
346 self._resize_cache(self.cache_max, self._max_slots-1)
349 n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
351 n = KeepBlockCache.CacheSlot(locator)
352 self._cache.insert(0, n)
355 def set(self, slot, blob):
360 if e.errno == errno.ENOMEM:
361 # Reduce max slots to current - 4, cap cache and retry
362 with self._cache_lock:
363 self._max_slots = max(4, len(self._cache) - 4)
364 elif e.errno == errno.ENOSPC:
365 # Reduce disk max space to current - 256 MiB, cap cache and retry
366 with self._cache_lock:
367 sm = sum([st.size() for st in self._cache])
368 self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
369 elif e.errno == errno.ENODEV:
370 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
371 except Exception as e:
374 # Check if we should evict things from the cache. Either
375 # because we added a new thing or there was an error and
376 # we possibly adjusted the limits down, so we might need
377 # to push something out.
381 # Only gets here if there was an error the first time. The
382 # exception handler adjusts limits downward in some cases
383 # to free up resources, which would make the operation
386 except Exception as e:
387 # It failed again. Give up.
389 raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
393 class Counter(object):
394 def __init__(self, v=0):
395 self._lk = threading.Lock()
407 class KeepClient(object):
409 # Default Keep server connection timeout: 2 seconds
410 # Default Keep server read timeout: 256 seconds
411 # Default Keep server bandwidth minimum: 32768 bytes per second
412 # Default Keep proxy connection timeout: 20 seconds
413 # Default Keep proxy read timeout: 256 seconds
414 # Default Keep proxy bandwidth minimum: 32768 bytes per second
415 DEFAULT_TIMEOUT = (2, 256, 32768)
416 DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
419 class KeepService(object):
420 """Make requests to a single Keep service, and track results.
422 A KeepService is intended to last long enough to perform one
423 transaction (GET or PUT) against one Keep service. This can
424 involve calling either get() or put() multiple times in order
425 to retry after transient failures. However, calling both get()
426 and put() on a single instance -- or using the same instance
427 to access two different Keep services -- will not produce
434 arvados.errors.HttpError,
437 def __init__(self, root, user_agent_pool=queue.LifoQueue(),
439 download_counter=None,
443 self._user_agent_pool = user_agent_pool
444 self._result = {'error': None}
448 self.get_headers = {'Accept': 'application/octet-stream'}
449 self.get_headers.update(headers)
450 self.put_headers = headers
451 self.upload_counter = upload_counter
452 self.download_counter = download_counter
453 self.insecure = insecure
456 """Is it worth attempting a request?"""
460 """Did the request succeed or encounter permanent failure?"""
461 return self._result['error'] == False or not self._usable
463 def last_result(self):
466 def _get_user_agent(self):
468 return self._user_agent_pool.get(block=False)
472 def _put_user_agent(self, ua):
475 self._user_agent_pool.put(ua, block=False)
479 def _socket_open(self, *args, **kwargs):
480 if len(args) + len(kwargs) == 2:
481 return self._socket_open_pycurl_7_21_5(*args, **kwargs)
483 return self._socket_open_pycurl_7_19_3(*args, **kwargs)
485 def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
486 return self._socket_open_pycurl_7_21_5(
488 address=collections.namedtuple(
489 'Address', ['family', 'socktype', 'protocol', 'addr'],
490 )(family, socktype, protocol, address))
492 def _socket_open_pycurl_7_21_5(self, purpose, address):
493 """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
494 s = socket.socket(address.family, address.socktype, address.protocol)
495 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
496 # Will throw invalid protocol error on mac. This test prevents that.
497 if hasattr(socket, 'TCP_KEEPIDLE'):
498 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
499 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
503 def get(self, locator, method="GET", timeout=None):
504 # locator is a KeepLocator object.
505 url = self.root + str(locator)
506 _logger.debug("Request: %s %s", method, url)
507 curl = self._get_user_agent()
510 with timer.Timer() as t:
512 response_body = BytesIO()
513 curl.setopt(pycurl.NOSIGNAL, 1)
514 curl.setopt(pycurl.OPENSOCKETFUNCTION,
515 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
516 curl.setopt(pycurl.URL, url.encode('utf-8'))
517 curl.setopt(pycurl.HTTPHEADER, [
518 '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
519 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
520 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
522 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
523 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
525 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
527 curl.setopt(pycurl.NOBODY, True)
528 self._setcurltimeouts(curl, timeout, method=="HEAD")
532 except Exception as e:
533 raise arvados.errors.HttpError(0, str(e))
539 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
540 'body': response_body.getvalue(),
541 'headers': self._headers,
545 ok = retry.check_http_response_success(self._result['status_code'])
547 self._result['error'] = arvados.errors.HttpError(
548 self._result['status_code'],
549 self._headers.get('x-status-line', 'Error'))
550 except self.HTTP_ERRORS as e:
554 self._usable = ok != False
555 if self._result.get('status_code', None):
556 # The client worked well enough to get an HTTP status
557 # code, so presumably any problems are just on the
558 # server side and it's OK to reuse the client.
559 self._put_user_agent(curl)
561 # Don't return this client to the pool, in case it's
565 _logger.debug("Request fail: GET %s => %s: %s",
566 url, type(self._result['error']), str(self._result['error']))
569 _logger.info("HEAD %s: %s bytes",
570 self._result['status_code'],
571 self._result.get('content-length'))
572 if self._result['headers'].get('x-keep-locator'):
573 # This is a response to a remote block copy request, return
574 # the local copy block locator.
575 return self._result['headers'].get('x-keep-locator')
578 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
579 self._result['status_code'],
580 len(self._result['body']),
582 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
584 if self.download_counter:
585 self.download_counter.add(len(self._result['body']))
586 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
587 if resp_md5 != locator.md5sum:
588 _logger.warning("Checksum fail: md5(%s) = %s",
590 self._result['error'] = arvados.errors.HttpError(
593 return self._result['body']
595 def put(self, hash_s, body, timeout=None, headers={}):
596 put_headers = copy.copy(self.put_headers)
597 put_headers.update(headers)
598 url = self.root + hash_s
599 _logger.debug("Request: PUT %s", url)
600 curl = self._get_user_agent()
603 with timer.Timer() as t:
605 body_reader = BytesIO(body)
606 response_body = BytesIO()
607 curl.setopt(pycurl.NOSIGNAL, 1)
608 curl.setopt(pycurl.OPENSOCKETFUNCTION,
609 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
610 curl.setopt(pycurl.URL, url.encode('utf-8'))
611 # Using UPLOAD tells cURL to wait for a "go ahead" from the
612 # Keep server (in the form of a HTTP/1.1 "100 Continue"
613 # response) instead of sending the request body immediately.
614 # This allows the server to reject the request if the request
615 # is invalid or the server is read-only, without waiting for
616 # the client to send the entire block.
617 curl.setopt(pycurl.UPLOAD, True)
618 curl.setopt(pycurl.INFILESIZE, len(body))
619 curl.setopt(pycurl.READFUNCTION, body_reader.read)
620 curl.setopt(pycurl.HTTPHEADER, [
621 '{}: {}'.format(k,v) for k,v in put_headers.items()])
622 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
623 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
625 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
626 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
628 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
629 self._setcurltimeouts(curl, timeout)
632 except Exception as e:
633 raise arvados.errors.HttpError(0, str(e))
639 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
640 'body': response_body.getvalue().decode('utf-8'),
641 'headers': self._headers,
644 ok = retry.check_http_response_success(self._result['status_code'])
646 self._result['error'] = arvados.errors.HttpError(
647 self._result['status_code'],
648 self._headers.get('x-status-line', 'Error'))
649 except self.HTTP_ERRORS as e:
653 self._usable = ok != False # still usable if ok is True or None
654 if self._result.get('status_code', None):
655 # Client is functional. See comment in get().
656 self._put_user_agent(curl)
660 _logger.debug("Request fail: PUT %s => %s: %s",
661 url, type(self._result['error']), str(self._result['error']))
663 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
664 self._result['status_code'],
667 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
668 if self.upload_counter:
669 self.upload_counter.add(len(body))
672 def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
675 elif isinstance(timeouts, tuple):
676 if len(timeouts) == 2:
677 conn_t, xfer_t = timeouts
678 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
680 conn_t, xfer_t, bandwidth_bps = timeouts
682 conn_t, xfer_t = (timeouts, timeouts)
683 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
684 curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
685 if not ignore_bandwidth:
686 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
687 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
689 def _headerfunction(self, header_line):
690 if isinstance(header_line, bytes):
691 header_line = header_line.decode('iso-8859-1')
692 if ':' in header_line:
693 name, value = header_line.split(':', 1)
694 name = name.strip().lower()
695 value = value.strip()
697 name = self._lastheadername
698 value = self._headers[name] + ' ' + header_line.strip()
699 elif header_line.startswith('HTTP/'):
700 name = 'x-status-line'
703 _logger.error("Unexpected header line: %s", header_line)
705 self._lastheadername = name
706 self._headers[name] = value
707 # Returning None implies all bytes were written
710 class KeepWriterQueue(queue.Queue):
711 def __init__(self, copies, classes=[]):
712 queue.Queue.__init__(self) # Old-style superclass
713 self.wanted_copies = copies
714 self.wanted_storage_classes = classes
715 self.successful_copies = 0
716 self.confirmed_storage_classes = {}
718 self.storage_classes_tracking = True
719 self.queue_data_lock = threading.RLock()
720 self.pending_tries = max(copies, len(classes))
721 self.pending_tries_notification = threading.Condition()
723 def write_success(self, response, replicas_nr, classes_confirmed):
724 with self.queue_data_lock:
725 self.successful_copies += replicas_nr
726 if classes_confirmed is None:
727 self.storage_classes_tracking = False
728 elif self.storage_classes_tracking:
729 for st_class, st_copies in classes_confirmed.items():
731 self.confirmed_storage_classes[st_class] += st_copies
733 self.confirmed_storage_classes[st_class] = st_copies
734 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
735 self.response = response
736 with self.pending_tries_notification:
737 self.pending_tries_notification.notify_all()
739 def write_fail(self, ks):
740 with self.pending_tries_notification:
741 self.pending_tries += 1
742 self.pending_tries_notification.notify()
744 def pending_copies(self):
745 with self.queue_data_lock:
746 return self.wanted_copies - self.successful_copies
748 def satisfied_classes(self):
749 with self.queue_data_lock:
750 if not self.storage_classes_tracking:
751 # Notifies disabled storage classes expectation to
754 return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
756 def pending_classes(self):
757 with self.queue_data_lock:
758 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
760 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
761 for st_class, st_copies in self.confirmed_storage_classes.items():
762 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
763 unsatisfied_classes.remove(st_class)
764 return unsatisfied_classes
766 def get_next_task(self):
767 with self.pending_tries_notification:
769 if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
770 # This notify_all() is unnecessary --
771 # write_success() already called notify_all()
772 # when pending<1 became true, so it's not
773 # possible for any other thread to be in
774 # wait() now -- but it's cheap insurance
775 # against deadlock so we do it anyway:
776 self.pending_tries_notification.notify_all()
777 # Drain the queue and then raise Queue.Empty
781 elif self.pending_tries > 0:
782 service, service_root = self.get_nowait()
783 if service.finished():
786 self.pending_tries -= 1
787 return service, service_root
789 self.pending_tries_notification.notify_all()
792 self.pending_tries_notification.wait()
795 class KeepWriterThreadPool(object):
796 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
797 self.total_task_nr = 0
798 if (not max_service_replicas) or (max_service_replicas >= copies):
801 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
802 _logger.debug("Pool max threads is %d", num_threads)
804 self.queue = KeepClient.KeepWriterQueue(copies, classes)
806 for _ in range(num_threads):
807 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
808 self.workers.append(w)
810 def add_task(self, ks, service_root):
811 self.queue.put((ks, service_root))
812 self.total_task_nr += 1
815 return self.queue.successful_copies, self.queue.satisfied_classes()
819 for worker in self.workers:
821 # Wait for finished work
825 return self.queue.response
828 class KeepWriterThread(threading.Thread):
829 class TaskFailed(RuntimeError): pass
831 def __init__(self, queue, data, data_hash, timeout=None):
832 super(KeepClient.KeepWriterThread, self).__init__()
833 self.timeout = timeout
836 self.data_hash = data_hash
842 service, service_root = self.queue.get_next_task()
846 locator, copies, classes = self.do_task(service, service_root)
847 except Exception as e:
848 if not isinstance(e, self.TaskFailed):
849 _logger.exception("Exception in KeepWriterThread")
850 self.queue.write_fail(service)
852 self.queue.write_success(locator, copies, classes)
854 self.queue.task_done()
856 def do_task(self, service, service_root):
857 classes = self.queue.pending_classes()
861 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
862 success = bool(service.put(self.data_hash,
864 timeout=self.timeout,
866 result = service.last_result()
869 if result.get('status_code'):
870 _logger.debug("Request fail: PUT %s => %s %s",
872 result.get('status_code'),
874 raise self.TaskFailed()
876 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
877 str(threading.current_thread()),
882 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
883 except (KeyError, ValueError):
886 classes_confirmed = {}
888 scch = result['headers']['x-keep-storage-classes-confirmed']
889 for confirmation in scch.replace(' ', '').split(','):
890 if '=' in confirmation:
891 stored_class, stored_copies = confirmation.split('=')[:2]
892 classes_confirmed[stored_class] = int(stored_copies)
893 except (KeyError, ValueError):
894 # Storage classes confirmed header missing or corrupt
895 classes_confirmed = None
897 return result['body'].strip(), replicas_stored, classes_confirmed
900 def __init__(self, api_client=None, proxy=None,
901 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
902 api_token=None, local_store=None, block_cache=None,
903 num_retries=0, session=None):
904 """Initialize a new KeepClient.
908 The API client to use to find Keep services. If not
909 provided, KeepClient will build one from available Arvados
913 If specified, this KeepClient will send requests to this Keep
914 proxy. Otherwise, KeepClient will fall back to the setting of the
915 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
916 If you want to KeepClient does not use a proxy, pass in an empty
920 The initial timeout (in seconds) for HTTP requests to Keep
921 non-proxy servers. A tuple of three floats is interpreted as
922 (connection_timeout, read_timeout, minimum_bandwidth). A connection
923 will be aborted if the average traffic rate falls below
924 minimum_bandwidth bytes per second over an interval of read_timeout
925 seconds. Because timeouts are often a result of transient server
926 load, the actual connection timeout will be increased by a factor
927 of two on each retry.
928 Default: (2, 256, 32768).
931 The initial timeout (in seconds) for HTTP requests to
932 Keep proxies. A tuple of three floats is interpreted as
933 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
934 described above for adjusting connection timeouts on retry also
936 Default: (20, 256, 32768).
939 If you're not using an API client, but only talking
940 directly to a Keep proxy, this parameter specifies an API token
941 to authenticate Keep requests. It is an error to specify both
942 api_client and api_token. If you specify neither, KeepClient
943 will use one available from the Arvados configuration.
946 If specified, this KeepClient will bypass Keep
947 services, and save data to the named directory. If unspecified,
948 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
949 environment variable. If you want to ensure KeepClient does not
950 use local storage, pass in an empty string. This is primarily
951 intended to mock a server for testing.
954 The default number of times to retry failed requests.
955 This will be used as the default num_retries value when get() and
956 put() are called. Default 0.
958 self.lock = threading.Lock()
960 if config.get('ARVADOS_KEEP_SERVICES'):
961 proxy = config.get('ARVADOS_KEEP_SERVICES')
963 proxy = config.get('ARVADOS_KEEP_PROXY')
964 if api_token is None:
965 if api_client is None:
966 api_token = config.get('ARVADOS_API_TOKEN')
968 api_token = api_client.api_token
969 elif api_client is not None:
971 "can't build KeepClient with both API client and token")
972 if local_store is None:
973 local_store = os.environ.get('KEEP_LOCAL_STORE')
975 if api_client is None:
976 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
978 self.insecure = api_client.insecure
980 self.block_cache = block_cache if block_cache else KeepBlockCache()
981 self.timeout = timeout
982 self.proxy_timeout = proxy_timeout
983 self._user_agent_pool = queue.LifoQueue()
984 self.upload_counter = Counter()
985 self.download_counter = Counter()
986 self.put_counter = Counter()
987 self.get_counter = Counter()
988 self.hits_counter = Counter()
989 self.misses_counter = Counter()
990 self._storage_classes_unsupported_warning = False
991 self._default_classes = []
994 self.local_store = local_store
995 self.head = self.local_store_head
996 self.get = self.local_store_get
997 self.put = self.local_store_put
999 self.num_retries = num_retries
1000 self.max_replicas_per_service = None
1002 proxy_uris = proxy.split()
1003 for i in range(len(proxy_uris)):
1004 if not proxy_uris[i].endswith('/'):
1005 proxy_uris[i] += '/'
1007 url = urllib.parse.urlparse(proxy_uris[i])
1008 if not (url.scheme and url.netloc):
1009 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
1010 self.api_token = api_token
1011 self._gateway_services = {}
1012 self._keep_services = [{
1013 'uuid': "00000-bi6l4-%015d" % idx,
1014 'service_type': 'proxy',
1015 '_service_root': uri,
1016 } for idx, uri in enumerate(proxy_uris)]
1017 self._writable_services = self._keep_services
1018 self.using_proxy = True
1019 self._static_services_list = True
1021 # It's important to avoid instantiating an API client
1022 # unless we actually need one, for testing's sake.
1023 if api_client is None:
1024 api_client = arvados.api('v1')
1025 self.api_client = api_client
1026 self.api_token = api_client.api_token
1027 self._gateway_services = {}
1028 self._keep_services = None
1029 self._writable_services = None
1030 self.using_proxy = None
1031 self._static_services_list = False
1033 self._default_classes = [
1034 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
1036 # We're talking to an old cluster
1039 def current_timeout(self, attempt_number):
1040 """Return the appropriate timeout to use for this client.
1042 The proxy timeout setting if the backend service is currently a proxy,
1043 the regular timeout setting otherwise. The `attempt_number` indicates
1044 how many times the operation has been tried already (starting from 0
1045 for the first try), and scales the connection timeout portion of the
1046 return value accordingly.
1049 # TODO(twp): the timeout should be a property of a
1050 # KeepService, not a KeepClient. See #4488.
1051 t = self.proxy_timeout if self.using_proxy else self.timeout
1053 return (t[0] * (1 << attempt_number), t[1])
1055 return (t[0] * (1 << attempt_number), t[1], t[2])
1056 def _any_nondisk_services(self, service_list):
1057 return any(ks.get('service_type', 'disk') != 'disk'
1058 for ks in service_list)
1060 def build_services_list(self, force_rebuild=False):
1061 if (self._static_services_list or
1062 (self._keep_services and not force_rebuild)):
1066 keep_services = self.api_client.keep_services().accessible()
1067 except Exception: # API server predates Keep services.
1068 keep_services = self.api_client.keep_disks().list()
1070 # Gateway services are only used when specified by UUID,
1071 # so there's nothing to gain by filtering them by
1073 self._gateway_services = {ks['uuid']: ks for ks in
1074 keep_services.execute()['items']}
1075 if not self._gateway_services:
1076 raise arvados.errors.NoKeepServersError()
1078 # Precompute the base URI for each service.
1079 for r in self._gateway_services.values():
1080 host = r['service_host']
1081 if not host.startswith('[') and host.find(':') >= 0:
1082 # IPv6 URIs must be formatted like http://[::1]:80/...
1083 host = '[' + host + ']'
1084 r['_service_root'] = "{}://{}:{:d}/".format(
1085 'https' if r['service_ssl_flag'] else 'http',
1089 _logger.debug(str(self._gateway_services))
1090 self._keep_services = [
1091 ks for ks in self._gateway_services.values()
1092 if not ks.get('service_type', '').startswith('gateway:')]
1093 self._writable_services = [ks for ks in self._keep_services
1094 if not ks.get('read_only')]
1096 # For disk type services, max_replicas_per_service is 1
1097 # It is unknown (unlimited) for other service types.
1098 if self._any_nondisk_services(self._writable_services):
1099 self.max_replicas_per_service = None
1101 self.max_replicas_per_service = 1
1103 def _service_weight(self, data_hash, service_uuid):
1104 """Compute the weight of a Keep service endpoint for a data
1105 block with a known hash.
1107 The weight is md5(h + u) where u is the last 15 characters of
1108 the service endpoint's UUID.
1110 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
    """Return an array of Keep service endpoints, in the order in
    which they should be probed when reading or writing data with
    the given hash+hints.
    """
    self.build_services_list(force_rebuild)

    sorted_roots = []
    # Use the services indicated by the given +K@... remote
    # service hints, if any are present and can be resolved to a
    # URI.
    for hint in locator.hints:
        if hint.startswith('K@'):
            if len(hint) == 7:
                # +K@<cluster id>: a remote cluster's keepproxy.
                sorted_roots.append(
                    "https://keep.{}.arvadosapi.com/".format(hint[2:]))
            elif len(hint) == 29:
                # +K@<service uuid>: a specific local gateway service.
                svc = self._gateway_services.get(hint[2:])
                if svc:
                    sorted_roots.append(svc['_service_root'])

    # Sort the available local services by weight (heaviest first)
    # for this locator, and return their service_roots (base URIs)
    # as a list.
    use_services = self._keep_services
    if need_writable:
        use_services = self._writable_services
    self.using_proxy = self._any_nondisk_services(use_services)
    sorted_roots.extend([
        svc['_service_root'] for svc in sorted(
            use_services,
            reverse=True,
            key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
    _logger.debug("{}: {}".format(locator, sorted_roots))
    return sorted_roots
def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
    # roots_map is a dictionary, mapping Keep service root strings
    # to KeepService objects. Poll for Keep services, and add any
    # new ones to roots_map. Return the current list of local
    # service roots.
    headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
    local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
    for root in local_roots:
        if root not in roots_map:
            # Only build a KeepService the first time we see a root;
            # existing entries keep their accumulated request state.
            roots_map[root] = self.KeepService(
                root, self._user_agent_pool,
                upload_counter=self.upload_counter,
                download_counter=self.download_counter,
                headers=headers,
                insecure=self.insecure)
    return local_roots
1166 def _check_loop_result(result):
1167 # KeepClient RetryLoops should save results as a 2-tuple: the
1168 # actual result of the request, and the number of servers available
1169 # to receive the request this round.
1170 # This method returns True if there's a real result, False if
1171 # there are no more servers available, otherwise None.
1172 if isinstance(result, Exception):
1174 result, tried_server_count = result
1175 if (result is not None) and (result is not False):
1177 elif tried_server_count < 1:
1178 _logger.info("No more Keep services to try; giving up")
def get_from_cache(self, loc_s):
    """Fetch a block only if is in the cache, otherwise return None."""
    locator = KeepLocator(loc_s)
    slot = self.block_cache.get(locator.md5sum)
    if slot is not None and slot.ready.is_set():
        # Cache hit and the content has finished arriving.
        return slot.get()
    else:
        return None
def refresh_signature(self, loc):
    """Ask Keep to get the remote block and return its local signature"""
    timestamp = datetime.datetime.utcnow().isoformat("T") + 'Z'
    signature_headers = {'X-Keep-Signature': 'local, {}'.format(timestamp)}
    return self.head(loc, headers=signature_headers)
def head(self, loc_s, **kwargs):
    """Check a block's availability without fetching its payload
    (HTTP HEAD); delegates to _get_or_head."""
    return self._get_or_head(loc_s, method="HEAD", **kwargs)
def get(self, loc_s, **kwargs):
    """Fetch a block's data (HTTP GET); delegates to _get_or_head."""
    return self._get_or_head(loc_s, method="GET", **kwargs)
def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
    """Get data from Keep.

    This method fetches one or more blocks of data from Keep. It
    sends a request each Keep service registered with the API
    server (or the proxy provided when this client was
    instantiated), then each service named in location hints, in
    sequence. As soon as one service provides the data, it's
    returned.

    Arguments:
    * loc_s: A string of one or more comma-separated locators to fetch.
      This method returns the concatenation of these blocks.
    * num_retries: The number of times to retry GET requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff. Note that, in each loop, the method may try
      to fetch data from every available Keep service, along with any
      that are named in location hints in the locator. The default value
      is set when the KeepClient is initialized.
    """
    if ',' in loc_s:
        # Multiple locators: fetch each recursively and concatenate.
        return ''.join(self.get(x) for x in loc_s.split(','))

    self.get_counter.add(1)

    request_id = (request_id or
                  (hasattr(self, 'api_client') and self.api_client.request_id) or
                  arvados.util.new_request_id())
    if headers is None:
        headers = {}
    headers['X-Request-Id'] = request_id

    slot = None
    blob = None
    try:
        locator = KeepLocator(loc_s)
        if method == "GET":
            slot, first = self.block_cache.reserve_cache(locator.md5sum)
            if not first:
                if prefetch:
                    # this is request for a prefetch, if it is
                    # already in flight, return immediately.
                    # clear 'slot' to prevent finally block from
                    # calling slot.set()
                    slot = None
                    return None
                self.hits_counter.add(1)
                blob = slot.get()
                if blob is None:
                    raise arvados.errors.KeepReadError(
                        "failed to read {}".format(loc_s))
                return blob

        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               len(hint) == 29 and
                               self._gateway_services.get(hint[2:]))])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter,
                                   headers=headers,
                                   insecure=self.insecure)
            for root in hint_roots
        }

        # See #3147 for a discussion of the loop implementation. Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        sorted_roots = []
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
    finally:
        if slot is not None:
            self.block_cache.set(slot, blob)

    if loop.success():
        return blob

    # No luck: report what went wrong, as precisely as possible.
    # Q: Including 403 is necessary for the Keep tests to continue
    # passing, but maybe they should expect KeepReadError instead?
    not_founds = sum(1 for key in sorted_roots
                     if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
    service_errors = ((key, roots_map[key].last_result()['error'])
                      for key in sorted_roots)
    if not roots_map:
        raise arvados.errors.KeepReadError(
            "[{}] failed to read {}: no Keep services available ({})".format(
                request_id, loc_s, loop.last_result()))
    elif not_founds == len(sorted_roots):
        raise arvados.errors.NotFoundError(
            "[{}] {} not found".format(request_id, loc_s), service_errors)
    else:
        raise arvados.errors.KeepReadError(
            "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
    """Save data in Keep.

    This method will get a list of Keep services from the API server, and
    send the data to each one simultaneously in a new thread. Once the
    uploads are finished, if enough copies are saved, this method returns
    the most recent HTTP response body. If requests fail to upload
    enough copies, this method raises KeepWriteError.

    Arguments:
    * data: The string of data to upload.
    * copies: The number of copies that the user requires be saved.
    * num_retries: The number of times to retry PUT requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff. The default value is set when the
      KeepClient is initialized.
    * classes: An optional list of storage class names where copies should
      be written.
    """
    classes = classes or self._default_classes

    if not isinstance(data, bytes):
        data = data.encode()

    self.put_counter.add(1)

    data_hash = hashlib.md5(data).hexdigest()
    loc_s = data_hash + '+' + str(len(data))
    if copies < 1:
        # Nothing to store: the locator alone is the answer.
        return loc_s
    locator = KeepLocator(loc_s)

    request_id = (request_id or
                  (hasattr(self, 'api_client') and self.api_client.request_id) or
                  arvados.util.new_request_id())
    headers = {
        'X-Request-Id': request_id,
        'X-Keep-Desired-Replicas': str(copies),
    }
    roots_map = {}
    loop = retry.RetryLoop(num_retries, self._check_loop_result,
                           backoff_start=2)
    done_copies = 0
    # done_classes is None once we learn the cluster does not
    # support storage classes.
    done_classes = []
    for tries_left in loop:
        try:
            sorted_roots = self.map_new_services(
                roots_map, locator,
                force_rebuild=(tries_left < num_retries),
                need_writable=True,
                headers=headers)
        except Exception as error:
            loop.save_result(error)
            continue

        pending_classes = []
        if done_classes is not None:
            pending_classes = list(set(classes) - set(done_classes))
        writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                      data_hash=data_hash,
                                                      copies=copies - done_copies,
                                                      max_service_replicas=self.max_replicas_per_service,
                                                      timeout=self.current_timeout(num_retries - tries_left),
                                                      classes=pending_classes)
        for service_root, ks in [(root, roots_map[root])
                                 for root in sorted_roots]:
            if ks.finished():
                continue
            writer_pool.add_task(ks, service_root)
        writer_pool.join()
        pool_copies, pool_classes = writer_pool.done()
        done_copies += pool_copies
        if (done_classes is not None) and (pool_classes is not None):
            done_classes += pool_classes
            loop.save_result(
                (done_copies >= copies and set(done_classes) == set(classes),
                 writer_pool.total_task_nr))
        else:
            # Old keepstore contacted without storage classes support:
            # success is determined only by successful copies.
            #
            # Disable storage classes tracking from this point forward.
            if not self._storage_classes_unsupported_warning:
                self._storage_classes_unsupported_warning = True
                _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
            done_classes = None
            loop.save_result(
                (done_copies >= copies, writer_pool.total_task_nr))

    if loop.success():
        return writer_pool.response()
    if not roots_map:
        raise arvados.errors.KeepWriteError(
            "[{}] failed to write {}: no Keep services available ({})".format(
                request_id, data_hash, loop.last_result()))
    else:
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots
                          if roots_map[key].last_result()['error'])
        raise arvados.errors.KeepWriteError(
            "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
def local_store_put(self, data, copies=1, num_retries=None, classes=None):
    """A stub for put().

    This method is used in place of the real put() method when
    using local storage (see constructor's local_store argument).

    copies and num_retries arguments are ignored: they are here
    only for the sake of offering the same call signature as
    put().  classes is likewise accepted and ignored (its default
    is None rather than a shared mutable list).

    Data stored this way can be retrieved via local_store_get().
    """
    if not isinstance(data, bytes):
        # Accept str input for parity with put(), which also encodes.
        data = data.encode()
    md5 = hashlib.md5(data).hexdigest()
    locator = '%s+%d' % (md5, len(data))
    # Write to a temp file then rename, so readers never observe a
    # partially written block.
    with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
        f.write(data)
    os.rename(os.path.join(self.local_store, md5 + '.tmp'),
              os.path.join(self.local_store, md5))
    return locator
def local_store_get(self, loc_s, num_retries=None):
    """Companion to local_store_put()."""
    try:
        locator = KeepLocator(loc_s)
    except ValueError:
        raise arvados.errors.NotFoundError(
            "Invalid data locator: '%s'" % loc_s)
    if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
        # The empty block is always available without touching disk.
        return b''
    with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
        return f.read()
1475 def local_store_head(self, loc_s, num_retries=None):
1476 """Companion to local_store_put()."""
1478 locator = KeepLocator(loc_s)
1480 raise arvados.errors.NotFoundError(
1481 "Invalid data locator: '%s'" % loc_s)
1482 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1484 if os.path.exists(os.path.join(self.local_store, locator.md5sum)):