28 import arvados.config as config
30 import arvados.retry as retry
# Package-level logger shared by every class in this module.
33 _logger = logging.getLogger('arvados.keep')
# Process-wide KeepClient used by the deprecated Keep facade below; stays
# None until Keep.global_client_object() first constructs one.
34 global_client_object = None
# Parses and re-serializes a Keep block locator string of the form
# "md5sum+size+hint+hint...".  NOTE(review): several interior lines of this
# class are not visible in this excerpt; comments below describe only what
# the visible code shows.
37 class KeepLocator(object):
# Reference point for converting perm_expiry datetimes back to Unix
# timestamps in permission_hint() (naive UTC datetime).
38 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
# Valid hint: one uppercase letter followed by alphanumerics/@/_/-.
39 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
41 def __init__(self, locator_str):
44 self._perm_expiry = None
# Locator fields are '+'-separated: md5, size, then zero or more hints.
45 pieces = iter(locator_str.split('+'))
46 self.md5sum = next(pieces)
48 self.size = int(next(pieces))
52 if self.HINT_RE.match(hint) is None:
53 raise ValueError("invalid hint format: {}".format(hint))
# 'A...' hints carry the permission signature and expiry timestamp.
54 elif hint.startswith('A'):
55 self.parse_permission_hint(hint)
57 self.hints.append(hint)
61 str(s) for s in [self.md5sum, self.size,
62 self.permission_hint()] + self.hints
66 if self.size is not None:
67 return "%s+%i" % (self.md5sum, self.size)
# Factory for validating hex-string properties; used for md5sum (32
# digits) and perm_sig (40 digits) below.
71 def _make_hex_prop(name, length):
72 # Build and return a new property with the given name that
73 # must be a hex string of the given length.
74 data_name = '_{}'.format(name)
76 return getattr(self, data_name)
77 def setter(self, hex_str):
78 if not arvados.util.is_hex(hex_str, length):
79 raise ValueError("{} must be a {}-digit hex string: {}".
80 format(name, length, hex_str))
81 setattr(self, data_name, hex_str)
82 return property(getter, setter)
84 md5sum = _make_hex_prop('md5sum', 32)
85 perm_sig = _make_hex_prop('perm_sig', 40)
# perm_expiry property: stored as a naive UTC datetime, but set from a
# 1-8 digit hex Unix timestamp (as found in an 'A' hint).
88 def perm_expiry(self):
89 return self._perm_expiry
92 def perm_expiry(self, value):
93 if not arvados.util.is_hex(value, 1, 8):
95 "permission timestamp must be a hex Unix timestamp: {}".
97 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
# Re-serialize the permission hint as "A<sig>@<hex timestamp>".
99 def permission_hint(self):
100 data = [self.perm_sig, self.perm_expiry]
# Convert the stored datetime back to integer seconds since epoch.
103 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
104 return "A{}@{:08x}".format(*data)
# Split "A<sig>@<timestamp>" into the two property assignments above.
106 def parse_permission_hint(self, s):
108 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
110 raise ValueError("bad permission hint {}".format(s))
def permission_expired(self, as_of_dt=None):
    """Return True if the locator's permission signature has expired.

    Arguments:
    * as_of_dt: compare the expiry against this datetime instead of the
      current time (must be naive UTC, like perm_expiry itself).

    A locator with no permission hint never expires (returns False).
    """
    if self.perm_expiry is None:
        return False
    elif as_of_dt is None:
        # BUG FIX: perm_expiry is built with utcfromtimestamp() (a naive
        # UTC datetime), but the original compared it against the naive
        # *local* time datetime.now(), skewing the result by the host's
        # UTC offset.  Use utcnow() so both sides are UTC.
        as_of_dt = datetime.datetime.utcnow()
    return self.perm_expiry <= as_of_dt
