1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
# Python 2/3 compatibility: pick a BytesIO implementation. The cStringIO
# import is the Python 2 fallback branch (the intervening "else:" line is
# elided in this listing).
35 if sys.version_info >= (3, 0):
36 from io import BytesIO
38 from cStringIO import StringIO as BytesIO
41 import arvados.config as config
43 import arvados.retry as retry
45 import arvados.diskcache
# Module-level logger for all Keep client activity.
47 _logger = logging.getLogger('arvados.keep')
# Lazily-built singleton used by the deprecated Keep class below.
48 global_client_object = None
51 # Monkey patch TCP constants when not available (apple). Values sourced from:
52 # http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
53 if sys.platform == 'darwin':
54 if not hasattr(socket, 'TCP_KEEPALIVE'):
55 socket.TCP_KEEPALIVE = 0x010
56 if not hasattr(socket, 'TCP_KEEPINTVL'):
57 socket.TCP_KEEPINTVL = 0x101
58 if not hasattr(socket, 'TCP_KEEPCNT'):
59 socket.TCP_KEEPCNT = 0x102
# Parse, validate, and re-serialize a Keep block locator string of the
# form "md5sum+size+hint+hint..." (e.g. "acbd...+3+Asignature@hexstamp").
# NOTE(review): this listing is sampled — several original lines inside the
# methods below are elided; code is kept verbatim.
62 class KeepLocator(object):
# Unix epoch as a naive datetime; used to convert permission-hint
# expiry timestamps to/from datetime objects.
63 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
# Valid hints: a capital letter followed by alphanumerics/@/_/-.
64 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
66 def __init__(self, locator_str):
# Parse the '+'-separated locator pieces: md5sum first, then size,
# then zero or more hints.
69 self._perm_expiry = None
70 pieces = iter(locator_str.split('+'))
71 self.md5sum = next(pieces)
73 self.size = int(next(pieces))
# Each remaining piece must match HINT_RE; 'A...' hints carry the
# permission signature and are parsed specially.
77 if self.HINT_RE.match(hint) is None:
78 raise ValueError("invalid hint format: {}".format(hint))
79 elif hint.startswith('A'):
80 self.parse_permission_hint(hint)
82 self.hints.append(hint)
# Serialization helpers: rebuild the '+'-joined locator string from
# the parsed components (enclosing method lines elided here).
87 for s in [self.md5sum, self.size,
88 self.permission_hint()] + self.hints
92 if self.size is not None:
93 return "%s+%i" % (self.md5sum, self.size)
97 def _make_hex_prop(name, length):
98 # Build and return a new property with the given name that
99 # must be a hex string of the given length.
100 data_name = '_{}'.format(name)
# getter definition line elided; returns the backing '_<name>' attr.
102 return getattr(self, data_name)
103 def setter(self, hex_str):
104 if not arvados.util.is_hex(hex_str, length):
105 raise ValueError("{} is not a {}-digit hex string: {!r}".
106 format(name, length, hex_str))
107 setattr(self, data_name, hex_str)
108 return property(getter, setter)
# Validated hex-string properties: 32-digit md5, 40-digit signature.
110 md5sum = _make_hex_prop('md5sum', 32)
111 perm_sig = _make_hex_prop('perm_sig', 40)
# perm_expiry getter (the @property decorator line is elided).
114 def perm_expiry(self):
115 return self._perm_expiry
# perm_expiry setter: accepts a 1-8 digit hex Unix timestamp and
# stores it as a datetime.
118 def perm_expiry(self, value):
119 if not arvados.util.is_hex(value, 1, 8):
121 "permission timestamp must be a hex Unix timestamp: {}".
123 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
125 def permission_hint(self):
# Rebuild the "A<sig>@<hex expiry>" hint from the parsed fields,
# converting the expiry datetime back to hex seconds-since-epoch.
126 data = [self.perm_sig, self.perm_expiry]
129 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
130 return "A{}@{:08x}".format(*data)
132 def parse_permission_hint(self, s):
# Split "A<sig>@<expiry>" (leading 'A' stripped); the surrounding
# try/except lines are elided in this listing.
134 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
136 raise ValueError("bad permission hint {}".format(s))
138 def permission_expired(self, as_of_dt=None):
# With no permission hint there is nothing to expire (the early
# return line is elided). Otherwise compare expiry to "now" or to
# the caller-supplied reference time.
139 if self.perm_expiry is None:
141 elif as_of_dt is None:
142 as_of_dt = datetime.datetime.now()
143 return self.perm_expiry <= as_of_dt
147 """Simple interface to a global KeepClient object.
149 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
150 own API client. The global KeepClient will build an API client from the
151 current Arvados configuration, which may not match the one you built.
# Return (building if necessary) the process-wide KeepClient singleton.
# The @classmethod decorator line and the `cls._last_key = key` update are
# elided in this listing.
156 def global_client_object(cls):
157 global global_client_object
158 # Previously, KeepClient would change its behavior at runtime based
159 # on these configuration settings. We simulate that behavior here
160 # by checking the values and returning a new KeepClient if any of
# ...the relevant settings changed since the last call.
162 key = (config.get('ARVADOS_API_HOST'),
163 config.get('ARVADOS_API_TOKEN'),
164 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
165 config.get('ARVADOS_KEEP_PROXY'),
166 os.environ.get('KEEP_LOCAL_STORE'))
# Rebuild the client when none exists yet or the config key changed.
167 if (global_client_object is None) or (cls._last_key != key):
168 global_client_object = KeepClient()
170 return global_client_object
# Deprecated convenience wrapper: fetch a block via the global KeepClient.
# (The @staticmethod decorator line is elided in this listing.)
173 def get(locator, **kwargs):
174 return Keep.global_client_object().get(locator, **kwargs)
# Deprecated convenience wrapper: store a block via the global KeepClient.
177 def put(data, **kwargs):
178 return Keep.global_client_object().put(data, **kwargs)
# In-memory (or optionally disk-backed) cache of Keep blocks, keyed by
# locator. NOTE(review): this listing is sampled — many lines (try/except,
# else branches, returns) are elided; code is kept verbatim.
180 class KeepBlockCache(object):
181 def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
# cache_max: maximum cache size in bytes (0 = pick a default below).
# max_slots: maximum number of cached blocks (0 = derive from RLIMIT_NOFILE).
182 self.cache_max = cache_max
184 self._cache_lock = threading.Lock()
185 self._max_slots = max_slots
186 self._disk_cache = disk_cache
187 self._disk_cache_dir = disk_cache_dir
# Default disk cache location: ~/.cache/arvados/keep (XDG-style).
189 if self._disk_cache and self._disk_cache_dir is None:
190 self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
191 os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
193 if self._max_slots == 0:
195 # default max slots to half of maximum file handles
196 # NOFILE typically defaults to 1024 on Linux so this
198 self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2
# Fallback slot count when not using the disk cache (branch line elided).
201 self._max_slots = 512
203 if self.cache_max == 0:
# Disk-cache sizing: look at filesystem capacity/availability and
# fold in the space already used by an existing cache.
205 fs = os.statvfs(self._disk_cache_dir)
206 # Calculation of available space incorporates existing cache usage
207 existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
208 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
209 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
211 # 10% of total disk size
212 # 25% of available space
214 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
# RAM-cache default (branch line elided): 256 MiB.
217 self.cache_max = (256 * 1024 * 1024)
# Never shrink below one full-size (64 MiB) block.
219 self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
# Disk cache: rebuild the slot list from what is already on disk.
222 self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
# One cached block: consumers wait on `ready` until `content` is set.
226 class CacheSlot(object):
227 __slots__ = ("locator", "ready", "content")
229 def __init__(self, locator):
230 self.locator = locator
231 self.ready = threading.Event()
# set(): store the block body and wake waiters (lines elided).
238 def set(self, value):
# size(): 0 for empty/error slots, else len(content) (lines elided).
243 if self.content is None:
246 return len(self.content)
252 '''Cap the cache size to self.cache_max'''
253 with self._cache_lock:
254 # Select all slots except those where ready.is_set() and content is
255 # None (that means there was an error reading the block).
256 self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
257 sm = sum([slot.size() for slot in self._cache])
# Evict from the least-recently-used end until both the byte and
# slot-count limits are satisfied.
258 while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
259 for i in range(len(self._cache)-1, -1, -1):
260 # start from the back, find a slot that is a candidate to evict
261 if self._cache[i].ready.is_set():
262 sz = self._cache[i].size()
264 # If evict returns false it means the
265 # underlying disk cache couldn't lock the file
266 # for deletion because another process was using
267 # it. Don't count it as reducing the amount
268 # of data in the cache, find something else to
270 if self._cache[i].evict():
273 # either way we forget about it. either the
274 # other process will delete it, or if we need
275 # it again and it is still there, we'll find
280 def _get(self, locator):
# Internal lookup; caller must hold _cache_lock.
281 # Test if the locator is already in the cache
282 for i in range(0, len(self._cache)):
283 if self._cache[i].locator == locator:
286 # move it to the front
288 self._cache.insert(0, n)
# Disk-cache miss path (guard line elided):
291 # see if it exists on disk
292 n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
294 self._cache.insert(0, n)
298 def get(self, locator):
# Thread-safe lookup wrapper around _get().
299 with self._cache_lock:
300 return self._get(locator)
302 def reserve_cache(self, locator):
303 '''Reserve a cache slot for the specified locator,
304 or return the existing slot.'''
305 with self._cache_lock:
306 n = self._get(locator)
310 # Add a new cache slot for the locator
# Disk-backed slot when disk cache is enabled, else in-memory slot
# (the if/else lines are elided).
312 n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
314 n = KeepBlockCache.CacheSlot(locator)
315 self._cache.insert(0, n)
318 def set(self, slot, blob):
# Store `blob` into `slot`; on resource errors, shrink the limits
# and retry once (the surrounding try lines are elided).
323 if e.errno == errno.ENOMEM:
324 # Reduce max slots to current - 4, cap cache and retry
325 with self._cache_lock:
326 self._max_slots = max(4, len(self._cache) - 4)
327 elif e.errno == errno.ENOSPC:
328 # Reduce disk max space to current - 256 MiB, cap cache and retry
329 with self._cache_lock:
330 sm = sum([st.size() for st in self._cache])
331 self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
332 elif e.errno == errno.ENODEV:
333 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
334 except Exception as e:
337 # Check if we should evict things from the cache. Either
338 # because we added a new thing or there was an error and
339 # we possibly adjusted the limits down, so we might need
340 # to push something out.
344 # Only gets here if there was an error the first time. The
345 # exception handler adjusts limits downward in some cases
346 # to free up resources, which would make the operation
349 except Exception as e:
350 # It failed again. Give up.
352 raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
# Thread-safe counter used for upload/download/hit/miss statistics.
# (The add()/get() methods are elided in this listing.)
356 class Counter(object):
357 def __init__(self, v=0):
358 self._lk = threading.Lock()
# Client for reading/writing blocks to a cluster's Keep services.
370 class KeepClient(object):
372 # Default Keep server connection timeout: 2 seconds
373 # Default Keep server read timeout: 256 seconds
374 # Default Keep server bandwidth minimum: 32768 bytes per second
375 # Default Keep proxy connection timeout: 20 seconds
376 # Default Keep proxy read timeout: 256 seconds
377 # Default Keep proxy bandwidth minimum: 32768 bytes per second
# Timeouts are (connect_seconds, read_seconds, min_bytes_per_second).
378 DEFAULT_TIMEOUT = (2, 256, 32768)
379 DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
# pycurl-based HTTP transport for one transaction against one Keep server.
# NOTE(review): sampled listing — try/finally lines and several branches are
# elided; code kept verbatim.
382 class KeepService(object):
383 """Make requests to a single Keep service, and track results.
385 A KeepService is intended to last long enough to perform one
386 transaction (GET or PUT) against one Keep service. This can
387 involve calling either get() or put() multiple times in order
388 to retry after transient failures. However, calling both get()
389 and put() on a single instance -- or using the same instance
390 to access two different Keep services -- will not produce
# ...sensible results (remainder of docstring elided).
# Exception types treated as transport-level failures (list is
# partially elided here).
397 arvados.errors.HttpError,
400 def __init__(self, root, user_agent_pool=queue.LifoQueue(),
402 download_counter=None,
# NOTE(review): the mutable default `queue.LifoQueue()` above is shared
# across calls — acceptable here only because it is a thread-safe pool;
# confirm against upstream before changing.
406 self._user_agent_pool = user_agent_pool
407 self._result = {'error': None}
# GET requests advertise octet-stream; PUT uses the supplied headers.
411 self.get_headers = {'Accept': 'application/octet-stream'}
412 self.get_headers.update(headers)
413 self.put_headers = headers
414 self.upload_counter = upload_counter
415 self.download_counter = download_counter
416 self.insecure = insecure
# usable(): (def line elided)
419 """Is it worth attempting a request?"""
# finished(): (def line elided)
423 """Did the request succeed or encounter permanent failure?"""
424 return self._result['error'] == False or not self._usable
426 def last_result(self):
# Returns the dict describing the most recent request (body elided).
# Reuse a pooled curl handle if one is free, else create one
# (fallback lines elided).
429 def _get_user_agent(self):
431 return self._user_agent_pool.get(block=False)
435 def _put_user_agent(self, ua):
# Reset and return the handle to the pool (reset call elided).
438 self._user_agent_pool.put(ua, block=False)
442 def _socket_open(self, *args, **kwargs):
# Dispatch on the OPENSOCKETFUNCTION calling convention, which
# changed between pycurl 7.19.3 and 7.21.5.
443 if len(args) + len(kwargs) == 2:
444 return self._socket_open_pycurl_7_21_5(*args, **kwargs)
446 return self._socket_open_pycurl_7_19_3(*args, **kwargs)
448 def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
# Adapt the old 4-argument callback to the new (purpose, address)
# form using an ad-hoc namedtuple address.
449 return self._socket_open_pycurl_7_21_5(
451 address=collections.namedtuple(
452 'Address', ['family', 'socktype', 'protocol', 'addr'],
453 )(family, socktype, protocol, address))
455 def _socket_open_pycurl_7_21_5(self, purpose, address):
456 """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
457 s = socket.socket(address.family, address.socktype, address.protocol)
458 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
459 # Will throw invalid protocol error on mac. This test prevents that.
460 if hasattr(socket, 'TCP_KEEPIDLE'):
461 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
462 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
466 def get(self, locator, method="GET", timeout=None):
467 # locator is a KeepLocator object.
468 url = self.root + str(locator)
469 _logger.debug("Request: %s %s", method, url)
470 curl = self._get_user_agent()
# Perform the transfer inside a timer so throughput can be logged.
473 with timer.Timer() as t:
475 response_body = BytesIO()
476 curl.setopt(pycurl.NOSIGNAL, 1)
477 curl.setopt(pycurl.OPENSOCKETFUNCTION,
478 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
479 curl.setopt(pycurl.URL, url.encode('utf-8'))
480 curl.setopt(pycurl.HTTPHEADER, [
481 '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
482 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
483 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
# Insecure mode disables TLS verification (guard line elided);
# otherwise point curl at the trusted CA bundle.
485 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
486 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
488 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
# HEAD requests skip the body (guard line elided).
490 curl.setopt(pycurl.NOBODY, True)
491 self._setcurltimeouts(curl, timeout, method=="HEAD")
# curl.perform() and its try wrapper are elided here.
495 except Exception as e:
496 raise arvados.errors.HttpError(0, str(e))
# Record the response triple for last_result().
502 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
503 'body': response_body.getvalue(),
504 'headers': self._headers,
# Map the HTTP status onto ok/retryable/failed.
508 ok = retry.check_http_response_success(self._result['status_code'])
510 self._result['error'] = arvados.errors.HttpError(
511 self._result['status_code'],
512 self._headers.get('x-status-line', 'Error'))
513 except self.HTTP_ERRORS as e:
# ok is False on permanent failure, None on retryable error.
517 self._usable = ok != False
518 if self._result.get('status_code', None):
519 # The client worked well enough to get an HTTP status
520 # code, so presumably any problems are just on the
521 # server side and it's OK to reuse the client.
522 self._put_user_agent(curl)
524 # Don't return this client to the pool, in case it's
# ...broken (close call elided).
528 _logger.debug("Request fail: GET %s => %s: %s",
529 url, type(self._result['error']), str(self._result['error']))
# HEAD success path:
532 _logger.info("HEAD %s: %s bytes",
533 self._result['status_code'],
534 self._result.get('content-length'))
535 if self._result['headers'].get('x-keep-locator'):
536 # This is a response to a remote block copy request, return
537 # the local copy block locator.
538 return self._result['headers'].get('x-keep-locator')
# GET success path: log throughput and verify the body checksum.
541 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
542 self._result['status_code'],
543 len(self._result['body']),
545 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
547 if self.download_counter:
548 self.download_counter.add(len(self._result['body']))
549 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
550 if resp_md5 != locator.md5sum:
551 _logger.warning("Checksum fail: md5(%s) = %s",
553 self._result['error'] = arvados.errors.HttpError(
556 return self._result['body']
558 def put(self, hash_s, body, timeout=None, headers={}):
# NOTE(review): mutable default `headers={}` is read-only here
# (copied before update), so it is safe as written.
559 put_headers = copy.copy(self.put_headers)
560 put_headers.update(headers)
561 url = self.root + hash_s
562 _logger.debug("Request: PUT %s", url)
563 curl = self._get_user_agent()
566 with timer.Timer() as t:
568 body_reader = BytesIO(body)
569 response_body = BytesIO()
570 curl.setopt(pycurl.NOSIGNAL, 1)
571 curl.setopt(pycurl.OPENSOCKETFUNCTION,
572 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
573 curl.setopt(pycurl.URL, url.encode('utf-8'))
574 # Using UPLOAD tells cURL to wait for a "go ahead" from the
575 # Keep server (in the form of a HTTP/1.1 "100 Continue"
576 # response) instead of sending the request body immediately.
577 # This allows the server to reject the request if the request
578 # is invalid or the server is read-only, without waiting for
579 # the client to send the entire block.
580 curl.setopt(pycurl.UPLOAD, True)
581 curl.setopt(pycurl.INFILESIZE, len(body))
582 curl.setopt(pycurl.READFUNCTION, body_reader.read)
583 curl.setopt(pycurl.HTTPHEADER, [
584 '{}: {}'.format(k,v) for k,v in put_headers.items()])
585 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
586 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
# TLS handling mirrors get(): verify unless insecure (guard elided).
588 curl.setopt(pycurl.SSL_VERIFYPEER, 0)
589 curl.setopt(pycurl.SSL_VERIFYHOST, 0)
591 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
592 self._setcurltimeouts(curl, timeout)
595 except Exception as e:
596 raise arvados.errors.HttpError(0, str(e))
# Record the response; PUT bodies are decoded as text (locator).
602 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
603 'body': response_body.getvalue().decode('utf-8'),
604 'headers': self._headers,
607 ok = retry.check_http_response_success(self._result['status_code'])
609 self._result['error'] = arvados.errors.HttpError(
610 self._result['status_code'],
611 self._headers.get('x-status-line', 'Error'))
612 except self.HTTP_ERRORS as e:
616 self._usable = ok != False # still usable if ok is True or None
617 if self._result.get('status_code', None):
618 # Client is functional. See comment in get().
619 self._put_user_agent(curl)
623 _logger.debug("Request fail: PUT %s => %s: %s",
624 url, type(self._result['error']), str(self._result['error']))
626 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
627 self._result['status_code'],
630 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
631 if self.upload_counter:
632 self.upload_counter.add(len(body))
635 def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
# Normalize `timeouts` (None / scalar / 2-tuple / 3-tuple) into
# (connect, transfer, bandwidth); the None branch is elided.
638 elif isinstance(timeouts, tuple):
639 if len(timeouts) == 2:
640 conn_t, xfer_t = timeouts
641 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
643 conn_t, xfer_t, bandwidth_bps = timeouts
645 conn_t, xfer_t = (timeouts, timeouts)
646 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
647 curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
# LOW_SPEED_* aborts transfers below bandwidth_bps for xfer_t secs.
648 if not ignore_bandwidth:
649 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
650 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
652 def _headerfunction(self, header_line):
# pycurl HEADERFUNCTION callback: accumulate response headers into
# self._headers, folding continuation lines per RFC 7230.
653 if isinstance(header_line, bytes):
654 header_line = header_line.decode('iso-8859-1')
655 if ':' in header_line:
656 name, value = header_line.split(':', 1)
657 name = name.strip().lower()
658 value = value.strip()
# Continuation line: append to the previous header's value
# (the guard line is elided).
660 name = self._lastheadername
661 value = self._headers[name] + ' ' + header_line.strip()
662 elif header_line.startswith('HTTP/'):
663 name = 'x-status-line'
666 _logger.error("Unexpected header line: %s", header_line)
668 self._lastheadername = name
669 self._headers[name] = value
670 # Returning None implies all bytes were written
# Work queue coordinating parallel PUTs until the desired replica count
# and storage-class coverage are reached.
673 class KeepWriterQueue(queue.Queue):
674 def __init__(self, copies, classes=[]):
# NOTE(review): mutable default `classes=[]` is only read here, so
# it is safe as written; confirm before changing upstream.
675 queue.Queue.__init__(self) # Old-style superclass
676 self.wanted_copies = copies
677 self.wanted_storage_classes = classes
678 self.successful_copies = 0
679 self.confirmed_storage_classes = {}
# Tracking is disabled if any server fails to report classes.
681 self.storage_classes_tracking = True
682 self.queue_data_lock = threading.RLock()
683 self.pending_tries = max(copies, len(classes))
684 self.pending_tries_notification = threading.Condition()
686 def write_success(self, response, replicas_nr, classes_confirmed):
# Record a successful PUT: fold confirmed replica/class counts into
# the totals and recompute how many more tries are needed.
687 with self.queue_data_lock:
688 self.successful_copies += replicas_nr
689 if classes_confirmed is None:
690 self.storage_classes_tracking = False
691 elif self.storage_classes_tracking:
692 for st_class, st_copies in classes_confirmed.items():
694 self.confirmed_storage_classes[st_class] += st_copies
696 self.confirmed_storage_classes[st_class] = st_copies
697 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
698 self.response = response
699 with self.pending_tries_notification:
700 self.pending_tries_notification.notify_all()
702 def write_fail(self, ks):
# A failed PUT frees up a try for another worker.
703 with self.pending_tries_notification:
704 self.pending_tries += 1
705 self.pending_tries_notification.notify()
707 def pending_copies(self):
# Replicas still needed (may be <= 0 once satisfied).
708 with self.queue_data_lock:
709 return self.wanted_copies - self.successful_copies
711 def satisfied_classes(self):
# Storage classes already confirmed at the wanted replica count.
712 with self.queue_data_lock:
713 if not self.storage_classes_tracking:
714 # Notifies disabled storage classes expectation to
# ...the caller (return line elided).
717 return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
719 def pending_classes(self):
# Storage classes not yet confirmed at wanted_copies replicas.
720 with self.queue_data_lock:
721 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
723 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
724 for st_class, st_copies in self.confirmed_storage_classes.items():
725 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
726 unsatisfied_classes.remove(st_class)
727 return unsatisfied_classes
729 def get_next_task(self):
# Block until there is a service worth trying, all goals are met
# (then drain and raise queue.Empty), or no tries remain.
730 with self.pending_tries_notification:
732 if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
733 # This notify_all() is unnecessary --
734 # write_success() already called notify_all()
735 # when pending<1 became true, so it's not
736 # possible for any other thread to be in
737 # wait() now -- but it's cheap insurance
738 # against deadlock so we do it anyway:
739 self.pending_tries_notification.notify_all()
740 # Drain the queue and then raise Queue.Empty
744 elif self.pending_tries > 0:
745 service, service_root = self.get_nowait()
746 if service.finished():
# Skip services that already failed permanently
# (task_done/continue lines elided).
749 self.pending_tries -= 1
750 return service, service_root
# No tries left: wake everyone and wait for state to change.
752 self.pending_tries_notification.notify_all()
755 self.pending_tries_notification.wait()
# Pool of KeepWriterThreads sharing one KeepWriterQueue; sized so the
# requested replica count can be reached in parallel.
758 class KeepWriterThreadPool(object):
759 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
760 self.total_task_nr = 0
# One thread per needed copy unless a single service can store
# several replicas (then fewer threads suffice).
761 if (not max_service_replicas) or (max_service_replicas >= copies):
764 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
765 _logger.debug("Pool max threads is %d", num_threads)
767 self.queue = KeepClient.KeepWriterQueue(copies, classes)
# Worker creation; threads are started later (in join(), elided).
769 for _ in range(num_threads):
770 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
771 self.workers.append(w)
773 def add_task(self, ks, service_root):
774 self.queue.put((ks, service_root))
775 self.total_task_nr += 1
# done(): report copies stored and classes satisfied (def line elided).
778 return self.queue.successful_copies, self.queue.satisfied_classes()
# join(): start workers, wait for the queue to drain (lines elided).
782 for worker in self.workers:
784 # Wait for finished work
788 return self.queue.response
# Worker thread: repeatedly takes a (service, root) task from the queue
# and attempts a PUT, reporting success/failure back to the queue.
791 class KeepWriterThread(threading.Thread):
792 class TaskFailed(RuntimeError): pass
794 def __init__(self, queue, data, data_hash, timeout=None):
795 super(KeepClient.KeepWriterThread, self).__init__()
796 self.timeout = timeout
# queue/data assignments elided in this listing.
799 self.data_hash = data_hash
# run() loop (def line elided): fetch tasks until queue.Empty.
805 service, service_root = self.queue.get_next_task()
809 locator, copies, classes = self.do_task(service, service_root)
810 except Exception as e:
811 if not isinstance(e, self.TaskFailed):
812 _logger.exception("Exception in KeepWriterThread")
813 self.queue.write_fail(service)
# Success path (else line elided):
815 self.queue.write_success(locator, copies, classes)
# Always mark the task finished (finally line elided).
817 self.queue.task_done()
819 def do_task(self, service, service_root):
# Perform one PUT against `service`, requesting any still-pending
# storage classes via the X-Keep-Storage-Classes header.
820 classes = self.queue.pending_classes()
824 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
825 success = bool(service.put(self.data_hash,
827 timeout=self.timeout,
829 result = service.last_result()
# Failure: log status (if available) and raise TaskFailed so run()
# records it without a full traceback.
832 if result.get('status_code'):
833 _logger.debug("Request fail: PUT %s => %s %s",
835 result.get('status_code'),
837 raise self.TaskFailed()
839 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
840 str(threading.current_thread()),
# Parse the server-reported replica count; fall back (elided) when
# the header is missing or malformed.
845 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
846 except (KeyError, ValueError):
849 classes_confirmed = {}
# Parse "class=count, class=count" confirmations (try line elided).
851 scch = result['headers']['x-keep-storage-classes-confirmed']
852 for confirmation in scch.replace(' ', '').split(','):
853 if '=' in confirmation:
854 stored_class, stored_copies = confirmation.split('=')[:2]
855 classes_confirmed[stored_class] = int(stored_copies)
856 except (KeyError, ValueError):
857 # Storage classes confirmed header missing or corrupt
858 classes_confirmed = None
860 return result['body'].strip(), replicas_stored, classes_confirmed
863 def __init__(self, api_client=None, proxy=None,
864 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
865 api_token=None, local_store=None, block_cache=None,
866 num_retries=0, session=None):
867 """Initialize a new KeepClient.
871 The API client to use to find Keep services. If not
872 provided, KeepClient will build one from available Arvados
876 If specified, this KeepClient will send requests to this Keep
877 proxy. Otherwise, KeepClient will fall back to the setting of the
878 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
879 If you want KeepClient not to use a proxy, pass in an empty
883 The initial timeout (in seconds) for HTTP requests to Keep
884 non-proxy servers. A tuple of three floats is interpreted as
885 (connection_timeout, read_timeout, minimum_bandwidth). A connection
886 will be aborted if the average traffic rate falls below
887 minimum_bandwidth bytes per second over an interval of read_timeout
888 seconds. Because timeouts are often a result of transient server
889 load, the actual connection timeout will be increased by a factor
890 of two on each retry.
891 Default: (2, 256, 32768).
894 The initial timeout (in seconds) for HTTP requests to
895 Keep proxies. A tuple of three floats is interpreted as
896 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
897 described above for adjusting connection timeouts on retry also
899 Default: (20, 256, 32768).
902 If you're not using an API client, but only talking
903 directly to a Keep proxy, this parameter specifies an API token
904 to authenticate Keep requests. It is an error to specify both
905 api_client and api_token. If you specify neither, KeepClient
906 will use one available from the Arvados configuration.
909 If specified, this KeepClient will bypass Keep
910 services, and save data to the named directory. If unspecified,
911 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
912 environment variable. If you want to ensure KeepClient does not
913 use local storage, pass in an empty string. This is primarily
914 intended to mock a server for testing.
917 The default number of times to retry failed requests.
918 This will be used as the default num_retries value when get() and
919 put() are called. Default 0.
921 self.lock = threading.Lock()
# Resolve the proxy setting: explicit list of Keep services wins,
# then the configured proxy (guard lines partially elided).
923 if config.get('ARVADOS_KEEP_SERVICES'):
924 proxy = config.get('ARVADOS_KEEP_SERVICES')
926 proxy = config.get('ARVADOS_KEEP_PROXY')
# api_token and api_client are mutually exclusive inputs.
927 if api_token is None:
928 if api_client is None:
929 api_token = config.get('ARVADOS_API_TOKEN')
931 api_token = api_client.api_token
932 elif api_client is not None:
# The ArgumentError raise line is elided in this listing.
934 "can't build KeepClient with both API client and token")
935 if local_store is None:
936 local_store = os.environ.get('KEEP_LOCAL_STORE')
938 if api_client is None:
939 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
941 self.insecure = api_client.insecure
943 self.block_cache = block_cache if block_cache else KeepBlockCache()
944 self.timeout = timeout
945 self.proxy_timeout = proxy_timeout
946 self._user_agent_pool = queue.LifoQueue()
# Statistics counters shared by all requests from this client.
947 self.upload_counter = Counter()
948 self.download_counter = Counter()
949 self.put_counter = Counter()
950 self.get_counter = Counter()
951 self.hits_counter = Counter()
952 self.misses_counter = Counter()
953 self._storage_classes_unsupported_warning = False
954 self._default_classes = []
# Local-store mode: route all operations to the local directory
# (the enclosing `if local_store:` line is elided).
957 self.local_store = local_store
958 self.head = self.local_store_head
959 self.get = self.local_store_get
960 self.put = self.local_store_put
962 self.num_retries = num_retries
963 self.max_replicas_per_service = None
# Proxy mode: build a static service list from the proxy URI(s).
965 proxy_uris = proxy.split()
966 for i in range(len(proxy_uris)):
967 if not proxy_uris[i].endswith('/'):
# URI normalization ('/' append) line elided; then validate.
970 url = urllib.parse.urlparse(proxy_uris[i])
971 if not (url.scheme and url.netloc):
972 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
973 self.api_token = api_token
974 self._gateway_services = {}
975 self._keep_services = [{
976 'uuid': "00000-bi6l4-%015d" % idx,
977 'service_type': 'proxy',
978 '_service_root': uri,
979 } for idx, uri in enumerate(proxy_uris)]
980 self._writable_services = self._keep_services
981 self.using_proxy = True
982 self._static_services_list = True
984 # It's important to avoid instantiating an API client
985 # unless we actually need one, for testing's sake.
986 if api_client is None:
987 api_client = arvados.api('v1')
988 self.api_client = api_client
989 self.api_token = api_client.api_token
990 self._gateway_services = {}
991 self._keep_services = None
992 self._writable_services = None
993 self.using_proxy = None
994 self._static_services_list = False
# Discover default storage classes from cluster config (try line
# elided; the except branch follows).
996 self._default_classes = [
997 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
999 # We're talking to an old cluster
1002 def current_timeout(self, attempt_number):
1003 """Return the appropriate timeout to use for this client.
1005 The proxy timeout setting if the backend service is currently a proxy,
1006 the regular timeout setting otherwise. The `attempt_number` indicates
1007 how many times the operation has been tried already (starting from 0
1008 for the first try), and scales the connection timeout portion of the
1009 return value accordingly.
1012 # TODO(twp): the timeout should be a property of a
1013 # KeepService, not a KeepClient. See #4488.
1014 t = self.proxy_timeout if self.using_proxy else self.timeout
# Connection timeout doubles per attempt; the branch selecting the
# 2-tuple vs 3-tuple form is elided in this listing.
1016 return (t[0] * (1 << attempt_number), t[1])
1018 return (t[0] * (1 << attempt_number), t[1], t[2])
# True if any service in the list is not a plain 'disk' service
# (e.g. a proxy), which affects replica accounting and timeouts.
1019 def _any_nondisk_services(self, service_list):
1020 return any(ks.get('service_type', 'disk') != 'disk'
1021 for ks in service_list)
1023 def build_services_list(self, force_rebuild=False):
# Populate _gateway_services/_keep_services/_writable_services from
# the API server, unless a static (proxy) list is in use or a cached
# list exists and force_rebuild is False.
1024 if (self._static_services_list or
1025 (self._keep_services and not force_rebuild)):
# Early return elided; below, fetch the accessible services.
1029 keep_services = self.api_client.keep_services().accessible()
1030 except Exception: # API server predates Keep services.
1031 keep_services = self.api_client.keep_disks().list()
1033 # Gateway services are only used when specified by UUID,
1034 # so there's nothing to gain by filtering them by
# ...service type here (comment continuation elided).
1036 self._gateway_services = {ks['uuid']: ks for ks in
1037 keep_services.execute()['items']}
1038 if not self._gateway_services:
1039 raise arvados.errors.NoKeepServersError()
1041 # Precompute the base URI for each service.
1042 for r in self._gateway_services.values():
1043 host = r['service_host']
1044 if not host.startswith('[') and host.find(':') >= 0:
1045 # IPv6 URIs must be formatted like http://[::1]:80/...
1046 host = '[' + host + ']'
1047 r['_service_root'] = "{}://{}:{:d}/".format(
1048 'https' if r['service_ssl_flag'] else 'http',
# host and service_port arguments elided in this listing.
1052 _logger.debug(str(self._gateway_services))
# Local (non-gateway) services and the writable subset thereof.
1053 self._keep_services = [
1054 ks for ks in self._gateway_services.values()
1055 if not ks.get('service_type', '').startswith('gateway:')]
1056 self._writable_services = [ks for ks in self._keep_services
1057 if not ks.get('read_only')]
1059 # For disk type services, max_replicas_per_service is 1
1060 # It is unknown (unlimited) for other service types.
1061 if self._any_nondisk_services(self._writable_services):
1062 self.max_replicas_per_service = None
1064 self.max_replicas_per_service = 1
1066 def _service_weight(self, data_hash, service_uuid):
1067 """Compute the weight of a Keep service endpoint for a data
1068 block with a known hash.
1070 The weight is md5(h + u) where u is the last 15 characters of
1071 the service endpoint's UUID.
# (Closing docstring quotes elided in this listing.)
1073 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1075 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1076 """Return an array of Keep service endpoints, in the order in
1077 which they should be probed when reading or writing data with
1078 the given hash+hints.
# (Closing docstring quotes elided in this listing.)
1080 self.build_services_list(force_rebuild)
# sorted_roots initialization elided here.
1083 # Use the services indicated by the given +K@... remote
1084 # service hints, if any are present and can be resolved to a
# ...service root (comment continuation elided).
1086 for hint in locator.hints:
1087 if hint.startswith('K@'):
# "K@<cluster-id>" (short form, guard elided) maps to the
# public keep proxy for that cluster:
1089 sorted_roots.append(
1090 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1091 elif len(hint) == 29:
# "K@<service-uuid>" maps through _gateway_services.
1092 svc = self._gateway_services.get(hint[2:])
1094 sorted_roots.append(svc['_service_root'])
1096 # Sort the available local services by weight (heaviest first)
1097 # for this locator, and return their service_roots (base URIs)
# Choose read-only or writable set (the if/else lines elided).
1099 use_services = self._keep_services
1101 use_services = self._writable_services
1102 self.using_proxy = self._any_nondisk_services(use_services)
1103 sorted_roots.extend([
1104 svc['_service_root'] for svc in sorted(
# use_services plus reverse=True arguments elided here.
1107 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1108 _logger.debug("{}: {}".format(locator, sorted_roots))
# Final `return sorted_roots` is elided in this listing.
def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
    # roots_map is a dictionary, mapping Keep service root strings
    # to KeepService objects. Poll for Keep services, and add any
    # new ones to roots_map. Return the current list of local
    # service root strings, in probe order for this locator.
    headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
    local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
    for root in local_roots:
        if root not in roots_map:
            # Only create a KeepService for roots we haven't seen;
            # existing entries keep their accumulated state.
            roots_map[root] = self.KeepService(
                root, self._user_agent_pool,
                upload_counter=self.upload_counter,
                download_counter=self.download_counter,
                headers=headers,
                insecure=self.insecure)
    return local_roots
1129 def _check_loop_result(result):
1130 # KeepClient RetryLoops should save results as a 2-tuple: the
1131 # actual result of the request, and the number of servers available
1132 # to receive the request this round.
1133 # This method returns True if there's a real result, False if
1134 # there are no more servers available, otherwise None.
1135 if isinstance(result, Exception):
1137 result, tried_server_count = result
1138 if (result is not None) and (result is not False):
1140 elif tried_server_count < 1:
1141 _logger.info("No more Keep services to try; giving up")
def get_from_cache(self, loc_s):
    """Fetch a block only if it is in the cache, otherwise return None."""
    locator = KeepLocator(loc_s)
    slot = self.block_cache.get(locator.md5sum)
    # Only return data that is already fully fetched (slot.ready set);
    # never block waiting on an in-flight fetch.
    if slot is not None and slot.ready.is_set():
        return slot.get()
    else:
        return None
def refresh_signature(self, loc):
    """Ask Keep to get the remote block and return its local signature"""
    # Request a "local" signature stamped with the current UTC time.
    timestamp = datetime.datetime.utcnow().isoformat("T") + 'Z'
    signature_headers = {'X-Keep-Signature': 'local, {}'.format(timestamp)}
    return self.head(loc, headers=signature_headers)
def head(self, loc_s, **kwargs):
    """HEAD a block: probe Keep for the block without fetching its data.

    Thin wrapper over _get_or_head with method="HEAD"; see that method
    for the accepted keyword arguments (num_retries, request_id,
    headers, ...) and raised exceptions.
    """
    return self._get_or_head(loc_s, method="HEAD", **kwargs)
def get(self, loc_s, **kwargs):
    """GET a block of data from Keep.

    Thin wrapper over _get_or_head with method="GET"; see that method
    for the accepted keyword arguments (num_retries, request_id,
    headers, prefetch) and raised exceptions.
    """
    return self._get_or_head(loc_s, method="GET", **kwargs)
def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
    """Get data from Keep.

    This method fetches one or more blocks of data from Keep. It
    sends a request each Keep service registered with the API
    server (or the proxy provided when this client was
    instantiated), then each service named in location hints, in
    sequence. As soon as one service provides the data, it's
    returned.

    Arguments:
    * loc_s: A string of one or more comma-separated locators to fetch.
      This method returns the concatenation of these blocks.
    * num_retries: The number of times to retry GET requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff. Note that, in each loop, the method may try
      to fetch data from every available Keep service, along with any
      that are named in location hints in the locator. The default value
      is set when the KeepClient is initialized.
    """
    if ',' in loc_s:
        return ''.join(self.get(x) for x in loc_s.split(','))

    self.get_counter.add(1)

    request_id = (request_id or
                  (hasattr(self, 'api_client') and self.api_client.request_id) or
                  arvados.util.new_request_id())
    if headers is None:
        headers = {}
    headers['X-Request-Id'] = request_id

    slot = None
    blob = None
    try:
        locator = KeepLocator(loc_s)
        if method == "GET":
            slot, first = self.block_cache.reserve_cache(locator.md5sum)
            if not first:
                # this is request for a prefetch, if it is
                # already in flight, return immediately.
                # clear 'slot' to prevent finally block from
                # calling slot.set()
                if prefetch:
                    slot = None
                    return None
                self.hits_counter.add(1)
                blob = slot.get()
                if blob is None:
                    raise arvados.errors.KeepReadError(
                        "failed to read {}".format(loc_s))
                return blob

        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               len(hint) == 29 and
                               self._gateway_services.get(hint[2:])
                               )])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter,
                                   headers=headers,
                                   insecure=self.insecure)
            for root in hint_roots
        }

        # See #3147 for a discussion of the loop implementation. Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        sorted_roots = []
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        if loop.success():
            return blob
    finally:
        if slot is not None:
            self.block_cache.set(slot, blob)

    # No servers fulfilled the request. Count how many responded
    # "not found;" if the majority did, return a NotFoundError.
    # Q: Including 403 is necessary for the Keep tests to continue
    # passing, but maybe they should expect KeepReadError instead?
    not_founds = sum(1 for key in sorted_roots
                     if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
    service_errors = ((key, roots_map[key].last_result()['error'])
                      for key in sorted_roots)
    if not roots_map:
        raise arvados.errors.KeepReadError(
            "[{}] failed to read {}: no Keep services available ({})".format(
                request_id, loc_s, loop.last_result()))
    elif not_founds == len(sorted_roots):
        raise arvados.errors.NotFoundError(
            "[{}] {} not found".format(request_id, loc_s), service_errors)
    else:
        raise arvados.errors.KeepReadError(
            "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
    """Save data in Keep.

    This method will get a list of Keep services from the API server, and
    send the data to each one simultaneously in a new thread. Once the
    uploads are finished, if enough copies are saved, this method returns
    the most recent HTTP response body. If requests fail to upload
    enough copies, this method raises KeepWriteError.

    Arguments:
    * data: The string of data to upload.
    * copies: The number of copies that the user requires be saved.
      Default 2.
    * num_retries: The number of times to retry PUT requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff. The default value is set when the
      KeepClient is initialized.
    * classes: An optional list of storage class names where copies should
      be written.
    """

    classes = classes or self._default_classes

    if not isinstance(data, bytes):
        data = data.encode()

    self.put_counter.add(1)

    data_hash = hashlib.md5(data).hexdigest()
    loc_s = data_hash + '+' + str(len(data))
    if copies < 1:
        # Nothing to store; the bare locator is the answer.
        return loc_s
    locator = KeepLocator(loc_s)

    request_id = (request_id or
                  (hasattr(self, 'api_client') and self.api_client.request_id) or
                  arvados.util.new_request_id())
    headers = {
        'X-Request-Id': request_id,
        'X-Keep-Desired-Replicas': str(copies),
    }
    roots_map = {}
    loop = retry.RetryLoop(num_retries, self._check_loop_result,
                           backoff_start=2)
    done_copies = 0
    # done_classes is None once we learn the cluster doesn't report
    # storage classes; then success is judged on copies alone.
    done_classes = []
    for tries_left in loop:
        try:
            sorted_roots = self.map_new_services(
                roots_map, locator,
                force_rebuild=(tries_left < num_retries),
                need_writable=True,
                headers=headers)
        except Exception as error:
            loop.save_result(error)
            continue

        pending_classes = []
        if done_classes is not None:
            pending_classes = list(set(classes) - set(done_classes))
        writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                      data_hash=data_hash,
                                                      copies=copies - done_copies,
                                                      max_service_replicas=self.max_replicas_per_service,
                                                      timeout=self.current_timeout(num_retries - tries_left),
                                                      classes=pending_classes)
        for service_root, ks in [(root, roots_map[root])
                                 for root in sorted_roots]:
            if ks.finished():
                continue
            writer_pool.add_task(ks, service_root)
        writer_pool.join()
        pool_copies, pool_classes = writer_pool.done()
        done_copies += pool_copies
        if (done_classes is not None) and (pool_classes is not None):
            done_classes += pool_classes
            loop.save_result(
                (done_copies >= copies and set(done_classes) == set(classes),
                 writer_pool.total_task_nr))
        else:
            # Old keepstore contacted without storage classes support:
            # success is determined only by successful copies.
            #
            # Disable storage classes tracking from this point forward.
            if not self._storage_classes_unsupported_warning:
                self._storage_classes_unsupported_warning = True
                _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
            done_classes = None
            loop.save_result(
                (done_copies >= copies, writer_pool.total_task_nr))

    if loop.success():
        return writer_pool.response()
    if not roots_map:
        raise arvados.errors.KeepWriteError(
            "[{}] failed to write {}: no Keep services available ({})".format(
                request_id, data_hash, loop.last_result()))
    else:
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots
                          if roots_map[key].last_result()['error'])
        raise arvados.errors.KeepWriteError(
            "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
    """A stub for put().

    This method is used in place of the real put() method when
    using local storage (see constructor's local_store argument).

    copies and num_retries arguments are ignored: they are here
    only for the sake of offering the same call signature as
    put().

    Data stored this way can be retrieved via local_store_get().
    """
    md5 = hashlib.md5(data).hexdigest()
    locator = '%s+%d' % (md5, len(data))
    # Write to a temp file first, then rename, so a reader never sees
    # a partially-written block.
    with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
        f.write(data)
    os.rename(os.path.join(self.local_store, md5 + '.tmp'),
              os.path.join(self.local_store, md5))
    return locator
def local_store_get(self, loc_s, num_retries=None):
    """Companion to local_store_put()."""
    try:
        locator = KeepLocator(loc_s)
    except ValueError:
        raise arvados.errors.NotFoundError(
            "Invalid data locator: '%s'" % loc_s)
    # The empty block is a well-known constant; don't touch disk.
    if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
        return b''
    with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
        return f.read()
1438 def local_store_head(self, loc_s, num_retries=None):
1439 """Companion to local_store_put()."""
1441 locator = KeepLocator(loc_s)
1443 raise arvados.errors.NotFoundError(
1444 "Invalid data locator: '%s'" % loc_s)
1445 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1447 if os.path.exists(os.path.join(self.local_store, locator.md5sum)):