29 import arvados.config as config
31 import arvados.retry as retry
# Module-level logger shared by all Keep client classes in this file.
_logger = logging.getLogger('arvados.keep')

# Lazily-built singleton KeepClient used by the deprecated Keep facade
# below; created/refreshed by Keep.global_client_object().
global_client_object = None
class KeepLocator(object):
    """Parsed form of one Keep block locator string.

    A locator looks like "<md5sum>[+<size>][+<hint>...]".  Hints starting
    with 'A' carry a permission signature plus a hex Unix-timestamp expiry
    and are parsed into perm_sig/perm_expiry; all other hints are kept
    verbatim in self.hints.
    """
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # Size is optional: "md5sum" alone is a valid locator.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        # Reassemble the locator, omitting pieces that are unset (None).
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        # Locator with all hints removed: just hash (and size, if known).
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        # Accept 1 to 8 hex digits (a Unix timestamp in seconds).
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except IndexError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        """Return True if the signature expiry has passed.

        A locator with no expiry never expires.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # Bug fix: perm_expiry is derived from a UTC Unix timestamp
            # (utcfromtimestamp above), but the original compared it with
            # the naive *local* clock (datetime.now()), mis-judging expiry
            # in any non-UTC timezone.  Compare UTC with UTC.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
122 """Simple interface to a global KeepClient object.
124 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
125 own API client. The global KeepClient will build an API client from the
126 current Arvados configuration, which may not match the one you built.
131 def global_client_object(cls):
132 global global_client_object
133 # Previously, KeepClient would change its behavior at runtime based
134 # on these configuration settings. We simulate that behavior here
135 # by checking the values and returning a new KeepClient if any of
137 key = (config.get('ARVADOS_API_HOST'),
138 config.get('ARVADOS_API_TOKEN'),
139 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
140 config.get('ARVADOS_KEEP_PROXY'),
141 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
142 os.environ.get('KEEP_LOCAL_STORE'))
143 if (global_client_object is None) or (cls._last_key != key):
144 global_client_object = KeepClient()
146 return global_client_object
149 def get(locator, **kwargs):
150 return Keep.global_client_object().get(locator, **kwargs)
153 def put(data, **kwargs):
154 return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
    """Thread-safe in-memory cache of Keep block contents.

    Slots are kept in most-recently-used-first order; cap_cache() evicts
    finished ("ready") slots from the tail until the total content size
    fits in cache_max bytes.
    """
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max       # eviction threshold, in bytes
        self._cache = []                 # CacheSlot list, MRU first
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """Holder for one block: readers wait on .ready until set() runs."""
        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            # Block until some thread has stored the content.
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            sm = sum(slot.size() for slot in self._cache)
            while len(self._cache) > 0 and sm > self.cache_max:
                # Evict the least-recently-used slot whose content has
                # arrived; never evict a slot readers may still be
                # waiting on.  Subtracting the evicted size (instead of
                # re-summing the whole list each pass, as the original
                # did) keeps this O(n) per eviction.
                for i in range(len(self._cache) - 1, -1, -1):
                    if self._cache[i].ready.is_set():
                        sm -= self._cache[i].size()
                        del self._cache[i]
                        break
                else:
                    # Bug fix: if nothing is evictable yet, the original
                    # recomputed the same total and looped forever here.
                    break

    def _get(self, locator):
        # Test if the locator is already in the cache.  Caller must hold
        # self._cache_lock.
        for i, slot in enumerate(self._cache):
            if slot.locator == locator:
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, slot)
                return slot
        return None

    def get(self, locator):
        """Return the CacheSlot for locator, or None if not cached."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
class KeepClient(object):
    # Timeouts below are (connection_timeout, read_timeout) tuples in
    # seconds; see current_timeout() for how they scale on retries.
    # Default Keep server connection timeout: 2 seconds
    # Default Keep server read timeout: 300 seconds
    # Default Keep proxy connection timeout: 20 seconds
    # Default Keep proxy read timeout: 300 seconds
    DEFAULT_TIMEOUT = (2, 300)
    DEFAULT_PROXY_TIMEOUT = (20, 300)
class ThreadLimiter(object):
    """
    Limit the number of threads running at a given time to
    {desired successes} minus {successes reported}. When successes
    reported == desired, wake up the remaining threads and tell
    them to quit.

    Should be used in a "with" block.
    """
    def __init__(self, todo):
        # Number of replica confirmations still wanted, and how many
        # have been reported so far.
        self._todo = todo
        self._done = 0
        self._response = None
        self._todo_lock = threading.Semaphore(todo)
        self._done_lock = threading.Lock()

    def __enter__(self):
        self._todo_lock.acquire()
        return self

    def __exit__(self, type, value, traceback):
        self._todo_lock.release()

    def shall_i_proceed(self):
        """
        Return true if the current thread should do stuff. Return
        false if the current thread should just stop.
        """
        with self._done_lock:
            return (self._done < self._todo)

    def save_response(self, response_body, replicas_stored):
        """
        Records a response body (a locator, possibly signed) returned by
        the Keep server.  It is not necessary to save more than
        one response, since we presume that any locator returned
        in response to a successful request is valid.
        """
        with self._done_lock:
            self._done += replicas_stored
            self._response = response_body

    def response(self):
        """
        Returns the body from the response to a PUT request.
        """
        with self._done_lock:
            return self._response

    def done(self):
        """
        Return how many successes were reported.
        """
        with self._done_lock:
            return self._done
class KeepService(object):
    """Make requests to a single Keep service, and track results.

    A KeepService is intended to last long enough to perform one
    transaction (GET or PUT) against one Keep service. This can
    involve calling either get() or put() multiple times in order
    to retry after transient failures. However, calling both get()
    and put() on a single instance -- or using the same instance
    to access two different Keep services -- will not produce
    sensible behavior.
    """

    # Errors that mean the request may be retried against a different
    # service, but this result should be recorded.
    HTTP_ERRORS = (
        socket.error,
        ssl.SSLError,
        arvados.errors.HttpError,
        )

    def __init__(self, root, user_agent_pool=None, **headers):
        # Bug fix: the original declared `user_agent_pool=Queue.LifoQueue()`
        # -- a mutable default evaluated once at class-definition time, so
        # every instance constructed without an explicit pool silently
        # shared one queue.  Use the None sentinel instead.
        self.root = root
        if user_agent_pool is None:
            user_agent_pool = Queue.LifoQueue()
        self._user_agent_pool = user_agent_pool
        self._result = {'error': None}
        self._usable = True
        self.get_headers = {'Accept': 'application/octet-stream'}
        self.get_headers.update(headers)
        self.put_headers = headers

    def usable(self):
        """Is it worth attempting a request?"""
        return self._usable

    def finished(self):
        """Did the request succeed or encounter permanent failure?"""
        return self._result['error'] == False or not self._usable

    def last_result(self):
        return self._result

    def _get_user_agent(self):
        # Reuse a pooled curl handle if one is available; otherwise
        # make a fresh one.
        try:
            return self._user_agent_pool.get(False)
        except Queue.Empty:
            return pycurl.Curl()

    def _put_user_agent(self, ua):
        try:
            ua.reset()
            self._user_agent_pool.put(ua, False)
        except:
            # If the handle can't be reset/pooled, just discard it.
            ua.close()

    @staticmethod
    def _socket_open(family, socktype, protocol, address=None):
        """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
        s = socket.socket(family, socktype, protocol)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
        return s

    def get(self, locator, timeout=None):
        # locator is a KeepLocator object.
        url = self.root + str(locator)
        _logger.debug("Request: GET %s", url)
        curl = self._get_user_agent()
        try:
            with timer.Timer() as t:
                self._headers = {}
                response_body = cStringIO.StringIO()
                curl.setopt(pycurl.NOSIGNAL, 1)
                curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                curl.setopt(pycurl.URL, url.encode('utf-8'))
                curl.setopt(pycurl.HTTPHEADER, [
                    '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                self._setcurltimeouts(curl, timeout)
                try:
                    curl.perform()
                except Exception as e:
                    raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
        except self.HTTP_ERRORS as e:
            self._result = {
                'error': e,
            }
            ok = False
        self._usable = ok != False
        if self._result.get('status_code', None):
            # The client worked well enough to get an HTTP status
            # code, so presumably any problems are just on the
            # server side and it's OK to reuse the client.
            self._put_user_agent(curl)
        else:
            # Don't return this client to the pool, in case it's
            # broken.
            curl.close()
        if not ok:
            _logger.debug("Request fail: GET %s => %s: %s",
                          url, type(self._result['error']), str(self._result['error']))
            return None
        _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                     self._result['status_code'],
                     len(self._result['body']),
                     t.msecs,
                     (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
        # Verify the content against the locator's md5; a mismatch is
        # treated like a transport error.
        resp_md5 = hashlib.md5(self._result['body']).hexdigest()
        if resp_md5 != locator.md5sum:
            _logger.warning("Checksum fail: md5(%s) = %s",
                            url, resp_md5)
            self._result['error'] = arvados.errors.HttpError(
                0, 'Checksum fail')
            return None
        return self._result['body']

    def put(self, hash_s, body, timeout=None):
        url = self.root + hash_s
        _logger.debug("Request: PUT %s", url)
        curl = self._get_user_agent()
        try:
            self._headers = {}
            body_reader = cStringIO.StringIO(body)
            response_body = cStringIO.StringIO()
            curl.setopt(pycurl.NOSIGNAL, 1)
            curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
            curl.setopt(pycurl.URL, url.encode('utf-8'))
            # Using UPLOAD tells cURL to wait for a "go ahead" from the
            # Keep server (in the form of a HTTP/1.1 "100 Continue"
            # response) instead of sending the request body immediately.
            # This allows the server to reject the request if the request
            # is invalid or the server is read-only, without waiting for
            # the client to send the entire block.
            curl.setopt(pycurl.UPLOAD, True)
            curl.setopt(pycurl.INFILESIZE, len(body))
            curl.setopt(pycurl.READFUNCTION, body_reader.read)
            curl.setopt(pycurl.HTTPHEADER, [
                '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
            curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
            curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
            self._setcurltimeouts(curl, timeout)
            try:
                curl.perform()
            except Exception as e:
                raise arvados.errors.HttpError(0, str(e))
            self._result = {
                'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                'body': response_body.getvalue(),
                'headers': self._headers,
                'error': False,
            }
            ok = retry.check_http_response_success(self._result['status_code'])
            if not ok:
                self._result['error'] = arvados.errors.HttpError(
                    self._result['status_code'],
                    self._headers.get('x-status-line', 'Error'))
        except self.HTTP_ERRORS as e:
            self._result = {
                'error': e,
            }
            ok = False
        self._usable = ok != False # still usable if ok is True or None
        if self._result.get('status_code', None):
            # Client is functional. See comment in get().
            self._put_user_agent(curl)
        else:
            curl.close()
        if not ok:
            _logger.debug("Request fail: PUT %s => %s: %s",
                          url, type(self._result['error']), str(self._result['error']))
            return False
        return True

    def _setcurltimeouts(self, curl, timeouts):
        if not timeouts:
            return
        elif isinstance(timeouts, tuple):
            conn_t, xfer_t = timeouts
        else:
            conn_t, xfer_t = (timeouts, timeouts)
        curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
        curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))

    def _headerfunction(self, header_line):
        # pycurl delivers raw header bytes; HTTP headers are latin-1.
        header_line = header_line.decode('iso-8859-1')
        if ':' in header_line:
            name, value = header_line.split(':', 1)
            name = name.strip().lower()
            value = value.strip()
        elif self._lastheadername and header_line.startswith((' ', '\t')):
            # Continuation line of a folded header.
            name = self._lastheadername
            value = self._headers[name] + ' ' + header_line.strip()
        elif header_line.startswith('HTTP/'):
            name = 'x-status-line'
            value = header_line
        else:
            _logger.error("Unexpected header line: %s", header_line)
            return
        self._lastheadername = name
        self._headers[name] = value
        # Returning None implies all bytes were written
class KeepWriterThread(threading.Thread):
    """
    Write a blob of data to the given Keep server. On success, call
    save_response() of the given ThreadLimiter to save the returned
    locator.
    """
    def __init__(self, keep_service, **kwargs):
        super(KeepClient.KeepWriterThread, self).__init__()
        self.service = keep_service
        self.args = kwargs
        self._success = False

    def success(self):
        return self._success

    def run(self):
        with self.args['thread_limiter'] as limiter:
            if not limiter.shall_i_proceed():
                # My turn arrived, but the job has been done without
                # me, so nothing left to do here.
                return
            self.run_with_limiter(limiter)

    def run_with_limiter(self, limiter):
        if self.service.finished():
            return
        _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                      str(threading.current_thread()),
                      self.args['data_hash'],
                      len(self.args['data']),
                      self.args['service_root'])
        self._success = bool(self.service.put(
            self.args['data_hash'],
            self.args['data'],
            timeout=self.args.get('timeout', None)))
        result = self.service.last_result()
        if self._success:
            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          len(self.args['data']),
                          self.args['service_root'])
            # Tick the 'done' counter for the number of replica
            # reported stored by the server, for the case that
            # we're talking to a proxy or other backend that
            # stores to multiple copies for us.
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1
            limiter.save_response(result['body'].strip(), replicas_stored)
        elif result.get('status_code', None):
            _logger.debug("Request fail: PUT %s => %s %s",
                          self.args['data_hash'],
                          result['status_code'],
                          result['body'])
def __init__(self, api_client=None, proxy=None,
             timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
             api_token=None, local_store=None, block_cache=None,
             num_retries=0, session=None):
    """Initialize a new KeepClient.

    Arguments:
    :api_client:
      The API client to use to find Keep services.  If not
      provided, KeepClient will build one from available Arvados
      configuration.

    :proxy:
      If specified, this KeepClient will send requests to this Keep
      proxy.  Otherwise, KeepClient will fall back to the setting of the
      ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
      KeepClient does not use a proxy, pass in an empty string.

    :timeout:
      The initial timeout (in seconds) for HTTP requests to Keep
      non-proxy servers.  A tuple of two floats is interpreted as
      (connection_timeout, read_timeout): see
      http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
      Because timeouts are often a result of transient server load, the
      actual connection timeout will be increased by a factor of two on
      each retry.

    :proxy_timeout:
      The initial timeout (in seconds) for HTTP requests to
      Keep proxies.  A tuple of two floats is interpreted as
      (connection_timeout, read_timeout).  The behavior described
      above for adjusting connection timeouts on retry also applies.

    :api_token:
      If you're not using an API client, but only talking
      directly to a Keep proxy, this parameter specifies an API token
      to authenticate Keep requests.  It is an error to specify both
      api_client and api_token.  If you specify neither, KeepClient
      will use one available from the Arvados configuration.

    :local_store:
      If specified, this KeepClient will bypass Keep
      services, and save data to the named directory.  If unspecified,
      KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
      environment variable.  If you want to ensure KeepClient does not
      use local storage, pass in an empty string.  This is primarily
      intended to mock a server for testing.

    :num_retries:
      The default number of times to retry failed requests.
      This will be used as the default num_retries value when get() and
      put() are called.  Default 0.
    """
    self.lock = threading.Lock()
    if proxy is None:
        proxy = config.get('ARVADOS_KEEP_PROXY')
    if api_token is None:
        if api_client is None:
            api_token = config.get('ARVADOS_API_TOKEN')
        else:
            api_token = api_client.api_token
    elif api_client is not None:
        raise ValueError(
            "can't build KeepClient with both API client and token")
    if local_store is None:
        local_store = os.environ.get('KEEP_LOCAL_STORE')

    self.block_cache = block_cache if block_cache else KeepBlockCache()
    self.timeout = timeout
    self.proxy_timeout = proxy_timeout
    self._user_agent_pool = Queue.LifoQueue()

    if local_store:
        # Local-store mode: replace get/put with the filesystem stubs.
        self.local_store = local_store
        self.get = self.local_store_get
        self.put = self.local_store_put
    else:
        self.num_retries = num_retries
        if proxy:
            # Static single-service list pointing at the proxy.
            if not proxy.endswith('/'):
                proxy += '/'
            self.api_token = api_token
            self._gateway_services = {}
            self._keep_services = [{
                'uuid': 'proxy',
                '_service_root': proxy,
                }]
            self._writable_services = self._keep_services
            self.using_proxy = True
            self._static_services_list = True
            self.thread_count = None
        else:
            # It's important to avoid instantiating an API client
            # unless we actually need one, for testing's sake.
            if api_client is None:
                api_client = arvados.api('v1')
            self.api_client = api_client
            self.api_token = api_client.api_token
            self._gateway_services = {}
            self._keep_services = None
            self._writable_services = None
            self.using_proxy = None
            self._static_services_list = False
            self.thread_count = None
def current_timeout(self, attempt_number):
    """Return the appropriate timeout to use for this client.

    The proxy timeout setting if the backend service is currently a proxy,
    the regular timeout setting otherwise.  The `attempt_number` indicates
    how many times the operation has been tried already (starting from 0
    for the first try), and scales the connection timeout portion of the
    return value accordingly.
    """
    # TODO(twp): the timeout should be a property of a
    # KeepService, not a KeepClient. See #4488.
    if self.using_proxy:
        conn_t, xfer_t = self.proxy_timeout
    else:
        conn_t, xfer_t = self.timeout
    # Double the connection timeout on every retry; keep the read
    # timeout fixed.
    return (conn_t * (2 ** attempt_number), xfer_t)
def build_services_list(self, force_rebuild=False):
    """Populate the Keep service lists from the API server.

    No-op when a static (proxy) list was configured, or when a list
    already exists and force_rebuild is false.
    """
    if (self._static_services_list or
          (self._keep_services and not force_rebuild)):
        return
    with self.lock:
        try:
            keep_services = self.api_client.keep_services().accessible()
        except Exception:  # API server predates Keep services.
            keep_services = self.api_client.keep_disks().list()

        accessible = keep_services.execute().get('items')
        if not accessible:
            raise arvados.errors.NoKeepServersError()

        # Precompute the base URI for each service.
        for r in accessible:
            host = r['service_host']
            if not host.startswith('[') and host.find(':') >= 0:
                # IPv6 URIs must be formatted like http://[::1]:80/...
                host = '[' + host + ']'
            r['_service_root'] = "{}://{}:{:d}/".format(
                'https' if r['service_ssl_flag'] else 'http',
                host,
                r['service_port'])

        # Gateway services are only used when specified by UUID,
        # so there's nothing to gain by filtering them by
        # service_type.
        self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
        _logger.debug(str(self._gateway_services))

        self._keep_services = [
            ks for ks in accessible
            if ks.get('service_type') in ['disk', 'proxy']]
        self._writable_services = [
            ks for ks in accessible
            if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
        _logger.debug(str(self._keep_services))

        self.using_proxy = any(ks.get('service_type') == 'proxy'
                               for ks in self._keep_services)
        # Use a thread_count of 1 if the service is not a disk
        for ks in accessible:
            if ('disk' != ks.get('service_type')) and (True != ks.get('read_only')):
                self.thread_count = 1
def _service_weight(self, data_hash, service_uuid):
    """Compute the weight of a Keep service endpoint for a data
    block with a known hash.

    The weight is md5(h + u) where u is the last 15 characters of
    the service endpoint's UUID.
    """
    return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()

def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
    """Return an array of Keep service endpoints, in the order in
    which they should be probed when reading or writing data with
    the given hash+hints.
    """
    self.build_services_list(force_rebuild)

    sorted_roots = []
    # Use the services indicated by the given hints that are
    # not size or authorization hints.
    # If it is a K@ hint of size 7, it is a keepproxy
    # Otherwise, expect the hint to be of len 29 and a uuid
    # of a remote service that can be resolved to a URI.
    for hint in locator.hints:
        if not hint.startswith('A') and not hint[0].isdigit():
            if len(hint) == 7 and hint.startswith('K@'):
                sorted_roots.append(
                    "https://keep.{}.arvadosapi.com/".format(hint[2:]))
            elif len(hint) == 29 and re.match(util.uuid_pattern, hint[2:]):
                svc = self._gateway_services.get(hint[2:])
                if svc:
                    sorted_roots.append(svc['_service_root'])

    # Sort the available local services by weight (heaviest first)
    # for this locator, and return their service_roots (base URIs).
    use_services = self._keep_services
    if need_writable:
        use_services = self._writable_services
    sorted_roots.extend([
        svc['_service_root'] for svc in sorted(
            use_services,
            reverse=True,
            key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
    _logger.debug("{}: {}".format(locator, sorted_roots))
    return sorted_roots
def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
    # roots_map is a dictionary, mapping Keep service root strings
    # to KeepService objects.  Poll for Keep services, and add any
    # new ones to roots_map.  Return the current list of local
    # service roots.
    headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
    local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
    for root in local_roots:
        if root not in roots_map:
            roots_map[root] = self.KeepService(
                root, self._user_agent_pool, **headers)
    return local_roots
792 def _check_loop_result(result):
793 # KeepClient RetryLoops should save results as a 2-tuple: the
794 # actual result of the request, and the number of servers available
795 # to receive the request this round.
796 # This method returns True if there's a real result, False if
797 # there are no more servers available, otherwise None.
798 if isinstance(result, Exception):
800 result, tried_server_count = result
801 if (result is not None) and (result is not False):
803 elif tried_server_count < 1:
804 _logger.info("No more Keep services to try; giving up")
def get_from_cache(self, loc):
    """Fetch a block only if is in the cache, otherwise return None."""
    slot = self.block_cache.get(loc)
    if slot is None:
        return None
    if not slot.ready.is_set():
        # A fetch for this block is still in flight; don't block on it.
        return None
    return slot.get()
@retry.retry_method
def get(self, loc_s, num_retries=None):
    """Get data from Keep.

    This method fetches one or more blocks of data from Keep.  It
    sends a request each Keep service registered with the API
    server (or the proxy provided when this client was
    instantiated), then each service named in location hints, in
    sequence.  As soon as one service provides the data, it's
    returned.

    Arguments:
    * loc_s: A string of one or more comma-separated locators to fetch.
      This method returns the concatenation of these blocks.
    * num_retries: The number of times to retry GET requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff.  Note that, in each loop, the method may try
      to fetch data from every available Keep service, along with any
      that are named in location hints in the locator.  The default value
      is set when the KeepClient is initialized.
    """
    if ',' in loc_s:
        return ''.join(self.get(x) for x in loc_s.split(','))
    locator = KeepLocator(loc_s)
    slot, first = self.block_cache.reserve_cache(locator.md5sum)
    if not first:
        # Another thread is (or was) fetching this block; wait for it.
        v = slot.get()
        return v

    # If the locator has hints specifying a prefix (indicating a
    # remote keepproxy) or the UUID of a local gateway service,
    # read data from the indicated service(s) instead of the usual
    # list of local disk services.
    hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                  for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
    hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                       for hint in locator.hints if (
                           hint.startswith('K@') and
                           len(hint) == 29 and
                           self._gateway_services.get(hint[2:])
                           )])
    # Map root URLs to their KeepService objects.
    roots_map = {
        root: self.KeepService(root, self._user_agent_pool)
        for root in hint_roots
    }

    # See #3147 for a discussion of the loop implementation.  Highlights:
    # * Refresh the list of Keep services after each failure, in case
    #   it's being updated.
    # * Retry until we succeed, we're out of retries, or every available
    #   service has returned permanent failure.
    blob = None
    loop = retry.RetryLoop(num_retries, self._check_loop_result,
                           backoff_start=2)
    for tries_left in loop:
        try:
            sorted_roots = self.map_new_services(
                roots_map, locator,
                force_rebuild=(tries_left < num_retries),
                need_writable=False)
        except Exception as error:
            loop.save_result(error)
            continue

        # Query KeepService objects that haven't returned
        # permanent failure, in our specified shuffle order.
        services_to_try = [roots_map[root]
                           for root in sorted_roots
                           if roots_map[root].usable()]
        for keep_service in services_to_try:
            blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
            if blob is not None:
                break
        loop.save_result((blob, len(services_to_try)))

    # Always cache the result, then return it if we succeeded.
    slot.set(blob)
    self.block_cache.cap_cache()
    if loop.success():
        return blob

    # No server fulfilled the request.  Count how many services
    # reported the block missing so we can raise the right error.
    # Q: Including 403 is necessary for the Keep tests to continue
    # passing, but maybe they should expect KeepReadError instead?
    not_founds = sum(1 for key in sorted_roots
                     if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
    service_errors = ((key, roots_map[key].last_result()['error'])
                      for key in sorted_roots)
    if not roots_map:
        raise arvados.errors.KeepReadError(
            "failed to read {}: no Keep services available ({})".format(
                loc_s, loop.last_result()))
    elif not_founds == len(sorted_roots):
        raise arvados.errors.NotFoundError(
            "{} not found".format(loc_s), service_errors)
    else:
        raise arvados.errors.KeepReadError(
            "failed to read {}".format(loc_s), service_errors, label="service")
@retry.retry_method
def put(self, data, copies=2, num_retries=None):
    """Save data in Keep.

    This method will get a list of Keep services from the API server, and
    send the data to each one simultaneously in a new thread.  Once the
    uploads are finished, if enough copies are saved, this method returns
    the most recent HTTP response body.  If requests fail to upload
    enough copies, this method raises KeepWriteError.

    Arguments:
    * data: The string of data to upload.
    * copies: The number of copies that the user requires be saved.
      Default 2.
    * num_retries: The number of times to retry PUT requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff.  The default value is set when the
      KeepClient is initialized.
    """
    if isinstance(data, unicode):
        data = data.encode("ascii")
    elif not isinstance(data, str):
        raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

    data_hash = hashlib.md5(data).hexdigest()
    loc_s = data_hash + '+' + str(len(data))
    if copies < 1:
        return loc_s
    locator = KeepLocator(loc_s)

    headers = {}
    # Tell the proxy how many copies we want it to store
    headers['X-Keep-Desired-Replication'] = str(copies)
    roots_map = {}
    thread_limiter = KeepClient.ThreadLimiter(1 if 1 == self.thread_count else copies)
    loop = retry.RetryLoop(num_retries, self._check_loop_result,
                           backoff_start=2)
    for tries_left in loop:
        try:
            local_roots = self.map_new_services(
                roots_map, locator,
                force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
        except Exception as error:
            loop.save_result(error)
            continue

        # Launch one writer thread per service that hasn't already
        # finished (succeeded or failed permanently).
        threads = []
        for service_root, ks in roots_map.iteritems():
            if ks.finished():
                continue
            t = KeepClient.KeepWriterThread(
                ks,
                data=data,
                data_hash=data_hash,
                service_root=service_root,
                thread_limiter=thread_limiter,
                timeout=self.current_timeout(num_retries-tries_left))
            t.start()
            threads.append(t)
        for t in threads:
            t.join()
        loop.save_result((thread_limiter.done() >= copies, len(threads)))

    if loop.success():
        return thread_limiter.response()
    if not roots_map:
        raise arvados.errors.KeepWriteError(
            "failed to write {}: no Keep services available ({})".format(
                data_hash, loop.last_result()))
    else:
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in local_roots
                          if roots_map[key].last_result()['error'])
        raise arvados.errors.KeepWriteError(
            "failed to write {} (wanted {} copies but wrote {})".format(
                data_hash, copies, thread_limiter.done()), service_errors, label="service")
def local_store_put(self, data, copies=1, num_retries=None):
    """A stub for put().

    This method is used in place of the real put() method when
    using local storage (see constructor's local_store argument).

    copies and num_retries arguments are ignored: they are here
    only for the sake of offering the same call signature as
    put().

    Data stored this way can be retrieved via local_store_get().
    """
    md5 = hashlib.md5(data).hexdigest()
    locator = '%s+%d' % (md5, len(data))
    # Write to a '.tmp' file then rename, so readers never observe a
    # partially-written block.  Bug fix: open in binary mode ('wb',
    # was 'w') -- blocks are opaque data, and text-mode newline
    # translation would corrupt them on platforms that translate.
    with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
        f.write(data)
    os.rename(os.path.join(self.local_store, md5 + '.tmp'),
              os.path.join(self.local_store, md5))
    return locator
def local_store_get(self, loc_s, num_retries=None):
    """Companion to local_store_put()."""
    try:
        locator = KeepLocator(loc_s)
    except ValueError:
        raise arvados.errors.NotFoundError(
            "Invalid data locator: '%s'" % loc_s)
    # The canonical empty block is never written to disk.
    if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
        return ''
    with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
        return f.read()
def is_cached(self, locator):
    """Return True if a block for `locator` is present in the block cache.

    Bug fix: the original referenced an undefined name (`expect_hash`),
    so it always raised NameError; it also called reserve_cache(), which
    *creates* an empty slot as a side effect.  Use the side-effect-free
    cache lookup on the actual parameter instead.
    """
    return self.block_cache.get(locator) is not None