import cStringIO
import datetime
import hashlib
import logging
import math
import os
import pycurl
import Queue
import re
import socket
import ssl
import threading

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
import timer

_logger = logging.getLogger('arvados.keep')
global_client_object = None


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            # Unpacking fails when the '@' separator is missing.
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
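

# Illustrative sketch (not part of the original API): how KeepLocator
# parses a signed locator. The hash, signature, and timestamp below are
# made-up values.
def _keep_locator_example():
    sig = '0123456789abcdef0123456789abcdef01234567'  # 40 hex digits
    loc = KeepLocator(
        'd41d8cd98f00b204e9800998ecf8427e+0+A{}@565f8a2b'.format(sig))
    assert loc.md5sum == 'd41d8cd98f00b204e9800998ecf8427e'
    assert loc.size == 0
    assert not loc.permission_expired(
        as_of_dt=datetime.datetime.utcfromtimestamp(0))
    return loc.stripped()  # 'd41d8cd98f00b204e9800998ecf8427e+0'
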
109 """Simple interface to a global KeepClient object.
111 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
112 own API client. The global KeepClient will build an API client from the
113 current Arvados configuration, which may not match the one you built.
118 def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings. We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)


class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in xrange(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        # Evict the least recently used ready slot.
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in xrange(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
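

# Illustrative sketch (not part of the original file): the intended
# CacheSlot protocol. The first caller to reserve a locator gets
# first=True and must fill the slot with set(); concurrent callers get
# the same slot and block in get() until the content is ready.
def _block_cache_usage_sketch():
    cache = KeepBlockCache()
    slot, first = cache.reserve_cache('d41d8cd98f00b204e9800998ecf8427e')
    if first:
        slot.set('')   # normally the block data fetched from Keep
    data = slot.get()  # waits until some thread has called set()
    cache.cap_cache()  # evict ready slots if the cache exceeds cache_max
    return data
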

class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):

    # Default Keep server connection timeout: 2 seconds
    # Default Keep server read timeout: 256 seconds
    # Default Keep server bandwidth minimum: 32768 bytes per second
    # Default Keep proxy connection timeout: 20 seconds
    # Default Keep proxy read timeout: 256 seconds
    # Default Keep proxy bandwidth minimum: 32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)

    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        correct results.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None, **headers):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._lastheadername = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
274 """Is it worth attempting a request?"""
278 """Did the request succeed or encounter permanent failure?"""
279 return self._result['error'] == False or not self._usable
281 def last_result(self):
284 def _get_user_agent(self):
286 return self._user_agent_pool.get(False)
290 def _put_user_agent(self, ua):
293 self._user_agent_pool.put(ua, False)

        @staticmethod
        def _socket_open(family, socktype, protocol, address=None):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(family, socktype, protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Send the first keepalive probe after 75 seconds of idleness,
            # and every 75 seconds thereafter.
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            return s

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result.get('content-length'))
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = cStringIO.StringIO(body)
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
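
        # Accepted timeout forms, for reference (values illustrative):
        #   _setcurltimeouts(curl, 300)             -> 300s connect, 300s
        #                                              low-speed window
        #   _setcurltimeouts(curl, (2, 256))        -> 2s connect, 256s window,
        #                                              default bandwidth floor
        #   _setcurltimeouts(curl, (2, 256, 32768)) -> abort if the transfer
        #                                              rate stays below
        #                                              32768 B/s for 256s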

        def _headerfunction(self, header_line):
            header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._lastheadername and header_line.startswith((' ', '\t')):
                # Continuation of a folded (multi-line) header value.
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return None
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written

    class KeepWriterQueue(Queue.Queue):
        def __init__(self, copies):
            Queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.successful_copies = 0
            self.response = None
            self.successful_copies_lock = threading.Lock()
            self.pending_tries = copies
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr):
            with self.successful_copies_lock:
                self.successful_copies += replicas_nr
                self.response = response

        def write_fail(self, ks, status_code):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.successful_copies_lock:
                return self.wanted_copies - self.successful_copies

    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
            self.total_task_nr = 0
            self.wanted_copies = copies
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(float(copies) / max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()
            with self.queue.pending_tries_notification:
                self.queue.pending_tries_notification.notify_all()
            for worker in self.workers:
                worker.join()

        def response(self):
            return self.queue.response

    class KeepWriterThread(threading.Thread):
        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while not self.queue.empty():
                if self.queue.pending_copies() > 0:
                    # Avoid overreplication, wait for some needed re-attempt
                    with self.queue.pending_tries_notification:
                        if self.queue.pending_tries <= 0:
                            self.queue.pending_tries_notification.wait()
                            continue # try again when awake
                        self.queue.pending_tries -= 1

                    # Get to work
                    try:
                        service, service_root = self.queue.get_nowait()
                    except Queue.Empty:
                        continue
                    if service.finished():
                        self.queue.task_done()
                        continue
                    success = bool(service.put(self.data_hash,
                                               self.data,
                                               timeout=self.timeout))
                    result = service.last_result()
                    if success:
                        _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                                      str(threading.current_thread()),
                                      self.data_hash,
                                      len(self.data),
                                      service_root)
                        try:
                            replicas_stored = int(result['headers']['x-keep-replicas-stored'])
                        except (KeyError, ValueError):
                            replicas_stored = 1
                        self.queue.write_success(result['body'].strip(), replicas_stored)
                    else:
                        if result.get('status_code', None):
                            _logger.debug("Request fail: PUT %s => %s %s",
                                          self.data_hash,
                                          result['status_code'],
                                          result['body'])
                        self.queue.write_fail(service, result.get('status_code', None)) # Schedule a re-attempt with next service
                    # Mark as done so the queue can be join()ed
                    self.queue.task_done()
                else:
                    # Remove the task from the queue anyways
                    try:
                        self.queue.get_nowait()
                        # Mark as done so the queue can be join()ed
                        self.queue.task_done()
                    except Queue.Empty:
                        continue

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services. If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy. Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
          KeepClient does not use a proxy, pass in an empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests. It is an error to specify both
          api_client and api_token. If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory. If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable. If you want to ensure KeepClient does not
          use local storage, pass in an empty string. This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called. Default 0.
        """
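        # Typical constructions (illustrative values, not from the
        # original file):
        #   KeepClient(api_client=arvados.api('v1'))      # discover services
        #   KeepClient(proxy='https://keep.example.com/',
        #              api_token='xyzzy')                 # talk to one proxy
        #   KeepClient(local_store='/tmp/keep')           # local testing stub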
        self.lock = threading.Lock()
        if proxy is None:
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = Queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        if local_store:
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                if not proxy.endswith('/'):
                    proxy += '/'
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': 'proxy',
                    'service_type': 'proxy',
                    '_service_root': proxy,
                    }]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise. The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
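
    # With DEFAULT_TIMEOUT = (2, 256, 32768) the connect timeout doubles per
    # attempt: attempt 0 -> (2, 256, 32768), attempt 1 -> (4, 256, 32768),
    # attempt 2 -> (8, 256, 32768). The read timeout and bandwidth floor
    # stay fixed.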

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
                (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.itervalues():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.itervalues()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]
            # For disk type services, max_replicas_per_service is 1.
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
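
    # Worked example (made-up values): for a block hash h and a service
    # UUID ending in 'keep0-1234567890abcde', the weight is
    # md5(h + '1234567890abcde'). Sorting services by this weight gives
    # every client the same rendezvous probe order for a given block.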

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # as well.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    **headers)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
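        #
        # e.g. save_result((body, 3))   -> True   (got a real result)
        #      save_result((None, 0))   -> False  (no servers left; give up)
        #      save_result((None, 3))   -> None   (keep retrying)
        #      save_result(SomeError()) -> None   (exception; keep retrying)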
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc):
        """Fetch a block only if it is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    @retry.retry_method
    def head(self, loc_s, num_retries=None):
        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)

    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)

    def _get_or_head(self, loc_s, method="GET", num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        locator = KeepLocator(loc_s)
        if method == "GET":
            slot, first = self.block_cache.reserve_cache(locator.md5sum)
            if not first:
                self.hits_counter.add(1)
                v = slot.get()
                return v

        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               len(hint) == 29 and
                               self._gateway_services.get(hint[2:]))])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter)
            for root in hint_roots}

        # See #3147 for a discussion of the loop implementation. Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        sorted_roots = []
        blob = None
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False)
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        if method == "GET":
            slot.set(blob)
            self.block_cache.cap_cache()
        if loop.success():
            if method == "HEAD":
                return True
            else:
                return blob

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        """

        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replicas'] = str(copies)
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done = 0
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left))
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            done += writer_pool.done()
            loop.save_result((done >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, writer_pool.done()), service_errors, label="service")
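
    # Usage sketch (illustrative): storing a block and keeping its locator.
    #
    #     kc = KeepClient(api_client=arvados.api('v1'))
    #     locator = kc.put('foo', copies=2)  # e.g. 'acbd18db...+3+A...@...'
    #     assert kc.get(locator) == 'foo'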

    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return ''
        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
            return f.read()

    def is_cached(self, locator):
        return self.block_cache.reserve_cache(locator)