import cStringIO
import datetime
import hashlib
import logging
import math
import os
import pycurl
import Queue
import re
import socket
import ssl
import threading
import timer

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util

_logger = logging.getLogger('arvados.keep')
global_client_object = None


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            # split() found no '@', so the unpacking above failed.
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
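
# Illustrative sketch (not part of the library): how a signed locator
# breaks down under KeepLocator. The hash, size, signature, and
# timestamp below are made-up example values.
#
#   loc = KeepLocator("acbd18db4cc2f85cedef654fccc4a4d8+3"
#                     "+A0123456789abcdef0123456789abcdef01234567@53bed294")
#   loc.md5sum             # => "acbd18db4cc2f85cedef654fccc4a4d8"
#   loc.size               # => 3
#   loc.permission_hint()  # => "A0123...4567@53bed294"
#   str(loc)               # round-trips the original locator string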
109 """Simple interface to a global KeepClient object.
111 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
112 own API client. The global KeepClient will build an API client from the
113 current Arvados configuration, which may not match the one you built.
118 def global_client_object(cls):
119 global global_client_object
120 # Previously, KeepClient would change its behavior at runtime based
121 # on these configuration settings. We simulate that behavior here
122 # by checking the values and returning a new KeepClient if any of
124 key = (config.get('ARVADOS_API_HOST'),
125 config.get('ARVADOS_API_TOKEN'),
126 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
127 config.get('ARVADOS_KEEP_PROXY'),
128 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
129 os.environ.get('KEEP_LOCAL_STORE'))
130 if (global_client_object is None) or (cls._last_key != key):
131 global_client_object = KeepClient()
133 return global_client_object
136 def get(locator, **kwargs):
137 return Keep.global_client_object().get(locator, **kwargs)
140 def put(data, **kwargs):
141 return Keep.global_client_object().put(data, **kwargs)
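
# Illustrative sketch (not part of the library): the deprecated global
# interface above versus the recommended per-client usage. "foo" and the
# api_client argument are example values.
#
#   loc = Keep.put("foo")                  # deprecated module-level client
#   data = Keep.get(loc)
#
#   kc = KeepClient(api_client=arvados.api('v1'))   # preferred
#   loc = kc.put("foo")
#   data = kc.get(loc)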


class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            # Block until another thread fills the slot via set().
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in xrange(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in xrange(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
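
    # Illustrative sketch (not part of the library): the caller-side
    # protocol for reserve_cache(), mirroring how KeepClient.get() uses it.
    # fetch_block() is a hypothetical stand-in for the actual fetch.
    #
    #   slot, first = cache.reserve_cache(locator_md5)
    #   if not first:
    #       blob = slot.get()      # block until the reserving thread fills it
    #   else:
    #       blob = fetch_block()   # we hold the reservation, so fetch...
    #       slot.set(blob)         # ...publish, waking blocked readers...
    #       cache.cap_cache()      # ...and trim the cache back under its cap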


class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:      300 seconds
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:       300 seconds
    DEFAULT_TIMEOUT = (2, 300)
    DEFAULT_PROXY_TIMEOUT = (20, 300)

    class ThreadLimiter(object):
        """Limit the number of threads writing to Keep at once.

        This ensures that only a number of writer threads that could
        potentially achieve the desired replication level run at once.
        Once the desired replication level is achieved, queued threads
        are instructed not to run.

        Should be used in a "with" block.
        """
        def __init__(self, want_copies, max_service_replicas):
            self._started = 0
            self._want_copies = want_copies
            self._done = 0
            self._response = None
            self._start_lock = threading.Condition()
            if (not max_service_replicas) or (max_service_replicas >= want_copies):
                max_threads = 1
            else:
                max_threads = math.ceil(float(want_copies) / max_service_replicas)
            _logger.debug("Limiter max threads is %d", max_threads)
            self._todo_lock = threading.Semaphore(max_threads)
            self._done_lock = threading.Lock()
            self._local = threading.local()

        def __enter__(self):
            self._start_lock.acquire()
            if getattr(self._local, 'sequence', None) is not None:
                # If the calling thread has used set_sequence(N), then
                # we wait here until N other threads have started.
                while self._started < self._local.sequence:
                    self._start_lock.wait()
            self._todo_lock.acquire()
            self._started += 1
            self._start_lock.notifyAll()
            self._start_lock.release()
            return self

        def __exit__(self, type, value, traceback):
            self._todo_lock.release()

        def set_sequence(self, sequence):
            self._local.sequence = sequence

        def shall_i_proceed(self):
            """
            Return true if the current thread should write to Keep.
            Return false otherwise.
            """
            with self._done_lock:
                return (self._done < self._want_copies)

        def save_response(self, response_body, replicas_stored):
            """
            Records a response body (a locator, possibly signed) returned by
            the Keep server, and the number of replicas it stored.
            """
            with self._done_lock:
                self._done += replicas_stored
                self._response = response_body

        def response(self):
            """Return the body from the response to a PUT request."""
            with self._done_lock:
                return self._response

        def done(self):
            """Return the total number of replicas successfully stored."""
            with self._done_lock:
                return self._done
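
        # Illustrative sketch (not part of the library): with want_copies=3
        # and max_service_replicas=1 (all-disk services), max_threads is
        # ceil(3/1.0) = 3; with max_service_replicas=2 it is ceil(3/2.0) = 2.
        # The caller pattern the writer threads follow:
        #
        #   limiter = KeepClient.ThreadLimiter(3, 1)
        #   with limiter:
        #       if limiter.shall_i_proceed():
        #           # ...PUT the block to one Keep service, then:
        #           limiter.save_response(response_body, replicas_stored)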

    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible results.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
323 """Is it worth attempting a request?"""
327 """Did the request succeed or encounter permanent failure?"""
328 return self._result['error'] == False or not self._usable
330 def last_result(self):
333 def _get_user_agent(self):
335 return self._user_agent_pool.get(False)
339 def _put_user_agent(self, ua):
342 self._user_agent_pool.put(ua, False)

        @staticmethod
        def _socket_open(family, socktype, protocol, address=None):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(family, socktype, protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            return s

        def get(self, locator, timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: GET %s", url)
            curl = self._get_user_agent()
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
                ok = False
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    200, "Checksum fail")
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            try:
                self._headers = {}
                body_reader = cStringIO.StringIO(body)
                response_body = cStringIO.StringIO()
                curl.setopt(pycurl.NOSIGNAL, 1)
                curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                curl.setopt(pycurl.URL, url.encode('utf-8'))
                # Using UPLOAD tells cURL to wait for a "go ahead" from the
                # Keep server (in the form of a HTTP/1.1 "100 Continue"
                # response) instead of sending the request body immediately.
                # This allows the server to reject the request if the request
                # is invalid or the server is read-only, without waiting for
                # the client to send the entire block.
                curl.setopt(pycurl.UPLOAD, True)
                curl.setopt(pycurl.INFILESIZE, len(body))
                curl.setopt(pycurl.READFUNCTION, body_reader.read)
                curl.setopt(pycurl.HTTPHEADER, [
                    '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                self._setcurltimeouts(curl, timeout)
                try:
                    curl.perform()
                except Exception as e:
                    raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
                ok = False
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            return True

        def _setcurltimeouts(self, curl, timeouts):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                conn_t, xfer_t = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))

        def _headerfunction(self, header_line):
            header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                # A continuation line: append it to the previous header's value.
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written

    class KeepWriterThread(threading.Thread):
        """
        Write a blob of data to the given Keep server. On success, call
        save_response() of the given ThreadLimiter to save the returned
        locator.
        """
        def __init__(self, keep_service, **kwargs):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.service = keep_service
            self.args = kwargs
            self._success = False

        def success(self):
            return self._success

        def run(self):
            limiter = self.args['thread_limiter']
            sequence = self.args['thread_sequence']
            if sequence is not None:
                limiter.set_sequence(sequence)
            with limiter:
                if not limiter.shall_i_proceed():
                    # My turn arrived, but the job has been done without
                    # me.
                    return
                self.run_with_limiter(limiter)

        def run_with_limiter(self, limiter):
            if self.service.finished():
                return
            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          len(self.args['data']),
                          self.args['service_root'])
            self._success = bool(self.service.put(
                self.args['data_hash'],
                self.args['data'],
                timeout=self.args.get('timeout', None)))
            result = self.service.last_result()
            if self._success:
                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                              str(threading.current_thread()),
                              self.args['data_hash'],
                              len(self.args['data']),
                              self.args['service_root'])
                # Tick the 'done' counter for the number of replicas
                # reported stored by the server, for the case that
                # we're talking to a proxy or other backend that
                # stores multiple copies for us.
                try:
                    replicas_stored = int(result['headers']['x-keep-replicas-stored'])
                except (KeyError, ValueError):
                    replicas_stored = 1
                limiter.save_response(result['body'].strip(), replicas_stored)
            elif result.get('status_code', None):
                _logger.debug("Request fail: PUT %s => %s %s",
                              self.args['data_hash'],
                              result['status_code'],
                              result['body'])

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services. If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy. Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
          KeepClient does not use a proxy, pass in an empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers. A tuple of two floats is interpreted as
          (connection_timeout, read_timeout): see
          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
          Because timeouts are often a result of transient server load, the
          actual connection timeout will be increased by a factor of two on
          each retry.

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of two floats is interpreted as
          (connection_timeout, read_timeout). The behavior described
          above for adjusting connection timeouts on retry also applies.

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests. It is an error to specify both
          api_client and api_token. If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory. If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable. If you want to ensure KeepClient does not
          use local storage, pass in an empty string. This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called. Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = Queue.LifoQueue()

        if local_store:
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                if not proxy.endswith('/'):
                    proxy += '/'
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': 'proxy',
                    'service_type': 'proxy',
                    '_service_root': proxy,
                    }]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise. The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        return (t[0] * (1 << attempt_number), t[1])
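
    # Illustrative sketch (not part of the library): with the default
    # non-proxy timeout (2, 300), only the connection timeout scales with
    # retries: attempt 0 -> (2, 300), attempt 1 -> (4, 300),
    # attempt 2 -> (8, 300), because t[0] * (1 << n) doubles per attempt
    # while the read timeout t[1] stays fixed.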

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.itervalues():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.itervalues()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1.
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
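
    # Illustrative sketch (not part of the library): this is rendezvous
    # (highest-random-weight) hashing. Each client computes a weight per
    # service and probes in descending weight order, so every client
    # derives the same probe order for a block with no coordination.
    # service_uuids is a hypothetical list of Keep service UUIDs.
    #
    #   weights = {u: self._service_weight(data_hash, u) for u in service_uuids}
    #   probe_order = sorted(weights, key=weights.get, reverse=True)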

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # as a list.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool, **headers)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc):
        """Fetch a block only if it is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))
        locator = KeepLocator(loc_s)
        slot, first = self.block_cache.reserve_cache(locator.md5sum)
        if not first:
            v = slot.get()
            return v

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               len(hint) == 29 and
                               self._gateway_services.get(hint[2:])
                               )])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool)
            for root in hint_roots
        }

        # See #3147 for a discussion of the loop implementation. Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        sorted_roots = []
        blob = None
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False)
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        slot.set(blob)
        self.block_cache.cap_cache()
        if loop.success():
            return blob

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        """

        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replication'] = str(copies)
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            thread_limiter = KeepClient.ThreadLimiter(
                copies, self.max_replicas_per_service)
            threads = []
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                t = KeepClient.KeepWriterThread(
                    ks,
                    data=data,
                    data_hash=data_hash,
                    service_root=service_root,
                    thread_limiter=thread_limiter,
                    timeout=self.current_timeout(num_retries-tries_left),
                    thread_sequence=len(threads))
                t.start()
                threads.append(t)
            for t in threads:
                t.join()
            loop.save_result((thread_limiter.done() >= copies, len(threads)))

        if loop.success():
            return thread_limiter.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, thread_limiter.done()), service_errors, label="service")
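
    # Illustrative sketch (not part of the library): a typical write. The
    # returned locator may include an +A... permission signature and can be
    # passed straight back to get().
    #
    #   kc = KeepClient()
    #   loc = kc.put("foo", copies=2, num_retries=2)
    #   assert kc.get(loc) == "foo"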

    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return ''
        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
            return f.read()
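
    # Illustrative sketch (not part of the library): when local_store is in
    # effect (constructor argument or $KEEP_LOCAL_STORE), get/put are
    # rebound to the stubs above and blocks are plain files. '/tmp/keep' is
    # a hypothetical path.
    #
    #   kc = KeepClient(local_store='/tmp/keep')
    #   loc = kc.put("foo")   # writes /tmp/keep/acbd18db4cc2f85cedef654fccc4a4d8
    #   kc.get(loc)           # => "foo"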

    def is_cached(self, locator):
        return self.block_cache.reserve_cache(locator)