121 """Simple interface to a global KeepClient object.
123 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
124 own API client. The global KeepClient will build an API client from the
125 current Arvados configuration, which may not match the one you built.
# Return the shared KeepClient, rebuilding it whenever any of the
# relevant configuration/environment values has changed since the last
# call.  NOTE(review): presumably a @classmethod (decorator not visible
# in this excerpt).
130 def global_client_object(cls):
131 global global_client_object
132 # Previously, KeepClient would change its behavior at runtime based
133 # on these configuration settings. We simulate that behavior here
134 # by checking the values and returning a new KeepClient if any of
# Cache key: every setting whose change should invalidate the client.
136 key = (config.get('ARVADOS_API_HOST'),
137 config.get('ARVADOS_API_TOKEN'),
138 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
139 config.get('ARVADOS_KEEP_PROXY'),
140 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
141 os.environ.get('KEEP_LOCAL_STORE'))
142 if (global_client_object is None) or (cls._last_key != key):
143 global_client_object = KeepClient()
# NOTE(review): the assignment of cls._last_key = key is not visible in
# this excerpt -- confirm it happens after rebuilding, or the client
# would be rebuilt on every call.
145 return global_client_object
def get(locator, **kwargs):
    """Fetch *locator* through the shared (deprecated) global KeepClient."""
    client = Keep.global_client_object()
    return client.get(locator, **kwargs)
def put(data, **kwargs):
    """Store *data* through the shared (deprecated) global KeepClient."""
    client = Keep.global_client_object()
    return client.put(data, **kwargs)
# In-memory LRU-ish cache of Keep blocks, bounded by a byte budget.
155 class KeepBlockCache(object):
156 # Default RAM cache is 256MiB
# cache_max: maximum total bytes of cached block content before
# cap_cache() starts evicting.
157 def __init__(self, cache_max=(256 * 1024 * 1024)):
158 self.cache_max = cache_max
# Guards every access to the slot list (see get/reserve_cache below).
# NOTE(review): the initialization of the slot list itself is not
# visible in this excerpt.
160 self._cache_lock = threading.Lock()
# One cache entry: the block's locator, its content, and an Event that
# readers wait on until the fetching thread publishes the content.
162 class CacheSlot(object):
163 def __init__(self, locator):
164 self.locator = locator
# Set once content is available (or the fetch failed); waiters block
# on this event.
165 self.ready = threading.Event()
# Publish *value* as this slot's content and wake any waiters.
172 def set(self, value):
# size() reports this slot's contribution to the cache byte budget;
# a slot with no content counts as empty.
177 if self.content is None:
180 return len(self.content)
183 '''Cap the cache size to self.cache_max'''
184 with self._cache_lock:
185 # Select all slots except those where ready.is_set() and content is
186 # None (that means there was an error reading the block).
187 self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
# Total bytes currently cached.
188 sm = sum([slot.size() for slot in self._cache])
# Evict from the back (least recently used -- _get() moves hits to the
# front) until we fit within the budget.  NOTE(review): the actual
# deletion lines are not visible in this excerpt.
189 while len(self._cache) > 0 and sm > self.cache_max:
190 for i in xrange(len(self._cache)-1, -1, -1):
# Only evict slots whose fetch has completed; in-flight slots stay.
191 if self._cache[i].ready.is_set():
194 sm = sum([slot.size() for slot in self._cache])
# Internal lookup: linear scan for *locator*; on a hit the slot is moved
# to the front of the list (MRU position).  Callers (get() and
# reserve_cache() below) invoke this while holding self._cache_lock.
196 def _get(self, locator):
197 # Test if the locator is already in the cache
198 for i in xrange(0, len(self._cache)):
199 if self._cache[i].locator == locator:
202 # move it to the front
204 self._cache.insert(0, n)
def get(self, locator):
    """Thread-safe lookup of the cache slot for *locator* (None if absent)."""
    with self._cache_lock:
        slot = self._get(locator)
    return slot
# Per the caller in KeepClient.get() ("slot, first = ...reserve_cache(...)"),
# this returns a (slot, just_created) pair; the return statements
# themselves are not visible in this excerpt.
212 def reserve_cache(self, locator):
213 '''Reserve a cache slot for the specified locator,
214 or return the existing slot.'''
215 with self._cache_lock:
216 n = self._get(locator)
220 # Add a new cache slot for the locator
221 n = KeepBlockCache.CacheSlot(locator)
# New slots go to the front (MRU position), matching _get()'s policy.
222 self._cache.insert(0, n)
# Client for reading and writing data blocks on Keep storage services.
225 class KeepClient(object):
227 # Default Keep server connection timeout: 2 seconds
228 # Default Keep server read timeout: 300 seconds
229 # Default Keep proxy connection timeout: 20 seconds
230 # Default Keep proxy read timeout: 300 seconds
# Each value is a (connection_timeout, read_timeout) pair in seconds;
# see current_timeout() for how the connection part scales on retry.
231 DEFAULT_TIMEOUT = (2, 300)
232 DEFAULT_PROXY_TIMEOUT = (20, 300)
# Coordinates the pool of KeepWriterThreads during a put(): a semaphore
# admits writers, and a shared counter tracks replicas stored so far.
234 class ThreadLimiter(object):
236 Limit the number of threads running at a given time to
237 {desired successes} minus {successes reported}. When successes
238 reported == desired, wake up the remaining threads and tell
241 Should be used in a "with" block.
# todo: the number of replica successes we still need.
243 def __init__(self, todo):
246 self._response = None
# Semaphore initialized to `todo` admits at most that many writers
# concurrently; __enter__/__exit__ below acquire and release it.
247 self._todo_lock = threading.Semaphore(todo)
# Protects _done and _response across writer threads.
248 self._done_lock = threading.Lock()
251 self._todo_lock.acquire()
254 def __exit__(self, type, value, traceback):
255 self._todo_lock.release()
257 def shall_i_proceed(self):
259 Return true if the current thread should do stuff. Return
260 false if the current thread should just stop.
262 with self._done_lock:
263 return (self._done < self._todo)
265 def save_response(self, response_body, replicas_stored):
267 Records a response body (a locator, possibly signed) returned by
268 the Keep server. It is not necessary to save more than
269 one response, since we presume that any locator returned
270 in response to a successful request is valid.
272 with self._done_lock:
273 self._done += replicas_stored
274 self._response = response_body
# response(): last saved PUT response body (def line not visible here).
278 Returns the body from the response to a PUT request.
280 with self._done_lock:
281 return self._response
# done(): replica-success count so far (def line not visible here).
285 Return how many successes were reported.
287 with self._done_lock:
291 class KeepService(object):
292 """Make requests to a single Keep service, and track results.
294 A KeepService is intended to last long enough to perform one
295 transaction (GET or PUT) against one Keep service. This can
296 involve calling either get() or put() multiple times in order
297 to retry after transient failures. However, calling both get()
298 and put() on a single instance -- or using the same instance
299 to access two different Keep services -- will not produce
# Exception types treated as (possibly transient) HTTP failures by
# get()/put(); rest of the tuple not visible in this excerpt.
306 arvados.errors.HttpError,
# NOTE(review): Queue.LifoQueue() as a default argument is evaluated once
# at class-definition time, so every instance that omits user_agent_pool
# shares the same queue.  That may be intended (a shared curl-handle
# pool), but confirm -- it is the classic mutable-default pitfall.
309 def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
311 self._user_agent_pool = user_agent_pool
# Per-transaction result record; 'error' stays None until a request runs.
312 self._result = {'error': None}
# GET requests add an Accept header on top of the caller's headers;
# PUTs send the caller's headers unchanged.
315 self.get_headers = {'Accept': 'application/octet-stream'}
316 self.get_headers.update(headers)
317 self.put_headers = headers
# usable() -- def line not visible in this excerpt.
320 """Is it worth attempting a request?"""
# finished() -- def line not visible in this excerpt.
324 """Did the request succeed or encounter permanent failure?"""
# error == False marks success; not _usable marks permanent failure.
325 return self._result['error'] == False or not self._usable
327 def last_result(self):
# Borrow a pycurl handle from the shared pool without blocking
# (get(False) raises Queue.Empty when the pool is dry; the fallback
# that creates a fresh handle is not visible in this excerpt).
330 def _get_user_agent(self):
332 return self._user_agent_pool.get(False)
# Return a handle to the pool; put(ua, False) never blocks.
336 def _put_user_agent(self, ua):
339 self._user_agent_pool.put(ua, False)
# Perform one GET against this service, verify the MD5 of the body
# against the locator, and return the body (error paths are recorded in
# self._result; several lines are not visible in this excerpt).
343 def get(self, locator, timeout=None):
344 # locator is a KeepLocator object.
345 url = self.root + str(locator)
346 _logger.debug("Request: GET %s", url)
347 curl = self._get_user_agent()
# Time the transfer for the throughput log line below.
349 with timer.Timer() as t:
351 response_body = StringIO.StringIO()
# NOSIGNAL: required for thread-safe libcurl use (no SIGALRM timeouts).
352 curl.setopt(pycurl.NOSIGNAL, 1)
353 curl.setopt(pycurl.URL, url.encode('utf-8'))
354 curl.setopt(pycurl.HTTPHEADER, [
355 '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
356 curl.setopt(pycurl.WRITEDATA, response_body)
# _headerfunction (below) accumulates response headers into self._headers.
357 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
358 self._setcurltimeouts(curl, timeout)
# Wrap any libcurl-level failure in our HttpError (status 0 = no response).
361 except Exception as e:
362 raise arvados.errors.HttpError(0, str(e))
364 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
365 'body': response_body.getvalue(),
366 'headers': self._headers,
# ok: True on 2xx, False on permanent failure, None on retryable failure.
369 ok = retry.check_http_response_success(self._result['status_code'])
371 self._result['error'] = arvados.errors.HttpError(
372 self._result['status_code'],
373 self._headers.get('x-status-line', 'Error'))
374 except self.HTTP_ERRORS as e:
# Service stays usable unless the failure was definitely permanent.
379 self._usable = ok != False
381 _logger.debug("Request fail: GET %s => %s: %s",
382 url, type(self._result['error']), str(self._result['error']))
383 # Don't return this ua to the pool, in case it's broken.
386 self._put_user_agent(curl)
387 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
388 self._result['status_code'],
389 len(self._result['body']),
391 (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
# End-to-end integrity check: body must hash to the locator's md5sum.
392 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
393 if resp_md5 != locator.md5sum:
394 _logger.warning("Checksum fail: md5(%s) = %s",
396 self._result['error'] = arvados.errors.HttpError(
399 return self._result['body']
# Perform one PUT of *body* under *hash_s* against this service; result
# details are recorded in self._result (several lines not visible in
# this excerpt, including the success return).
401 def put(self, hash_s, body, timeout=None):
402 url = self.root + hash_s
403 _logger.debug("Request: PUT %s", url)
404 curl = self._get_user_agent()
407 response_body = StringIO.StringIO()
# Same libcurl setup as get(): thread-safe, explicit URL/headers/timeouts.
408 curl.setopt(pycurl.NOSIGNAL, 1)
409 curl.setopt(pycurl.URL, url.encode('utf-8'))
410 curl.setopt(pycurl.POSTFIELDS, body)
# libcurl would default POSTFIELDS to POST; Keep expects PUT.
411 curl.setopt(pycurl.CUSTOMREQUEST, 'PUT')
412 curl.setopt(pycurl.HTTPHEADER, [
413 '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
414 curl.setopt(pycurl.WRITEDATA, response_body)
415 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
416 self._setcurltimeouts(curl, timeout)
419 except Exception as e:
420 raise arvados.errors.HttpError(0, str(e))
422 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
423 'body': response_body.getvalue(),
424 'headers': self._headers,
# ok: True on 2xx, False on permanent failure, None on retryable failure.
427 ok = retry.check_http_response_success(self._result['status_code'])
429 self._result['error'] = arvados.errors.HttpError(
430 self._result['status_code'],
431 self._headers.get('x-status-line', 'Error'))
432 except self.HTTP_ERRORS as e:
437 self._usable = ok != False # still usable if ok is True or None
439 _logger.debug("Request fail: PUT %s => %s: %s",
440 url, type(self._result['error']), str(self._result['error']))
441 # Don't return this ua to the pool, in case it's broken.
444 self._put_user_agent(curl)
# Apply (connection, transfer) timeouts to a curl handle.  *timeouts*
# may be a (conn, xfer) tuple or a single number used for both; the
# no-timeout branch is not visible in this excerpt.
447 def _setcurltimeouts(self, curl, timeouts):
450 elif isinstance(timeouts, tuple):
451 conn_t, xfer_t = timeouts
453 conn_t, xfer_t = (timeouts, timeouts)
# libcurl takes milliseconds; values arrive here in seconds.
454 curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
455 curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
# pycurl HEADERFUNCTION callback: accumulate response headers into
# self._headers, folding continuation lines into the previous header.
457 def _headerfunction(self, header_line):
# libcurl delivers raw bytes; HTTP headers are latin-1 by spec.
458 header_line = header_line.decode('iso-8859-1')
459 if ':' in header_line:
460 name, value = header_line.split(':', 1)
# Header names are case-insensitive; normalize to lowercase keys.
461 name = name.strip().lower()
462 value = value.strip()
# Continuation line (folded header): append to the previous header's
# value.  The branch condition itself is not visible in this excerpt.
464 name = self._lastheadername
465 value = self._headers[name] + ' ' + header_line.strip()
466 elif header_line.startswith('HTTP/'):
# Stash the status line under a synthetic header key; used for error
# messages in get()/put().
467 name = 'x-status-line'
470 _logger.error("Unexpected header line: %s", header_line)
472 self._lastheadername = name
473 self._headers[name] = value
474 # Returning None implies all bytes were written
# One writer thread per Keep service during put(); coordinates with the
# shared ThreadLimiter so the pool stops once enough replicas exist.
477 class KeepWriterThread(threading.Thread):
479 Write a blob of data to the given Keep server. On success, call
480 save_response() of the given ThreadLimiter to save the returned
# kwargs (stored as self.args, per usage below) carry data, data_hash,
# service_root, thread_limiter, and an optional timeout.
483 def __init__(self, keep_service, **kwargs):
484 super(KeepClient.KeepWriterThread, self).__init__()
485 self.service = keep_service
487 self._success = False
# run(): entering the limiter blocks until a writer slot is free.
493 with self.args['thread_limiter'] as limiter:
494 if not limiter.shall_i_proceed():
495 # My turn arrived, but the job has been done without
498 self.run_with_limiter(limiter)
500 def run_with_limiter(self, limiter):
# Skip services that already hit permanent failure in this transaction.
501 if self.service.finished():
503 _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
504 str(threading.current_thread()),
505 self.args['data_hash'],
506 len(self.args['data']),
507 self.args['service_root'])
508 self._success = bool(self.service.put(
509 self.args['data_hash'],
511 timeout=self.args.get('timeout', None)))
512 result = self.service.last_result()
514 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
515 str(threading.current_thread()),
516 self.args['data_hash'],
517 len(self.args['data']),
518 self.args['service_root'])
519 # Tick the 'done' counter for the number of replica
520 # reported stored by the server, for the case that
521 # we're talking to a proxy or other backend that
522 # stores to multiple copies for us.
524 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
# Missing/malformed header: fallback (not visible here) presumably
# counts a single replica -- confirm against the full source.
525 except (KeyError, ValueError):
527 limiter.save_response(result['body'].strip(), replicas_stored)
528 elif result.get('status_code', None):
529 _logger.debug("Request fail: PUT %s => %s %s",
530 self.args['data_hash'],
531 result['status_code'],
535 def __init__(self, api_client=None, proxy=None,
536 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
537 api_token=None, local_store=None, block_cache=None,
538 num_retries=0, session=None):
539 """Initialize a new KeepClient.
543 The API client to use to find Keep services. If not
544 provided, KeepClient will build one from available Arvados
548 If specified, this KeepClient will send requests to this Keep
549 proxy. Otherwise, KeepClient will fall back to the setting of the
550 ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
551 KeepClient does not use a proxy, pass in an empty string.
554 The initial timeout (in seconds) for HTTP requests to Keep
555 non-proxy servers. A tuple of two floats is interpreted as
556 (connection_timeout, read_timeout): see
557 http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
558 Because timeouts are often a result of transient server load, the
559 actual connection timeout will be increased by a factor of two on
564 The initial timeout (in seconds) for HTTP requests to
565 Keep proxies. A tuple of two floats is interpreted as
566 (connection_timeout, read_timeout). The behavior described
567 above for adjusting connection timeouts on retry also applies.
571 If you're not using an API client, but only talking
572 directly to a Keep proxy, this parameter specifies an API token
573 to authenticate Keep requests. It is an error to specify both
574 api_client and api_token. If you specify neither, KeepClient
575 will use one available from the Arvados configuration.
578 If specified, this KeepClient will bypass Keep
579 services, and save data to the named directory. If unspecified,
580 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
581 environment variable. If you want to ensure KeepClient does not
582 use local storage, pass in an empty string. This is primarily
583 intended to mock a server for testing.
586 The default number of times to retry failed requests.
587 This will be used as the default num_retries value when get() and
588 put() are called. Default 0.
590 self.lock = threading.Lock()
# Fall back to environment/config for proxy, token, and local store.
592 proxy = config.get('ARVADOS_KEEP_PROXY')
593 if api_token is None:
594 if api_client is None:
595 api_token = config.get('ARVADOS_API_TOKEN')
597 api_token = api_client.api_token
# api_client and api_token are mutually exclusive (see docstring).
598 elif api_client is not None:
600 "can't build KeepClient with both API client and token")
601 if local_store is None:
602 local_store = os.environ.get('KEEP_LOCAL_STORE')
604 self.block_cache = block_cache if block_cache else KeepBlockCache()
605 self.timeout = timeout
606 self.proxy_timeout = proxy_timeout
# Pool of reusable pycurl handles shared by this client's KeepServices.
607 self._user_agent_pool = Queue.LifoQueue()
# local_store mode: swap get/put for the filesystem-backed versions and
# skip all service discovery.
610 self.local_store = local_store
611 self.get = self.local_store_get
612 self.put = self.local_store_put
614 self.num_retries = num_retries
# Proxy mode: a single static service entry pointing at the proxy.
616 if not proxy.endswith('/'):
618 self.api_token = api_token
619 self._gateway_services = {}
620 self._keep_services = [{
622 '_service_root': proxy,
624 self.using_proxy = True
625 self._static_services_list = True
627 # It's important to avoid instantiating an API client
628 # unless we actually need one, for testing's sake.
629 if api_client is None:
630 api_client = arvados.api('v1')
631 self.api_client = api_client
632 self.api_token = api_client.api_token
# Service list is discovered lazily by build_services_list().
633 self._gateway_services = {}
634 self._keep_services = None
635 self.using_proxy = None
636 self._static_services_list = False
def current_timeout(self, attempt_number):
    """Return the (connection, read) timeout pair for the next attempt.

    Picks the proxy timeouts when this client talks through a Keep
    proxy, the plain server timeouts otherwise.  The connection
    component doubles on every retry (attempt_number counts from 0 for
    the first try); the read component is left unchanged.
    """
    # TODO(twp): the timeout should be a property of a
    # KeepService, not a KeepClient. See #4488.
    base = self.proxy_timeout if self.using_proxy else self.timeout
    conn_t, read_t = base
    return (conn_t * (1 << attempt_number), read_t)
# Discover Keep services via the API server and cache the result on the
# instance; no-op when a static list (proxy mode) exists or a cached
# list is present and force_rebuild is False.
653 def build_services_list(self, force_rebuild=False):
654 if (self._static_services_list or
655 (self._keep_services and not force_rebuild)):
659 keep_services = self.api_client.keep_services().accessible()
660 except Exception: # API server predates Keep services.
661 keep_services = self.api_client.keep_disks().list()
663 accessible = keep_services.execute().get('items')
665 raise arvados.errors.NoKeepServersError()
667 # Precompute the base URI for each service.
669 host = r['service_host']
670 if not host.startswith('[') and host.find(':') >= 0:
671 # IPv6 URIs must be formatted like http://[::1]:80/...
672 host = '[' + host + ']'
673 r['_service_root'] = "{}://{}:{:d}/".format(
674 'https' if r['service_ssl_flag'] else 'http',
678 # Gateway services are only used when specified by UUID,
679 # so there's nothing to gain by filtering them by
# All services are addressable as gateways (keyed by UUID)...
681 self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
682 _logger.debug(str(self._gateway_services))
# ...but only disk and proxy services take part in normal reads/writes.
684 self._keep_services = [
685 ks for ks in accessible
686 if ks.get('service_type') in ['disk', 'proxy']]
687 _logger.debug(str(self._keep_services))
# Presence of any proxy switches current_timeout() to the proxy pair.
689 self.using_proxy = any(ks.get('service_type') == 'proxy'
690 for ks in self._keep_services)
692 def _service_weight(self, data_hash, service_uuid):
693 """Compute the weight of a Keep service endpoint for a data
694 block with a known hash.
696 The weight is md5(h + u) where u is the last 15 characters of
697 the service endpoint's UUID.
699 return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
701 def weighted_service_roots(self, locator, force_rebuild=False):
702 """Return an array of Keep service endpoints, in the order in
703 which they should be probed when reading or writing data with
704 the given hash+hints.
706 self.build_services_list(force_rebuild)
710 # Use the services indicated by the given +K@... remote
711 # service hints, if any are present and can be resolved to a
713 for hint in locator.hints:
714 if hint.startswith('K@'):
# Short K@ hint (site prefix): derive a public keepproxy URL.
# The length check on this branch is not visible in this excerpt.
717 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
# Long K@ hint: the remainder is a gateway service UUID.
718 elif len(hint) == 29:
719 svc = self._gateway_services.get(hint[2:])
721 sorted_roots.append(svc['_service_root'])
723 # Sort the available local services by weight (heaviest first)
724 # for this locator, and return their service_roots (base URIs)
726 sorted_roots.extend([
727 svc['_service_root'] for svc in sorted(
730 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
731 _logger.debug("{}: {}".format(locator, sorted_roots))
734 def map_new_services(self, roots_map, locator, force_rebuild, **headers):
735 # roots_map is a dictionary, mapping Keep service root strings
736 # to KeepService objects. Poll for Keep services, and add any
737 # new ones to roots_map. Return the current list of local
# Every KeepService built here authenticates with this client's token
# unless the caller supplied its own Authorization header.
739 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
740 local_roots = self.weighted_service_roots(locator, force_rebuild)
741 for root in local_roots:
# Only create KeepService objects for roots we haven't seen in this
# transaction; existing ones keep their accumulated result state.
742 if root not in roots_map:
743 roots_map[root] = self.KeepService(
744 root, self._user_agent_pool, **headers)
# Success predicate handed to retry.RetryLoop by get() and put().
# NOTE(review): presumably decorated @staticmethod (decorator line not
# visible in this excerpt) -- it takes no self/cls.
748 def _check_loop_result(result):
749 # KeepClient RetryLoops should save results as a 2-tuple: the
750 # actual result of the request, and the number of servers available
751 # to receive the request this round.
752 # This method returns True if there's a real result, False if
753 # there are no more servers available, otherwise None.
# Exceptions saved into the loop mean "retry" (return value for this
# branch is not visible in this excerpt).
754 if isinstance(result, Exception):
756 result, tried_server_count = result
757 if (result is not None) and (result is not False):
759 elif tried_server_count < 1:
760 _logger.info("No more Keep services to try; giving up")
765 def get_from_cache(self, loc):
766 """Fetch a block only if it is in the cache, otherwise return None."""
# NOTE(review): KeepBlockCache.get() can return None for a cache miss;
# the guard for that case is not visible in this excerpt -- confirm
# slot is checked before slot.ready is dereferenced.
767 slot = self.block_cache.get(loc)
# Only a completed fetch counts as a cache hit; in-flight slots miss.
768 if slot.ready.is_set():
774 def get(self, loc_s, num_retries=None):
775 """Get data from Keep.
777 This method fetches one or more blocks of data from Keep. It
778 sends a request each Keep service registered with the API
779 server (or the proxy provided when this client was
780 instantiated), then each service named in location hints, in
781 sequence. As soon as one service provides the data, it's
785 * loc_s: A string of one or more comma-separated locators to fetch.
786 This method returns the concatenation of these blocks.
787 * num_retries: The number of times to retry GET requests to
788 *each* Keep server if it returns temporary failures, with
789 exponential backoff. Note that, in each loop, the method may try
790 to fetch data from every available Keep service, along with any
791 that are named in location hints in the locator. The default value
792 is set when the KeepClient is initialized.
# Multiple comma-separated locators: fetch each and concatenate.
795 return ''.join(self.get(x) for x in loc_s.split(','))
796 locator = KeepLocator(loc_s)
# Reserve a cache slot; 'first' tells us whether this thread is the
# one responsible for actually fetching the block.
797 slot, first = self.block_cache.reserve_cache(locator.md5sum)
802 # If the locator has hints specifying a prefix (indicating a
803 # remote keepproxy) or the UUID of a local gateway service,
804 # read data from the indicated service(s) instead of the usual
805 # list of local disk services.
806 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
807 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
808 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
809 for hint in locator.hints if (
810 hint.startswith('K@') and
812 self._gateway_services.get(hint[2:])
814 # Map root URLs to their KeepService objects.
816 root: self.KeepService(root, self._user_agent_pool)
817 for root in hint_roots
820 # See #3147 for a discussion of the loop implementation. Highlights:
821 # * Refresh the list of Keep services after each failure, in case
822 # it's being updated.
823 # * Retry until we succeed, we're out of retries, or every available
824 # service has returned permanent failure.
828 loop = retry.RetryLoop(num_retries, self._check_loop_result,
830 for tries_left in loop:
# Refresh the probe-ordered service list; rebuild after any failure.
832 sorted_roots = self.map_new_services(
834 force_rebuild=(tries_left < num_retries))
835 except Exception as error:
836 loop.save_result(error)
839 # Query KeepService objects that haven't returned
840 # permanent failure, in our specified shuffle order.
841 services_to_try = [roots_map[root]
842 for root in sorted_roots
843 if roots_map[root].usable()]
844 for keep_service in services_to_try:
845 blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
848 loop.save_result((blob, len(services_to_try)))
850 # Always cache the result, then return it if we succeeded.
852 self.block_cache.cap_cache()
856 # Q: Including 403 is necessary for the Keep tests to continue
857 # passing, but maybe they should expect KeepReadError instead?
# Classify the failure: all-404-like vs. no services vs. mixed errors.
858 not_founds = sum(1 for key in sorted_roots
859 if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
860 service_errors = ((key, roots_map[key].last_result()['error'])
861 for key in sorted_roots)
863 raise arvados.errors.KeepReadError(
864 "failed to read {}: no Keep services available ({})".format(
865 loc_s, loop.last_result()))
866 elif not_founds == len(sorted_roots):
867 raise arvados.errors.NotFoundError(
868 "{} not found".format(loc_s), service_errors)
870 raise arvados.errors.KeepReadError(
871 "failed to read {}".format(loc_s), service_errors, label="service")
874 def put(self, data, copies=2, num_retries=None):
875 """Save data in Keep.
877 This method will get a list of Keep services from the API server, and
878 send the data to each one simultaneously in a new thread. Once the
879 uploads are finished, if enough copies are saved, this method returns
880 the most recent HTTP response body. If requests fail to upload
881 enough copies, this method raises KeepWriteError.
884 * data: The string of data to upload.
885 * copies: The number of copies that the user requires be saved.
887 * num_retries: The number of times to retry PUT requests to
888 *each* Keep server if it returns temporary failures, with
889 exponential backoff. The default value is set when the
890 KeepClient is initialized.
# Python 2: accept unicode only if it is pure ASCII; bytes-as-str
# is the native payload type.
893 if isinstance(data, unicode):
894 data = data.encode("ascii")
895 elif not isinstance(data, str):
896 raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
898 data_hash = hashlib.md5(data).hexdigest()
901 locator = KeepLocator(data_hash + '+' + str(len(data)))
905 # Tell the proxy how many copies we want it to store
906 headers['X-Keep-Desired-Replication'] = str(copies)
# The limiter stops spawning work once `copies` replicas are reported.
908 thread_limiter = KeepClient.ThreadLimiter(copies)
909 loop = retry.RetryLoop(num_retries, self._check_loop_result,
911 for tries_left in loop:
913 local_roots = self.map_new_services(
915 force_rebuild=(tries_left < num_retries), **headers)
916 except Exception as error:
917 loop.save_result(error)
# One writer thread per service in this round.
921 for service_root, ks in roots_map.iteritems():
924 t = KeepClient.KeepWriterThread(
928 service_root=service_root,
929 thread_limiter=thread_limiter,
930 timeout=self.current_timeout(num_retries-tries_left))
# Success for this round means enough replicas were stored in total.
935 loop.save_result((thread_limiter.done() >= copies, len(threads)))
938 return thread_limiter.response()
940 raise arvados.errors.KeepWriteError(
941 "failed to write {}: no Keep services available ({})".format(
942 data_hash, loop.last_result()))
944 service_errors = ((key, roots_map[key].last_result()['error'])
945 for key in local_roots
946 if roots_map[key].last_result()['error'])
947 raise arvados.errors.KeepWriteError(
948 "failed to write {} (wanted {} copies but wrote {})".format(
949 data_hash, copies, thread_limiter.done()), service_errors, label="service")
951 def local_store_put(self, data, copies=1, num_retries=None):
954 This method is used in place of the real put() method when
955 using local storage (see constructor's local_store argument).
957 copies and num_retries arguments are ignored: they are here
958 only for the sake of offering the same call signature as
961 Data stored this way can be retrieved via local_store_get().
963 md5 = hashlib.md5(data).hexdigest()
964 locator = '%s+%d' % (md5, len(data))
# Write to a .tmp file first, then rename: the rename is atomic on
# POSIX, so readers never observe a partially-written block.
965 with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
967 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
968 os.path.join(self.local_store, md5))
971 def local_store_get(self, loc_s, num_retries=None):
972 """Companion to local_store_put()."""
# Validate the locator before touching the filesystem (the surrounding
# try/except is not visible in this excerpt).
974 locator = KeepLocator(loc_s)
976 raise arvados.errors.NotFoundError(
977 "Invalid data locator: '%s'" % loc_s)
# The well-known empty block needs no file on disk.
978 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
980 with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
def is_cached(self, locator):
    """Look up / reserve the block cache slot for *locator*.

    BUG FIX: the original body referenced the undefined name
    ``expect_hash`` (the parameter is named ``locator``), so every call
    raised NameError.  Pass the actual parameter through instead.

    NOTE(review): KeepBlockCache.reserve_cache() returns a
    (slot, just_created) pair, so despite this method's name the result
    is not a plain boolean -- confirm intent against callers before
    tightening the return value.
    """
    return self.block_cache.reserve_cache(locator)