28 import arvados.config as config
30 import arvados.retry as retry
# Module-level logger shared by all Keep client code in this file.
_logger = logging.getLogger('arvados.keep')

# Lazily-built singleton KeepClient used by the deprecated Keep class
# below; created/replaced by Keep.global_client_object().
global_client_object = None
class KeepLocator(object):
    """Parse, validate, and re-serialize a Keep block locator string.

    A locator has the form ``md5sum[+size][+hints...]``, where an
    ``A...@...`` hint carries a permission signature and its expiry
    timestamp (hex Unix time, UTC).
    """
    # Reference point for converting a datetime expiry back to a Unix
    # timestamp in permission_hint().
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # Locator had no +size field.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        # Reassemble the full locator, omitting absent (None) fields.
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator with all hints (and signature) removed."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} must be a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        # Stored as a naive datetime in UTC.
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the A-hint string, or None if no signature is present."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except IndexError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission signature has expired.

        Compares against `as_of_dt` if given, else the current time.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # Bug fix: perm_expiry is a naive UTC datetime (see the
            # setter), so it must be compared against utcnow(), not the
            # local-time now() -- otherwise expiry checks are off by the
            # local UTC offset.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
121 """Simple interface to a global KeepClient object.
123 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
124 own API client. The global KeepClient will build an API client from the
125 current Arvados configuration, which may not match the one you built.
130 def global_client_object(cls):
131 global global_client_object
132 # Previously, KeepClient would change its behavior at runtime based
133 # on these configuration settings. We simulate that behavior here
134 # by checking the values and returning a new KeepClient if any of
136 key = (config.get('ARVADOS_API_HOST'),
137 config.get('ARVADOS_API_TOKEN'),
138 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
139 config.get('ARVADOS_KEEP_PROXY'),
140 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
141 os.environ.get('KEEP_LOCAL_STORE'))
142 if (global_client_object is None) or (cls._last_key != key):
143 global_client_object = KeepClient()
145 return global_client_object
148 def get(locator, **kwargs):
149 return Keep.global_client_object().get(locator, **kwargs)
152 def put(data, **kwargs):
153 return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
    """In-memory LRU-ish cache of Keep block contents, bounded by bytes."""
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        # Most-recently-used slot first; guarded by _cache_lock.
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """Placeholder for one block: readers wait on `ready` until the
        fetching thread calls set()."""
        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            # Blocks until set() has been called.
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                # Evict the least-recently-used completed slot.
                for i in xrange(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in xrange(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        # Thread-safe lookup; returns the slot or None on a miss.
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        # Returns (slot, just_created): just_created is True when the
        # caller is responsible for fetching the block and set()ting it.
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
class KeepClient(object):
    """Client for reading and writing data blocks via Keep services."""

    # Default Keep server connection timeout: 2 seconds
    # Default Keep server read timeout: 300 seconds
    # Default Keep proxy connection timeout: 20 seconds
    # Default Keep proxy read timeout: 300 seconds
    # Each is a (connection_timeout, read_timeout) pair in seconds.
    DEFAULT_TIMEOUT = (2, 300)
    DEFAULT_PROXY_TIMEOUT = (20, 300)
    class ThreadLimiter(object):
        """
        Limit the number of threads running at a given time to
        {desired successes} minus {successes reported}. When successes
        reported == desired, wake up the remaining threads and tell
        them to quit.

        Should be used in a "with" block.
        """
        def __init__(self, todo):
            self._todo = todo
            self._done = 0
            self._response = None
            # The semaphore's initial count caps concurrent writers at the
            # number of copies still wanted.
            self._todo_lock = threading.Semaphore(todo)
            self._done_lock = threading.Lock()

        def __enter__(self):
            self._todo_lock.acquire()
            return self

        def __exit__(self, type, value, traceback):
            self._todo_lock.release()

        def shall_i_proceed(self):
            """
            Return true if the current thread should do stuff. Return
            false if the current thread should just stop.
            """
            with self._done_lock:
                return (self._done < self._todo)

        def save_response(self, response_body, replicas_stored):
            """
            Records a response body (a locator, possibly signed) returned by
            the Keep server. It is not necessary to save more than
            one response, since we presume that any locator returned
            in response to a successful request is valid.
            """
            with self._done_lock:
                self._done += replicas_stored
                self._response = response_body

        def response(self):
            """
            Returns the body from the response to a PUT request.
            """
            with self._done_lock:
                return self._response

        def done(self):
            """
            Return how many successes were reported.
            """
            with self._done_lock:
                return self._done
    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        # Exception types that mark the transaction as failed rather
        # than propagating out of get()/put().
        HTTP_ERRORS = (
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
            # NOTE(review): the shared mutable default for user_agent_pool
            # lets independent instances reuse pooled curl handles;
            # KeepClient always passes its own pool explicitly. Confirm the
            # sharing is intentional before changing it.
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            # Dict with 'status_code', 'body', 'headers', 'error' keys
            # (some keys absent if the request never completed).
            return self._result

        def _get_user_agent(self):
            # Reuse an idle pooled curl handle if available, else make one.
            try:
                return self._user_agent_pool.get(False)
            except Queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            # Return a healthy curl handle to the pool for reuse.
            try:
                ua.reset()
                self._user_agent_pool.put(ua, False)
            except:
                ua.close()

        def _socket_open(self, family, socktype, protocol, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(family, socktype, protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            return s

        def get(self, locator, timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: GET %s", url)
            curl = self._get_user_agent()
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = StringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
                    curl.setopt(pycurl.WRITEDATA, response_body)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        # Normalize curl-level failures into HttpError so the
                        # outer except clause records them uniformly.
                        raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
                ok = False
            # ok is True (success), None (temporary failure), or False
            # (permanent failure): only False marks the service unusable.
            self._usable = ok != False
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                # Don't return this ua to the pool, in case it's broken.
                curl.close()
                return None
            self._put_user_agent(curl)
            _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            # Verify the payload against the locator's MD5 before trusting it.
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            try:
                self._headers = {}
                response_body = StringIO.StringIO()
                curl.setopt(pycurl.NOSIGNAL, 1)
                curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                curl.setopt(pycurl.URL, url.encode('utf-8'))
                curl.setopt(pycurl.POSTFIELDS, body)
                curl.setopt(pycurl.CUSTOMREQUEST, 'PUT')
                curl.setopt(pycurl.HTTPHEADER, [
                    '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
                curl.setopt(pycurl.WRITEDATA, response_body)
                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                self._setcurltimeouts(curl, timeout)
                try:
                    curl.perform()
                except Exception as e:
                    raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
                ok = False
            self._usable = ok != False # still usable if ok is True or None
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                # Don't return this ua to the pool, in case it's broken.
                curl.close()
                return False
            self._put_user_agent(curl)
            return True

        def _setcurltimeouts(self, curl, timeouts):
            # timeouts: None (leave curl defaults), a scalar (same connect
            # and transfer timeout), or a (connect, transfer) pair, seconds.
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                conn_t, xfer_t = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))

        def _headerfunction(self, header_line):
            # pycurl header callback: accumulate response headers in
            # self._headers, folding continuation lines into the value of
            # the previous header.
            header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
    class KeepWriterThread(threading.Thread):
        """
        Write a blob of data to the given Keep server. On success, call
        save_response() of the given ThreadLimiter to save the returned
        locator.
        """
        def __init__(self, keep_service, **kwargs):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.service = keep_service
            # Expected kwargs: data, data_hash, service_root,
            # thread_limiter, and optional timeout.
            self.args = kwargs
            self._success = False

        def success(self):
            return self._success

        def run(self):
            with self.args['thread_limiter'] as limiter:
                if not limiter.shall_i_proceed():
                    # My turn arrived, but the job has been done without
                    # me.
                    return
                self.run_with_limiter(limiter)

        def run_with_limiter(self, limiter):
            if self.service.finished():
                return
            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          len(self.args['data']),
                          self.args['service_root'])
            self._success = bool(self.service.put(
                self.args['data_hash'],
                self.args['data'],
                timeout=self.args.get('timeout', None)))
            result = self.service.last_result()
            if self._success:
                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                              str(threading.current_thread()),
                              self.args['data_hash'],
                              len(self.args['data']),
                              self.args['service_root'])
                # Tick the 'done' counter for the number of replica
                # reported stored by the server, for the case that
                # we're talking to a proxy or other backend that
                # stores to multiple copies for us.
                try:
                    replicas_stored = int(result['headers']['x-keep-replicas-stored'])
                except (KeyError, ValueError):
                    # Header missing or malformed: assume one copy stored.
                    replicas_stored = 1
                limiter.save_response(result['body'].strip(), replicas_stored)
            elif result.get('status_code', None):
                _logger.debug("Request fail: PUT %s => %s %s",
                              self.args['data_hash'],
                              result['status_code'],
                              result['body'])
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services. If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy. Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
          KeepClient does not use a proxy, pass in an empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers. A tuple of two floats is interpreted as
          (connection_timeout, read_timeout): see
          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
          Because timeouts are often a result of transient server load, the
          actual connection timeout will be increased by a factor of two on
          each retry.

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of two floats is interpreted as
          (connection_timeout, read_timeout). The behavior described
          above for adjusting connection timeouts on retry also applies.

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests. It is an error to specify both
          api_client and api_token. If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory. If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable. If you want to ensure KeepClient does not
          use local storage, pass in an empty string. This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called. Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = Queue.LifoQueue()

        if local_store:
            # Local-store mode: get/put are replaced wholesale by the
            # filesystem-backed stubs below.
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            if proxy:
                if not proxy.endswith('/'):
                    proxy += '/'
                self.api_token = api_token
                self._gateway_services = {}
                # A single static "service list" pointing at the proxy.
                self._keep_services = [{
                    'uuid': 'proxy',
                    '_service_root': proxy,
                    }]
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self.using_proxy = None
                self._static_services_list = False
648 def current_timeout(self, attempt_number):
649 """Return the appropriate timeout to use for this client.
651 The proxy timeout setting if the backend service is currently a proxy,
652 the regular timeout setting otherwise. The `attempt_number` indicates
653 how many times the operation has been tried already (starting from 0
654 for the first try), and scales the connection timeout portion of the
655 return value accordingly.
658 # TODO(twp): the timeout should be a property of a
659 # KeepService, not a KeepClient. See #4488.
660 t = self.proxy_timeout if self.using_proxy else self.timeout
661 return (t[0] * (1 << attempt_number), t[1])
    def build_services_list(self, force_rebuild=False):
        """Populate self._keep_services (and gateway map) from the API
        server, unless a usable cached list already exists."""
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            accessible = keep_services.execute().get('items')
            if len(accessible) == 0:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in accessible:
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
            _logger.debug(str(self._gateway_services))

            self._keep_services = [
                ks for ks in accessible
                if ks.get('service_type') in ['disk', 'proxy']]
            _logger.debug(str(self._keep_services))

            self.using_proxy = any(ks.get('service_type') == 'proxy'
                                   for ks in self._keep_services)
