import cStringIO
import datetime
import hashlib
import logging
import os
import pycurl
import Queue
import re
import socket
import ssl
import threading
import timer

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util

_logger = logging.getLogger('arvados.keep')
global_client_object = None

class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except (IndexError, ValueError):
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
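

# Illustrative sketch (not part of the original module) of how KeepLocator
# round-trips a signed locator string. The hash, signature, and timestamp
# below are made-up example values, not real Keep signatures.
def _example_keep_locator():  # pragma: no cover
    loc = KeepLocator(
        'd41d8cd98f00b204e9800998ecf8427e+0'
        '+A0123456789abcdef0123456789abcdef01234567@53bed294')
    assert loc.size == 0
    assert loc.permission_hint().startswith('A')
    # stripped() drops the hints, leaving just "<md5sum>+<size>".
    return loc.stripped()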
108 """Simple interface to a global KeepClient object.
110 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
111 own API client. The global KeepClient will build an API client from the
112 current Arvados configuration, which may not match the one you built.
117 def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings. We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)
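

# A minimal sketch (not in the original file) of the recommended replacement
# for the deprecated global Keep interface: build your own API client and
# hand it to a private KeepClient instance.
def _example_private_keep_client():  # pragma: no cover
    api = arvados.api('v1')
    keep = KeepClient(api_client=api)
    locator = keep.put('hello')
    return keep.get(locator)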

class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in xrange(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in xrange(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
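

# A minimal sketch (not in the original file) of the CacheSlot protocol used
# by KeepClient.get(): the first caller to reserve a locator fills the slot,
# while concurrent callers block in slot.get() until set() is called.
def _example_block_cache():  # pragma: no cover
    cache = KeepBlockCache()
    slot, first = cache.reserve_cache('d41d8cd98f00b204e9800998ecf8427e')
    if first:
        slot.set('block data here')  # the real fetch would happen here
        cache.cap_cache()            # evict old slots if over cache_max
    return slot.get()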

class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:      300 seconds
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:       300 seconds
    DEFAULT_TIMEOUT = (2, 300)
    DEFAULT_PROXY_TIMEOUT = (20, 300)

    class ThreadLimiter(object):
        """
        Limit the number of threads running at a given time to
        {desired successes} minus {successes reported}. When successes
        reported == desired, wake up the remaining threads and tell
        them to quit.

        Should be used in a "with" block.
        """
        def __init__(self, todo):
            self._started = 0
            self._todo = todo
            self._done = 0
            self._response = None
            self._start_lock = threading.Condition()
            self._todo_lock = threading.Semaphore(todo)
            self._done_lock = threading.Lock()
            self._local = threading.local()

        def __enter__(self):
            self._start_lock.acquire()
            if getattr(self._local, 'sequence', None) is not None:
                # If the calling thread has used set_sequence(N), then
                # we wait here until N other threads have started.
                while self._started < self._local.sequence:
                    self._start_lock.wait()
            self._todo_lock.acquire()
            self._started += 1
            self._start_lock.notifyAll()
            self._start_lock.release()
            return self

        def __exit__(self, type, value, traceback):
            self._todo_lock.release()

        def set_sequence(self, sequence):
            self._local.sequence = sequence

        def shall_i_proceed(self):
            """
            Return true if the current thread should do stuff. Return
            false if the current thread should just stop.
            """
            with self._done_lock:
                return (self._done < self._todo)

        def save_response(self, response_body, replicas_stored):
            """
            Records a response body (a locator, possibly signed) returned by
            the Keep server. It is not necessary to save more than
            one response, since we presume that any locator returned
            in response to a successful request is valid.
            """
            with self._done_lock:
                self._done += replicas_stored
                self._response = response_body

        def response(self):
            """
            Returns the body from the response to a PUT request.
            """
            with self._done_lock:
                return self._response

        def done(self):
            """
            Return how many successes were reported.
            """
            with self._done_lock:
                return self._done
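
    # Usage sketch (illustrative, not in the original file): put() creates one
    # ThreadLimiter for the whole write, and each KeepWriterThread brackets
    # its work with it:
    #
    #     limiter = KeepClient.ThreadLimiter(copies)
    #     with limiter:
    #         if limiter.shall_i_proceed():
    #             ...attempt one PUT...
    #             limiter.save_response(body, replicas_stored)
    #     limiter.done()  # => total replicas reported stored so far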

    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
322 """Is it worth attempting a request?"""
326 """Did the request succeed or encounter permanent failure?"""
327 return self._result['error'] == False or not self._usable
329 def last_result(self):

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(False)
            except Queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, False)
            except:
                pass

        @staticmethod
        def _socket_open(family, socktype, protocol, address=None):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(family, socktype, protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            return s

        def get(self, locator, timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: GET %s", url)
            curl = self._get_user_agent()
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k, v) for k, v in self.get_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {'error': e}
                ok = False
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            try:
                self._headers = {}
                body_reader = cStringIO.StringIO(body)
                response_body = cStringIO.StringIO()
                curl.setopt(pycurl.NOSIGNAL, 1)
                curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                curl.setopt(pycurl.URL, url.encode('utf-8'))
                # Using UPLOAD tells cURL to wait for a "go ahead" from the
                # Keep server (in the form of a HTTP/1.1 "100 Continue"
                # response) instead of sending the request body immediately.
                # This allows the server to reject the request if the request
                # is invalid or the server is read-only, without waiting for
                # the client to send the entire block.
                curl.setopt(pycurl.UPLOAD, True)
                curl.setopt(pycurl.INFILESIZE, len(body))
                curl.setopt(pycurl.READFUNCTION, body_reader.read)
                curl.setopt(pycurl.HTTPHEADER, [
                    '{}: {}'.format(k, v) for k, v in self.put_headers.iteritems()])
                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                self._setcurltimeouts(curl, timeout)
                try:
                    curl.perform()
                except Exception as e:
                    raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {'error': e}
                ok = False
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            return True

        def _setcurltimeouts(self, curl, timeouts):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                conn_t, xfer_t = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))

        def _headerfunction(self, header_line):
            header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._lastheadername and header_line.startswith((' ', '\t')):
                # A continuation line: append to the previous header's value.
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
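
    # Lifecycle sketch (illustrative, not in the original file): one
    # KeepService performs one logical transaction, possibly retried:
    #
    #     svc = KeepClient.KeepService(root, pool, **auth_headers)
    #     while svc.usable() and not svc.finished():
    #         body = svc.get(locator, timeout=(2, 300))
    #         if body is not None:
    #             break
    #     svc.last_result()  # dict with status_code/body/headers/error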

    class KeepWriterThread(threading.Thread):
        """
        Write a blob of data to the given Keep server. On success, call
        save_response() of the given ThreadLimiter to save the returned
        locator.
        """
        def __init__(self, keep_service, **kwargs):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.service = keep_service
            self.args = kwargs
            self._success = False

        def success(self):
            return self._success

        def run(self):
            limiter = self.args['thread_limiter']
            sequence = self.args['thread_sequence']
            if sequence is not None:
                limiter.set_sequence(sequence)
            with limiter:
                if not limiter.shall_i_proceed():
                    # My turn arrived, but the job has been done without
                    # me.
                    return
                self.run_with_limiter(limiter)

        def run_with_limiter(self, limiter):
            if self.service.finished():
                return
            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          len(self.args['data']),
                          self.args['service_root'])
            self._success = bool(self.service.put(
                self.args['data_hash'],
                self.args['data'],
                timeout=self.args.get('timeout', None)))
            result = self.service.last_result()
            if self._success:
                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                              str(threading.current_thread()),
                              self.args['data_hash'],
                              len(self.args['data']),
                              self.args['service_root'])
                # Tick the 'done' counter for the number of replicas
                # reported stored by the server, for the case that
                # we're talking to a proxy or other backend that
                # stores multiple copies for us.
                try:
                    replicas_stored = int(result['headers']['x-keep-replicas-stored'])
                except (KeyError, ValueError):
                    replicas_stored = 1
                limiter.save_response(result['body'].strip(), replicas_stored)
            elif result.get('status_code', None):
                _logger.debug("Request fail: PUT %s => %s %s",
                              self.args['data_hash'],
                              result['status_code'],
                              result['body'])

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services. If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy. Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
          KeepClient does not use a proxy, pass in an empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers. A tuple of two floats is interpreted as
          (connection_timeout, read_timeout): see
          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
          Because timeouts are often a result of transient server load, the
          actual connection timeout will be increased by a factor of two on
          each retry.
          Default: (2, 300).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of two floats is interpreted as
          (connection_timeout, read_timeout). The behavior described
          above for adjusting connection timeouts on retry also applies.
          Default: (20, 300).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests. It is an error to specify both
          api_client and api_token. If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory. If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable. If you want to ensure KeepClient does not
          use local storage, pass in an empty string. This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called. Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = Queue.LifoQueue()

        if local_store:
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            if proxy:
                if not proxy.endswith('/'):
                    proxy += '/'
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': 'proxy',
                    '_service_root': proxy,
                    }]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
                self.max_replicas_per_service = 1
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                self.max_replicas_per_service = 1

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise. The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        return (t[0] * (1 << attempt_number), t[1])
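
    # Illustrative arithmetic (not in the original file): with the default
    # non-proxy timeout (2, 300), current_timeout(0) == (2, 300),
    # current_timeout(1) == (4, 300), and current_timeout(2) == (8, 300) --
    # the connection timeout doubles per retry; the read timeout does not.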

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            accessible = keep_services.execute().get('items')
            if not accessible:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in accessible:
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
            _logger.debug(str(self._gateway_services))

            self._keep_services = [
                ks for ks in accessible
                if ks.get('service_type') in ['disk', 'proxy']]
            self._writable_services = [
                ks for ks in accessible
                if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
            _logger.debug(str(self._keep_services))

            self.using_proxy = any(ks.get('service_type') == 'proxy'
                                   for ks in self._keep_services)
            # For disk type services, max_replicas_per_service is 1.
            # It is unknown or unlimited for non-disk typed services.
            for ks in accessible:
                if ('disk' != ks.get('service_type')) and (not ks.get('read_only')):
                    self.max_replicas_per_service = None

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
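
    # Illustrative sketch (not in the original file): this is rendezvous
    # hashing -- every client computes the same per-service weight for a
    # given block, so probe order is stable without any shared state:
    #
    #     weights = {uuid: self._service_weight(data_hash, uuid)
    #                for uuid in service_uuids}
    #     probe_order = sorted(service_uuids, key=weights.get, reverse=True)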

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool, **headers)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None
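
    # Illustrative examples (not in the original file) of the tuple protocol:
    #
    #     _check_loop_result(('blob bytes', 3))  # => True (real result)
    #     _check_loop_result((None, 0))          # => False (nowhere left to try)
    #     _check_loop_result((None, 3))          # => None (keep retrying)
    #     _check_loop_result(IOError('x'))       # => None (keep retrying)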

    def get_from_cache(self, loc):
        """Fetch a block only if it is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))
        locator = KeepLocator(loc_s)
        slot, first = self.block_cache.reserve_cache(locator.md5sum)
        if not first:
            v = slot.get()
            return v

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               len(hint) == 29 and
                               self._gateway_services.get(hint[2:]))])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool)
            for root in hint_roots}

        # See #3147 for a discussion of the loop implementation. Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        sorted_roots = []
        blob = None
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False)
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        slot.set(blob)
        self.block_cache.cap_cache()
        if loop.success():
            return blob

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        """
        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replication'] = str(copies)
        roots_map = {}
        thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            threads = []
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                thread_sequence = len(threads)
                t = KeepClient.KeepWriterThread(
                    ks,
                    data=data,
                    data_hash=data_hash,
                    service_root=service_root,
                    thread_limiter=thread_limiter,
                    timeout=self.current_timeout(num_retries-tries_left),
                    thread_sequence=thread_sequence)
                t.start()
                threads.append(t)
            for t in threads:
                t.join()
            loop.save_result((thread_limiter.done() >= copies, len(threads)))

        if loop.success():
            return thread_limiter.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, thread_limiter.done()), service_errors, label="service")
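
    # Usage sketch (illustrative, not in the original file):
    #
    #     keep = KeepClient(api_client=arvados.api('v1'))
    #     locator = keep.put('block contents', copies=2)
    #     # locator looks like "<md5sum>+<size>[+Asignature@expiry]"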

    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return ''
        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
            return f.read()

    def is_cached(self, locator):
        # Fixed: this previously referenced an undefined name (expect_hash)
        # instead of the locator parameter.
        return self.block_cache.reserve_cache(locator)
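

# Round-trip sketch (illustrative, not in the original file): with
# local_store set, put()/get() are rerouted to the local-directory stubs,
# which is handy for exercising client code without any Keep servers.
def _example_local_store_roundtrip(tmpdir):  # pragma: no cover
    keep = KeepClient(local_store=tmpdir)
    locator = keep.put('test data')   # calls local_store_put()
    return keep.get(locator)          # calls local_store_get()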