import cStringIO
import datetime
import hashlib
import logging
import os
import pycurl
import Queue
import re
import socket
import ssl
import threading

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
import timer

_logger = logging.getLogger('arvados.keep')
global_client_object = None


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} must be a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except IndexError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
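
    # A hedged usage sketch (not in the original source; the hash, signature,
    # and timestamp below are made-up illustrative values):
    #
    #   loc = KeepLocator(
    #       'd41d8cd98f00b204e9800998ecf8427e+0'
    #       '+A27117dcd73d9e8fcf3a104d0aed6be97f6e80bc5@53bed294')
    #   loc.md5sum                 # => 'd41d8cd98f00b204e9800998ecf8427e'
    #   loc.size                   # => 0
    #   loc.permission_expired()   # compares perm_expiry to the current time
    #   str(loc)                   # reassembles the locator, hints included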
121 """Simple interface to a global KeepClient object.
123 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
124 own API client. The global KeepClient will build an API client from the
125 current Arvados configuration, which may not match the one you built.

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)
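
    # Hedged usage sketch (not in the original source): the deprecated
    # global interface versus the recommended explicit client.
    #
    #   locator = Keep.put('foo')    # implicitly builds a global KeepClient
    #   data = Keep.get(locator)
    #
    #   # Preferred: build your own client from your own API client.
    #   kc = KeepClient(api_client=arvados.api('v1'))
    #   locator = kc.put('foo')
    #   data = kc.get(locator)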


class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in xrange(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in xrange(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
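
    # Hedged usage sketch (not in the original source) of the reserve/fill
    # protocol that KeepClient.get() follows; fetch_block() stands in for
    # whatever actually retrieves the data:
    #
    #   slot, first = cache.reserve_cache(locator_md5)
    #   if first:
    #       slot.set(fetch_block())   # wakes any threads blocked in get()
    #       cache.cap_cache()
    #   else:
    #       content = slot.get()      # blocks until set() is called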


class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:      300 seconds
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:       300 seconds
    DEFAULT_TIMEOUT = (2, 300)
    DEFAULT_PROXY_TIMEOUT = (20, 300)

    class ThreadLimiter(object):
        """
        Limit the number of threads running at a given time to
        {desired successes} minus {successes reported}. When successes
        reported == desired, wake up the remaining threads and tell
        them to quit.

        Should be used in a "with" block.
        """
        def __init__(self, todo):
            self._todo = todo
            self._done = 0
            self._response = None
            self._todo_lock = threading.Semaphore(todo)
            self._done_lock = threading.Lock()

        def __enter__(self):
            self._todo_lock.acquire()
            return self

        def __exit__(self, type, value, traceback):
            self._todo_lock.release()

        def shall_i_proceed(self):
            """
            Return true if the current thread should do stuff. Return
            false if the current thread should just stop.
            """
            with self._done_lock:
                return (self._done < self._todo)

        def save_response(self, response_body, replicas_stored):
            """
            Records a response body (a locator, possibly signed) returned by
            the Keep server.  It is not necessary to save more than
            one response, since we presume that any locator returned
            in response to a successful request is valid.
            """
            with self._done_lock:
                self._done += replicas_stored
                self._response = response_body

        def response(self):
            """
            Returns the body from the response to a PUT request.
            """
            with self._done_lock:
                return self._response

        def done(self):
            """
            Return how many successes were reported.
            """
            with self._done_lock:
                return self._done
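
        # Hedged usage sketch (not in the original source), mirroring how
        # KeepWriterThread uses this class:
        #
        #   limiter = KeepClient.ThreadLimiter(copies)
        #   # ...then, in each writer thread:
        #   with limiter:
        #       if not limiter.shall_i_proceed():
        #           return
        #       # ...attempt the PUT; on success:
        #       limiter.save_response(response_body, replicas_stored)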

    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service.  This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures.  However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
320 """Is it worth attempting a request?"""
324 """Did the request succeed or encounter permanent failure?"""
325 return self._result['error'] == False or not self._usable
327 def last_result(self):

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(False)
            except Queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, False)
            except:
                pass

        @staticmethod
        def _socket_open(family, socktype, protocol, address=None):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(family, socktype, protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            return s

        def get(self, locator, timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: GET %s", url)
            curl = self._get_user_agent()
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
                ok = False
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            try:
                self._headers = {}
                body_reader = cStringIO.StringIO(body)
                response_body = cStringIO.StringIO()
                curl.setopt(pycurl.NOSIGNAL, 1)
                curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                curl.setopt(pycurl.URL, url.encode('utf-8'))
                # Using UPLOAD tells cURL to wait for a "go ahead" from the
                # Keep server (in the form of a HTTP/1.1 "100 Continue"
                # response) instead of sending the request body immediately.
                # This allows the server to reject the request if the request
                # is invalid or the server is read-only, without waiting for
                # the client to send the entire block.
                curl.setopt(pycurl.UPLOAD, True)
                curl.setopt(pycurl.INFILESIZE, len(body))
                curl.setopt(pycurl.READFUNCTION, body_reader.read)
                curl.setopt(pycurl.HTTPHEADER, [
                    '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                self._setcurltimeouts(curl, timeout)
                try:
                    curl.perform()
                except Exception as e:
                    raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
                ok = False
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            return True

        def _setcurltimeouts(self, curl, timeouts):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                conn_t, xfer_t = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
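
            # For example (illustrative): timeouts=(2, 300) yields
            # CONNECTTIMEOUT_MS=2000 and TIMEOUT_MS=300000, while a bare
            # number such as 60 applies 60 seconds to both phases.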

        def _headerfunction(self, header_line):
            header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._lastheadername and header_line.startswith((' ', '\t')):
                # Continuation of a folded header from the previous line.
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written

    class KeepWriterThread(threading.Thread):
        """
        Write a blob of data to the given Keep server. On success, call
        save_response() of the given ThreadLimiter to save the returned
        locator.
        """
        def __init__(self, keep_service, **kwargs):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.service = keep_service
            self.args = kwargs
            self._success = False

        def success(self):
            return self._success

        def run(self):
            with self.args['thread_limiter'] as limiter:
                if not limiter.shall_i_proceed():
                    # My turn arrived, but the job has been done without
                    # me.
                    return
                self.run_with_limiter(limiter)

        def run_with_limiter(self, limiter):
            if self.service.finished():
                return
            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          len(self.args['data']),
                          self.args['service_root'])
            self._success = bool(self.service.put(
                self.args['data_hash'],
                self.args['data'],
                timeout=self.args.get('timeout', None)))
            result = self.service.last_result()
            if self._success:
                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                              str(threading.current_thread()),
                              self.args['data_hash'],
                              len(self.args['data']),
                              self.args['service_root'])
                # Tick the 'done' counter for the number of replicas
                # reported stored by the server, for the case that
                # we're talking to a proxy or other backend that
                # stores multiple copies for us.
                try:
                    replicas_stored = int(result['headers']['x-keep-replicas-stored'])
                except (KeyError, ValueError):
                    replicas_stored = 1
                limiter.save_response(result['body'].strip(), replicas_stored)
            elif result.get('status_code', None):
                _logger.debug("Request fail: PUT %s => %s %s",
                              self.args['data_hash'],
                              result['status_code'],
                              result['body'])

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
          KeepClient does not use a proxy, pass in an empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of two floats is interpreted as
          (connection_timeout, read_timeout): see
          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
          Because timeouts are often a result of transient server load, the
          actual connection timeout will be increased by a factor of two on
          each retry.

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies.  A tuple of two floats is interpreted as
          (connection_timeout, read_timeout).  The behavior described
          above for adjusting connection timeouts on retry also applies.

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = Queue.LifoQueue()

        if local_store:
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            if proxy:
                if not proxy.endswith('/'):
                    proxy += '/'
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': 'proxy',
                    '_service_root': proxy,
                    }]
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self.using_proxy = None
                self._static_services_list = False

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient.  See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        return (t[0] * (1 << attempt_number), t[1])
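
        # Worked example (illustrative): with DEFAULT_TIMEOUT == (2, 300),
        # attempt 0 uses (2, 300), attempt 1 uses (4, 300), attempt 2 uses
        # (8, 300): only the connection timeout scales; the read timeout
        # stays fixed.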

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            accessible = keep_services.execute().get('items')
            if not accessible:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in accessible:
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
            _logger.debug(str(self._gateway_services))

            self._keep_services = [
                ks for ks in accessible
                if ks.get('service_type') in ['disk', 'proxy']]
            _logger.debug(str(self._keep_services))

            self.using_proxy = any(ks.get('service_type') == 'proxy'
                                   for ks in self._keep_services)

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
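
        # Illustrative note (not in the original source): this is rendezvous
        # (highest-random-weight) hashing.  Every client computes the same
        # per-service weights for a given block, so probe order is stable
        # across clients with no shared state, e.g.:
        #
        #   md5('d41d8cd98f00b204e9800998ecf8427e' + '0123456789abcde')
        #
        # where '0123456789abcde' stands in for service_uuid[-15:].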

    def weighted_service_roots(self, locator, force_rebuild=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []

        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # as a list.
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                self._keep_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots
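
    # Hedged sketch (not in the original source): for a locator such as
    # 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@qr1hi', the hint-derived root
    # 'https://keep.qr1hi.arvadosapi.com/' would be probed first, followed
    # by the local disk/proxy services in rendezvous-weight order.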

    def map_new_services(self, roots_map, locator, force_rebuild, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool, **headers)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc):
        """Fetch a block only if it is in the cache; otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))
        locator = KeepLocator(loc_s)
        slot, first = self.block_cache.reserve_cache(locator.md5sum)
        if not first:
            v = slot.get()
            return v

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               len(hint) == 29 and
                               self._gateway_services.get(hint[2:])
                               )])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool)
            for root in hint_roots
        }

        # See #3147 for a discussion of the loop implementation.  Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        sorted_roots = []
        blob = None
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries))
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        slot.set(blob)
        self.block_cache.cap_cache()
        if loop.success():
            return blob

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        """
        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")

        data_hash = hashlib.md5(data).hexdigest()
        if copies < 1:
            return data_hash
        locator = KeepLocator(data_hash + '+' + str(len(data)))

        headers = {}
        if self.using_proxy:
            # Tell the proxy how many copies we want it to store
            headers['X-Keep-Desired-Replication'] = str(copies)
        roots_map = {}
        thread_limiter = KeepClient.ThreadLimiter(copies)
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                local_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            threads = []
            for service_root, ks in roots_map.iteritems():
                if ks.finished():
                    continue
                t = KeepClient.KeepWriterThread(
                    ks,
                    data=data,
                    data_hash=data_hash,
                    service_root=service_root,
                    thread_limiter=thread_limiter,
                    timeout=self.current_timeout(num_retries-tries_left))
                t.start()
                threads.append(t)
            for t in threads:
                t.join()
            loop.save_result((thread_limiter.done() >= copies, len(threads)))

        if loop.success():
            return thread_limiter.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in local_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, thread_limiter.done()), service_errors, label="service")
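
    # Hedged usage sketch (not in the original source):
    #
    #   kc = KeepClient(api_client=arvados.api('v1'))
    #   locator = kc.put('foo', copies=2)
    #   # => e.g. 'acbd18db4cc2f85cedef654fccc4a4d8+3+A<signature>@<expiry>'
    #   # (a signed locator returned by the Keep server)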

    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator
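
    # Hedged sketch (not in the original source): with local_store set,
    # __init__ rebinds put/get to these stubs, so the same call sites work:
    #
    #   kc = KeepClient(local_store='/tmp/keep')   # hypothetical directory
    #   loc = kc.put('foo')   # writes /tmp/keep/acbd18db4cc2f85cedef654fccc4a4d8
    #   kc.get(loc)           # => 'foo'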

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return ''
        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
            return f.read()

    def is_cached(self, locator):
        return self.block_cache.reserve_cache(locator)