702 def _service_weight(self, data_hash, service_uuid):
703 """Compute the weight of a Keep service endpoint for a data
704 block with a known hash.
706 The weight is md5(h + u) where u is the last 15 characters of
707 the service endpoint's UUID.
709 return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
    def weighted_service_roots(self, locator, force_rebuild=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # service root URL.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    # K@ + 5-character cluster id: a federated keepproxy.
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    # K@ + 27-character UUID: a local gateway service.
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                self._keep_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots
    def map_new_services(self, roots_map, locator, force_rebuild, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # root strings, in probe order.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool, **headers)
        return local_roots
    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            # An exception (e.g. service-list refresh failure) is a
            # retryable condition, not a definitive outcome.
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None
775 def get_from_cache(self, loc):
776 """Fetch a block only if is in the cache, otherwise return None."""
777 slot = self.block_cache.get(loc)
778 if slot.ready.is_set():
    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            # Multiple locators: fetch each one recursively and concatenate.
            return ''.join(self.get(x) for x in loc_s.split(','))
        locator = KeepLocator(loc_s)
        slot, first = self.block_cache.reserve_cache(locator.md5sum)
        if not first:
            # Another thread is (or was) fetching this block; wait on its
            # slot instead of issuing a duplicate request.
            return slot.get()

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               len(hint) == 29 and
                               self._gateway_services.get(hint[2:])
                               )])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool)
            for root in hint_roots
        }

        # See #3147 for a discussion of the loop implementation. Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        sorted_roots = []
        blob = None
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries))
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        slot.set(blob)
        self.block_cache.cap_cache()
        if loop.success():
            return blob

        # No servers fulfilled the request. Count how many responded
        # "not found;" if the ratio is high enough (currently 100%), report
        # Not Found instead of a generic error.
        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        """
        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")

        data_hash = hashlib.md5(data).hexdigest()
        if copies < 1:
            # Nothing to store; the bare hash is a valid locator.
            return data_hash
        locator = KeepLocator(data_hash + '+' + str(len(data)))

        headers = {}
        if self.using_proxy:
            # Tell the proxy how many copies we want it to store
            headers['X-Keep-Desired-Replication'] = str(copies)
        roots_map = {}
        thread_limiter = KeepClient.ThreadLimiter(copies)
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                local_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            threads = []
            for service_root, ks in roots_map.iteritems():
                if ks.finished():
                    # This service already succeeded or failed permanently.
                    continue
                t = KeepClient.KeepWriterThread(
                    ks,
                    data=data,
                    data_hash=data_hash,
                    service_root=service_root,
                    thread_limiter=thread_limiter,
                    timeout=self.current_timeout(num_retries-tries_left))
                t.start()
                threads.append(t)
            for t in threads:
                t.join()
            loop.save_result((thread_limiter.done() >= copies, len(threads)))

        if loop.success():
            return thread_limiter.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in local_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, thread_limiter.done()), service_errors, label="service")
    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        # Write to a temp name then rename, so readers never see a
        # partially written block.
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator
    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            # The well-known empty block needs no file on disk.
            return ''
        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
            return f.read()
993 def is_cached(self, locator):
994 return self.block_cache.reserve_cache(expect_hash)