15 import arvados.config as config
17 import arvados.retry as retry
20 _logger = logging.getLogger('arvados.keep')
21 global_client_object = None
class KeepLocator(object):
    """Parse and manipulate one Keep block locator string.

    A locator looks like "md5sum+size+hint+hint...", where the optional
    hints include "A<signature>@<hex timestamp>" permission hints.
    """

    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # Locator has no size part; that's allowed.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator without any hints (md5sum, plus size if known)."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the A<sig>@<hex time> hint string, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            # BUGFIX: the original caught IndexError here, but str.split
            # never raises IndexError -- when the '@' separator is missing,
            # the 2-name unpacking raises ValueError instead, so the
            # intended diagnostic below was dead code and callers saw a
            # confusing unpacking error.  Catching ValueError also wraps
            # bad-hex signature/timestamp errors in the clearer message.
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        """True if the signature expired as of as_of_dt (default: now).

        Unsigned locators never expire.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
108 """Simple interface to a global KeepClient object.
110 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
111 own API client. The global KeepClient will build an API client from the
112 current Arvados configuration, which may not match the one you built.
117 def global_client_object(cls):
118 global global_client_object
119 # Previously, KeepClient would change its behavior at runtime based
120 # on these configuration settings. We simulate that behavior here
121 # by checking the values and returning a new KeepClient if any of
123 key = (config.get('ARVADOS_API_HOST'),
124 config.get('ARVADOS_API_TOKEN'),
125 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
126 config.get('ARVADOS_KEEP_PROXY'),
127 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
128 os.environ.get('KEEP_LOCAL_STORE'))
129 if (global_client_object is None) or (cls._last_key != key):
130 global_client_object = KeepClient()
132 return global_client_object
135 def get(locator, **kwargs):
136 return Keep.global_client_object().get(locator, **kwargs)
139 def put(data, **kwargs):
140 return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
    """In-memory cache of Keep data blocks, kept in most-recently-used order."""

    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """Holder for one block; readers block until the content is ready."""

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            # Wait until some thread has stored the content (or None on error).
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            return 0 if self.content is None else len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Drop slots that finished with no content (that means there
            # was an error reading the block).
            self._cache = [
                slot for slot in self._cache
                if not (slot.ready.is_set() and slot.content is None)]
            total = sum(slot.size() for slot in self._cache)
            while self._cache and total > self.cache_max:
                # Evict the ready slot closest to the tail (least recently
                # used) and re-measure.
                for idx in range(len(self._cache) - 1, -1, -1):
                    if self._cache[idx].ready.is_set():
                        del self._cache[idx]
                        break
                total = sum(slot.size() for slot in self._cache)

    def _get(self, locator):
        # Linear scan for the locator; on a hit, move the slot to the
        # front so the list stays MRU-first.  Caller must hold the lock.
        for idx, slot in enumerate(self._cache):
            if slot.locator == locator:
                if idx != 0:
                    del self._cache[idx]
                    self._cache.insert(0, slot)
                return slot
        return None

    def get(self, locator):
        """Return the cached slot for locator, or None."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            existing = self._get(locator)
            if existing:
                return existing, False
            # Add a new cache slot for the locator.
            fresh = KeepBlockCache.CacheSlot(locator)
            self._cache.insert(0, fresh)
            return fresh, True
212 class KeepClient(object):
# NOTE(review): this chunk is a partial extract of a larger file; the
# integer at the start of each line is the original source line number,
# and gaps between those numbers mark lines missing from this view.
# Each *_TIMEOUT constant is a (connection_timeout, read_timeout) pair in
# seconds, per the comments below; proxies get a longer connection timeout
# than direct Keep servers.
214 # Default Keep server connection timeout: 2 seconds
215 # Default Keep server read timeout: 300 seconds
216 # Default Keep proxy connection timeout: 20 seconds
217 # Default Keep proxy read timeout: 300 seconds
218 DEFAULT_TIMEOUT = (2, 300)
219 DEFAULT_PROXY_TIMEOUT = (20, 300)
class ThreadLimiter(object):
    """
    Limit the number of threads running at a given time to
    {desired successes} minus {successes reported}. When successes
    reported == desired, wake up the remaining threads and tell
    them to quit.

    Should be used in a "with" block.
    """

    def __init__(self, todo):
        self._started = 0
        self._todo = todo
        self._done = 0
        self._response = None
        self._start_lock = threading.Condition()
        self._todo_lock = threading.Semaphore(todo)
        self._done_lock = threading.Lock()
        self._local = threading.local()

    def __enter__(self):
        self._start_lock.acquire()
        if getattr(self._local, 'sequence', None) is not None:
            # If the calling thread has used set_sequence(N), then
            # we wait here until N other threads have started.
            while self._started < self._local.sequence:
                self._start_lock.wait()
        self._todo_lock.acquire()
        self._started += 1
        self._start_lock.notify_all()
        self._start_lock.release()
        return self

    def __exit__(self, type, value, traceback):
        self._todo_lock.release()

    def set_sequence(self, sequence):
        """Ask __enter__ (on this thread) to wait for `sequence` starters."""
        self._local.sequence = sequence

    def shall_i_proceed(self):
        """True while fewer successes than needed have been recorded."""
        with self._done_lock:
            return self._done < self._todo

    def save_response(self, response_body, replicas_stored):
        """Record one server response.

        Stores the response body (a locator, possibly signed) and adds
        replicas_stored to the success count.  Only the most recent body
        is kept, since any locator returned for a successful request is
        presumed valid.
        """
        with self._done_lock:
            self._done += replicas_stored
            self._response = response_body

    def response(self):
        """Body from the most recently saved PUT response."""
        with self._done_lock:
            return self._response

    def done(self):
        """Number of successes reported so far."""
        with self._done_lock:
            return self._done
293 class KeepService(object):
# NOTE(review): partial extract -- the leading integer on each line is the
# original source line number; gaps in those numbers mark missing lines.
# Comments below annotate only the code that is visible here.
294 """Make requests to a single Keep service, and track results.
296 A KeepService is intended to last long enough to perform one
297 transaction (GET or PUT) against one Keep service. This can
298 involve calling either get() or put() multiple times in order
299 to retry after transient failures. However, calling both get()
300 and put() on a single instance -- or using the same instance
301 to access two different Keep services -- will not produce
# HTTP_ERRORS tuple (mostly missing from this extract) lists exception
# types treated as transient request failures below.
308 arvados.errors.HttpError,
# NOTE(review): mutable default argument -- this default LifoQueue is
# created once at class-definition time and shared by every instance
# constructed without an explicit pool.  Verify the sharing is intended
# (the pool recycles pycurl handles, per _get_user_agent below).
311 def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
# self.root is read by get()/put() below; its assignment (line 312) is
# missing from this extract.
313 self._user_agent_pool = user_agent_pool
314 self._result = {'error': None}
# GET requests advertise octet-stream; any extra headers apply to both
# GET and PUT.
317 self.get_headers = {'Accept': 'application/octet-stream'}
318 self.get_headers.update(headers)
319 self.put_headers = headers
322 """Is it worth attempting a request?"""
326 """Did the request succeed or encounter permanent failure?"""
327 return self._result['error'] == False or not self._usable
329 def last_result(self):
# Pop a recycled pycurl handle from the shared pool (non-blocking);
# the fallback for an empty pool is missing from this extract.
332 def _get_user_agent(self):
334 return self._user_agent_pool.get(False)
338 def _put_user_agent(self, ua):
341 self._user_agent_pool.put(ua, False)
# No `self` parameter: presumably decorated @staticmethod on the missing
# preceding line -- TODO confirm.
346 def _socket_open(family, socktype, protocol, address=None):
347 """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
348 s = socket.socket(family, socktype, protocol)
349 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
350 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
351 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
# GET one block.  Returns the response body on success; on failure records
# an HttpError in self._result['error'] (the failure-path return lines are
# missing from this extract).
354 def get(self, locator, timeout=None):
355 # locator is a KeepLocator object.
356 url = self.root + str(locator)
357 _logger.debug("Request: GET %s", url)
358 curl = self._get_user_agent()
360 with timer.Timer() as t:
362 response_body = cStringIO.StringIO()
363 curl.setopt(pycurl.NOSIGNAL, 1)
364 curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
365 curl.setopt(pycurl.URL, url.encode('utf-8'))
366 curl.setopt(pycurl.HTTPHEADER, [
367 '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
368 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
369 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
370 self._setcurltimeouts(curl, timeout)
# Any error out of curl.perform() is normalized to HttpError(0, ...).
373 except Exception as e:
374 raise arvados.errors.HttpError(0, str(e))
376 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
377 'body': response_body.getvalue(),
378 'headers': self._headers,
381 ok = retry.check_http_response_success(self._result['status_code'])
383 self._result['error'] = arvados.errors.HttpError(
384 self._result['status_code'],
385 self._headers.get('x-status-line', 'Error'))
386 except self.HTTP_ERRORS as e:
# ok may be True/False/None here; only a definite False marks the
# service unusable.
391 self._usable = ok != False
392 if self._result.get('status_code', None):
393 # The client worked well enough to get an HTTP status
394 # code, so presumably any problems are just on the
395 # server side and it's OK to reuse the client.
396 self._put_user_agent(curl)
398 # Don't return this client to the pool, in case it's
402 _logger.debug("Request fail: GET %s => %s: %s",
403 url, type(self._result['error']), str(self._result['error']))
405 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
406 self._result['status_code'],
407 len(self._result['body']),
409 (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
# Verify integrity: the body must hash to the locator's md5sum.
410 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
411 if resp_md5 != locator.md5sum:
412 _logger.warning("Checksum fail: md5(%s) = %s",
414 self._result['error'] = arvados.errors.HttpError(
417 return self._result['body']
# PUT one block (hash_s is the block's md5 hex digest, body the data).
419 def put(self, hash_s, body, timeout=None):
420 url = self.root + hash_s
421 _logger.debug("Request: PUT %s", url)
422 curl = self._get_user_agent()
425 body_reader = cStringIO.StringIO(body)
426 response_body = cStringIO.StringIO()
427 curl.setopt(pycurl.NOSIGNAL, 1)
428 curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
429 curl.setopt(pycurl.URL, url.encode('utf-8'))
430 # Using UPLOAD tells cURL to wait for a "go ahead" from the
431 # Keep server (in the form of a HTTP/1.1 "100 Continue"
432 # response) instead of sending the request body immediately.
433 # This allows the server to reject the request if the request
434 # is invalid or the server is read-only, without waiting for
435 # the client to send the entire block.
436 curl.setopt(pycurl.UPLOAD, True)
437 curl.setopt(pycurl.INFILESIZE, len(body))
438 curl.setopt(pycurl.READFUNCTION, body_reader.read)
439 curl.setopt(pycurl.HTTPHEADER, [
440 '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
441 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
442 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
443 self._setcurltimeouts(curl, timeout)
446 except Exception as e:
447 raise arvados.errors.HttpError(0, str(e))
449 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
450 'body': response_body.getvalue(),
451 'headers': self._headers,
454 ok = retry.check_http_response_success(self._result['status_code'])
456 self._result['error'] = arvados.errors.HttpError(
457 self._result['status_code'],
458 self._headers.get('x-status-line', 'Error'))
459 except self.HTTP_ERRORS as e:
464 self._usable = ok != False # still usable if ok is True or None
465 if self._result.get('status_code', None):
466 # Client is functional. See comment in get().
467 self._put_user_agent(curl)
471 _logger.debug("Request fail: PUT %s => %s: %s",
472 url, type(self._result['error']), str(self._result['error']))
# timeouts may be a scalar (applied to both phases) or a
# (connect, transfer) tuple, in seconds; pycurl takes milliseconds.
476 def _setcurltimeouts(self, curl, timeouts):
479 elif isinstance(timeouts, tuple):
480 conn_t, xfer_t = timeouts
482 conn_t, xfer_t = (timeouts, timeouts)
483 curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
484 curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
# pycurl HEADERFUNCTION callback: accumulate response headers into
# self._headers (lower-cased names), folding continuation lines into the
# previous header and stashing the status line under 'x-status-line'.
486 def _headerfunction(self, header_line):
487 header_line = header_line.decode('iso-8859-1')
488 if ':' in header_line:
489 name, value = header_line.split(':', 1)
490 name = name.strip().lower()
491 value = value.strip()
493 name = self._lastheadername
494 value = self._headers[name] + ' ' + header_line.strip()
495 elif header_line.startswith('HTTP/'):
496 name = 'x-status-line'
499 _logger.error("Unexpected header line: %s", header_line)
501 self._lastheadername = name
502 self._headers[name] = value
503 # Returning None implies all bytes were written
class KeepWriterThread(threading.Thread):
    """
    Write a blob of data to the given Keep server. On success, call
    save_response() of the given ThreadLimiter to save the returned
    locator.
    """

    def __init__(self, keep_service, **kwargs):
        super(KeepClient.KeepWriterThread, self).__init__()
        self.service = keep_service
        self.args = kwargs
        self._success = False

    def success(self):
        return self._success

    def run(self):
        limiter = self.args['thread_limiter']
        sequence = self.args['thread_sequence']
        if sequence is not None:
            limiter.set_sequence(sequence)
        with limiter:
            if not limiter.shall_i_proceed():
                # My turn arrived, but the job has been done without
                # me.
                return
            self.run_with_limiter(limiter)

    def run_with_limiter(self, limiter):
        if self.service.finished():
            return
        _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                      str(threading.current_thread()),
                      self.args['data_hash'],
                      len(self.args['data']),
                      self.args['service_root'])
        self._success = bool(self.service.put(
            self.args['data_hash'],
            self.args['data'],
            timeout=self.args.get('timeout', None)))
        result = self.service.last_result()
        if self._success:
            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          len(self.args['data']),
                          self.args['service_root'])
            # Tick the 'done' counter for the number of replica
            # reported stored by the server, for the case that
            # we're talking to a proxy or other backend that
            # stores to multiple copies for us.
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                # Header absent or malformed: assume a single replica.
                replicas_stored = 1
            limiter.save_response(result['body'].strip(), replicas_stored)
        elif result.get('status_code', None):
            _logger.debug("Request fail: PUT %s => %s %s",
                          self.args['data_hash'],
                          result['status_code'],
                          result['body'])
# NOTE(review): partial extract -- leading integers are original source
# line numbers; gaps mark missing lines (including parts of the docstring).
568 def __init__(self, api_client=None, proxy=None,
569 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
570 api_token=None, local_store=None, block_cache=None,
571 num_retries=0, session=None):
572 """Initialize a new KeepClient.
576 The API client to use to find Keep services. If not
577 provided, KeepClient will build one from available Arvados
581 If specified, this KeepClient will send requests to this Keep
582 proxy. Otherwise, KeepClient will fall back to the setting of the
583 ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
584 KeepClient does not use a proxy, pass in an empty string.
587 The initial timeout (in seconds) for HTTP requests to Keep
588 non-proxy servers. A tuple of two floats is interpreted as
589 (connection_timeout, read_timeout): see
590 http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
591 Because timeouts are often a result of transient server load, the
592 actual connection timeout will be increased by a factor of two on
597 The initial timeout (in seconds) for HTTP requests to
598 Keep proxies. A tuple of two floats is interpreted as
599 (connection_timeout, read_timeout). The behavior described
600 above for adjusting connection timeouts on retry also applies.
604 If you're not using an API client, but only talking
605 directly to a Keep proxy, this parameter specifies an API token
606 to authenticate Keep requests. It is an error to specify both
607 api_client and api_token. If you specify neither, KeepClient
608 will use one available from the Arvados configuration.
611 If specified, this KeepClient will bypass Keep
612 services, and save data to the named directory. If unspecified,
613 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
614 environment variable. If you want to ensure KeepClient does not
615 use local storage, pass in an empty string. This is primarily
616 intended to mock a server for testing.
619 The default number of times to retry failed requests.
620 This will be used as the default num_retries value when get() and
621 put() are called. Default 0.
# Resolve proxy / token / local_store defaults from the Arvados
# configuration and environment when not given explicitly.
623 self.lock = threading.Lock()
625 proxy = config.get('ARVADOS_KEEP_PROXY')
626 if api_token is None:
627 if api_client is None:
628 api_token = config.get('ARVADOS_API_TOKEN')
630 api_token = api_client.api_token
631 elif api_client is not None:
# Supplying both an API client and a separate token is ambiguous.
633 "can't build KeepClient with both API client and token")
634 if local_store is None:
635 local_store = os.environ.get('KEEP_LOCAL_STORE')
637 self.block_cache = block_cache if block_cache else KeepBlockCache()
638 self.timeout = timeout
639 self.proxy_timeout = proxy_timeout
640 self._user_agent_pool = Queue.LifoQueue()
# local_store mode: monkey-patch this instance's get/put to the
# local-directory stubs and skip Keep services entirely.
643 self.local_store = local_store
644 self.get = self.local_store_get
645 self.put = self.local_store_put
647 self.num_retries = num_retries
# proxy mode: build a static single-entry service list pointing at
# the proxy; no API client is needed.
649 if not proxy.endswith('/'):
651 self.api_token = api_token
652 self._gateway_services = {}
653 self._keep_services = [{
655 'service_type': 'proxy',
656 '_service_root': proxy,
658 self._writable_services = self._keep_services
659 self.using_proxy = True
660 self._static_services_list = True
661 self.max_replicas_per_service = None
663 # It's important to avoid instantiating an API client
664 # unless we actually need one, for testing's sake.
665 if api_client is None:
666 api_client = arvados.api('v1')
667 self.api_client = api_client
668 self.api_token = api_client.api_token
# Service discovery state is left empty here; build_services_list()
# fills it in lazily on first use.
669 self._gateway_services = {}
670 self._keep_services = None
671 self._writable_services = None
672 self.using_proxy = None
673 self._static_services_list = False
674 self.max_replicas_per_service = 1
def current_timeout(self, attempt_number):
    """Return the appropriate timeout to use for this client.

    The proxy timeout setting if the backend service is currently a proxy,
    the regular timeout setting otherwise.  The `attempt_number` indicates
    how many times the operation has been tried already (starting from 0
    for the first try), and scales the connection timeout portion of the
    return value accordingly.
    """
    # TODO(twp): the timeout should be a property of a
    # KeepService, not a KeepClient. See #4488.
    base = self.proxy_timeout if self.using_proxy else self.timeout
    # Double the connection timeout per prior attempt; keep the read
    # timeout unchanged.
    return (base[0] * (1 << attempt_number), base[1])
691 def _any_nondisk_services(self, service_list):
692 return any(ks.get('service_type', 'disk') != 'disk'
693 for ks in service_list)
# NOTE(review): partial extract -- leading integers are original source
# line numbers; gaps mark missing lines.
# Populate self._gateway_services / _keep_services / _writable_services
# from the API server, unless a static list is in use or a cached list
# exists and force_rebuild is false.
695 def build_services_list(self, force_rebuild=False):
696 if (self._static_services_list or
697 (self._keep_services and not force_rebuild)):
# Fall back to the older keep_disks API when keep_services is
# unavailable (the enclosing 'try:' line is missing from this extract).
701 keep_services = self.api_client.keep_services().accessible()
702 except Exception: # API server predates Keep services.
703 keep_services = self.api_client.keep_disks().list()
705 # Gateway services are only used when specified by UUID,
706 # so there's nothing to gain by filtering them by
708 self._gateway_services = {ks['uuid']: ks for ks in
709 keep_services.execute()['items']}
710 if not self._gateway_services:
711 raise arvados.errors.NoKeepServersError()
713 # Precompute the base URI for each service.
714 for r in self._gateway_services.itervalues():
715 host = r['service_host']
716 if not host.startswith('[') and host.find(':') >= 0:
717 # IPv6 URIs must be formatted like http://[::1]:80/...
718 host = '[' + host + ']'
719 r['_service_root'] = "{}://{}:{:d}/".format(
720 'https' if r['service_ssl_flag'] else 'http',
724 _logger.debug(str(self._gateway_services))
# Local read/write service lists exclude gateway-type services.
725 self._keep_services = [
726 ks for ks in self._gateway_services.itervalues()
727 if not ks.get('service_type', '').startswith('gateway:')]
728 self._writable_services = [ks for ks in self._keep_services
729 if not ks.get('read_only')]
731 # For disk type services, max_replicas_per_service is 1
732 # It is unknown (unlimited) for other service types.
733 if self._any_nondisk_services(self._writable_services):
734 self.max_replicas_per_service = None
736 self.max_replicas_per_service = 1
738 def _service_weight(self, data_hash, service_uuid):
739 """Compute the weight of a Keep service endpoint for a data
740 block with a known hash.
742 The weight is md5(h + u) where u is the last 15 characters of
743 the service endpoint's UUID.
745 return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
    """Return an array of Keep service endpoints, in the order in
    which they should be probed when reading or writing data with
    the given hash+hints.
    """
    self.build_services_list(force_rebuild)

    sorted_roots = []
    # Use the services indicated by the given +K@... remote
    # service hints, if any are present and can be resolved to a
    # service root URI.
    for hint in locator.hints:
        if not hint.startswith('K@'):
            continue
        if len(hint) == 7:
            # 5-character cluster prefix: a remote keepproxy.
            sorted_roots.append(
                "https://keep.{}.arvadosapi.com/".format(hint[2:]))
        elif len(hint) == 29:
            # Full UUID of a local gateway service.
            svc = self._gateway_services.get(hint[2:])
            if svc:
                sorted_roots.append(svc['_service_root'])

    # Sort the available local services by weight (heaviest first)
    # for this locator, and append their service_roots (base URIs).
    use_services = self._writable_services if need_writable else self._keep_services
    self.using_proxy = self._any_nondisk_services(use_services)
    sorted_roots.extend([
        svc['_service_root'] for svc in sorted(
            use_services,
            reverse=True,
            key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
    _logger.debug("{}: {}".format(locator, sorted_roots))
    return sorted_roots
def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
    """Poll for Keep services and register any new ones in roots_map.

    roots_map is a dictionary mapping Keep service root strings to
    KeepService objects; entries are added for roots not already
    present.  Returns the current list of local root strings, in
    probe order.
    """
    headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
    local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
    for root in (r for r in local_roots if r not in roots_map):
        roots_map[root] = self.KeepService(
            root, self._user_agent_pool, **headers)
    return local_roots
797 def _check_loop_result(result):
798 # KeepClient RetryLoops should save results as a 2-tuple: the
799 # actual result of the request, and the number of servers available
800 # to receive the request this round.
801 # This method returns True if there's a real result, False if
802 # there are no more servers available, otherwise None.
803 if isinstance(result, Exception):
805 result, tried_server_count = result
806 if (result is not None) and (result is not False):
808 elif tried_server_count < 1:
809 _logger.info("No more Keep services to try; giving up")
def get_from_cache(self, loc):
    """Fetch a block only if is in the cache, otherwise return None."""
    slot = self.block_cache.get(loc)
    # Only a slot whose content is already ready counts as a hit;
    # never wait on a pending slot here.
    if slot is None or not slot.ready.is_set():
        return None
    return slot.get()
# NOTE(review): partial extract -- leading integers are original source
# line numbers; gaps mark missing lines (cache-hit return, loop control,
# and success-path return among them).
823 def get(self, loc_s, num_retries=None):
824 """Get data from Keep.
826 This method fetches one or more blocks of data from Keep. It
827 sends a request each Keep service registered with the API
828 server (or the proxy provided when this client was
829 instantiated), then each service named in location hints, in
830 sequence. As soon as one service provides the data, it's
834 * loc_s: A string of one or more comma-separated locators to fetch.
835 This method returns the concatenation of these blocks.
836 * num_retries: The number of times to retry GET requests to
837 *each* Keep server if it returns temporary failures, with
838 exponential backoff. Note that, in each loop, the method may try
839 to fetch data from every available Keep service, along with any
840 that are named in location hints in the locator. The default value
841 is set when the KeepClient is initialized.
# Multi-locator request: recurse once per locator and concatenate.
844 return ''.join(self.get(x) for x in loc_s.split(','))
845 locator = KeepLocator(loc_s)
# Reserve a cache slot; the not-first (cache hit) branch is missing
# from this extract.
846 slot, first = self.block_cache.reserve_cache(locator.md5sum)
851 # If the locator has hints specifying a prefix (indicating a
852 # remote keepproxy) or the UUID of a local gateway service,
853 # read data from the indicated service(s) instead of the usual
854 # list of local disk services.
855 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
856 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
857 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
858 for hint in locator.hints if (
859 hint.startswith('K@') and
861 self._gateway_services.get(hint[2:])
863 # Map root URLs to their KeepService objects.
865 root: self.KeepService(root, self._user_agent_pool)
866 for root in hint_roots
869 # See #3147 for a discussion of the loop implementation. Highlights:
870 # * Refresh the list of Keep services after each failure, in case
871 # it's being updated.
872 # * Retry until we succeed, we're out of retries, or every available
873 # service has returned permanent failure.
877 loop = retry.RetryLoop(num_retries, self._check_loop_result,
879 for tries_left in loop:
# Refresh the probe-ordered service list; rebuild after any failure.
881 sorted_roots = self.map_new_services(
883 force_rebuild=(tries_left < num_retries),
885 except Exception as error:
886 loop.save_result(error)
889 # Query KeepService objects that haven't returned
890 # permanent failure, in our specified shuffle order.
891 services_to_try = [roots_map[root]
892 for root in sorted_roots
893 if roots_map[root].usable()]
894 for keep_service in services_to_try:
895 blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
898 loop.save_result((blob, len(services_to_try)))
900 # Always cache the result, then return it if we succeeded.
902 self.block_cache.cap_cache()
906 # Q: Including 403 is necessary for the Keep tests to continue
907 # passing, but maybe they should expect KeepReadError instead?
908 not_founds = sum(1 for key in sorted_roots
909 if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
910 service_errors = ((key, roots_map[key].last_result()['error'])
911 for key in sorted_roots)
# Failure classification: no services at all, everything not-found,
# or a general read error listing each service's error.
913 raise arvados.errors.KeepReadError(
914 "failed to read {}: no Keep services available ({})".format(
915 loc_s, loop.last_result()))
916 elif not_founds == len(sorted_roots):
917 raise arvados.errors.NotFoundError(
918 "{} not found".format(loc_s), service_errors)
920 raise arvados.errors.KeepReadError(
921 "failed to read {}".format(loc_s), service_errors, label="service")
# NOTE(review): partial extract -- leading integers are original source
# line numbers; gaps mark missing lines (thread start/join and parts of
# the writer-thread construction among them).
924 def put(self, data, copies=2, num_retries=None):
925 """Save data in Keep.
927 This method will get a list of Keep services from the API server, and
928 send the data to each one simultaneously in a new thread. Once the
929 uploads are finished, if enough copies are saved, this method returns
930 the most recent HTTP response body. If requests fail to upload
931 enough copies, this method raises KeepWriteError.
934 * data: The string of data to upload.
935 * copies: The number of copies that the user requires be saved.
937 * num_retries: The number of times to retry PUT requests to
938 *each* Keep server if it returns temporary failures, with
939 exponential backoff. The default value is set when the
940 KeepClient is initialized.
# Normalize input to a byte string (this is Python 2 code: str == bytes).
943 if isinstance(data, unicode):
944 data = data.encode("ascii")
945 elif not isinstance(data, str):
946 raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
948 data_hash = hashlib.md5(data).hexdigest()
949 loc_s = data_hash + '+' + str(len(data))
952 locator = KeepLocator(loc_s)
955 # Tell the proxy how many copies we want it to store
956 headers['X-Keep-Desired-Replication'] = str(copies)
# When max_replicas_per_service is None (proxy/non-disk backend), one
# successful PUT can satisfy all copies, so only 1 success is needed.
958 thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
959 loop = retry.RetryLoop(num_retries, self._check_loop_result,
962 for tries_left in loop:
964 sorted_roots = self.map_new_services(
966 force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
967 except Exception as error:
968 loop.save_result(error)
# Spawn one KeepWriterThread per writable service root.
972 for service_root, ks in [(root, roots_map[root])
973 for root in sorted_roots]:
976 t = KeepClient.KeepWriterThread(
980 service_root=service_root,
981 thread_limiter=thread_limiter,
982 timeout=self.current_timeout(num_retries-tries_left),
983 thread_sequence=thread_sequence)
989 loop.save_result((thread_limiter.done() >= copies, len(threads)))
# Success: return the most recent PUT response body (a signed locator).
992 return thread_limiter.response()
994 raise arvados.errors.KeepWriteError(
995 "failed to write {}: no Keep services available ({})".format(
996 data_hash, loop.last_result()))
998 service_errors = ((key, roots_map[key].last_result()['error'])
999 for key in sorted_roots
1000 if roots_map[key].last_result()['error'])
1001 raise arvados.errors.KeepWriteError(
1002 "failed to write {} (wanted {} copies but wrote {})".format(
1003 data_hash, copies, thread_limiter.done()), service_errors, label="service")
def local_store_put(self, data, copies=1, num_retries=None):
    """A stub for put().

    This method is used in place of the real put() method when
    using local storage (see constructor's local_store argument).

    copies and num_retries arguments are ignored: they are here
    only for the sake of offering the same call signature as
    put().

    Data stored this way can be retrieved via local_store_get().
    """
    md5 = hashlib.md5(data).hexdigest()
    locator = '%s+%d' % (md5, len(data))
    # BUGFIX: open in binary mode.  Keep blocks are arbitrary binary
    # data; text mode ('w') would translate line endings on platforms
    # that do so, corrupting the stored block.
    with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
        f.write(data)
    # Write-then-rename so readers never see a partial block.
    os.rename(os.path.join(self.local_store, md5 + '.tmp'),
              os.path.join(self.local_store, md5))
    return locator
def local_store_get(self, loc_s, num_retries=None):
    """Companion to local_store_put()."""
    try:
        locator = KeepLocator(loc_s)
    except ValueError:
        raise arvados.errors.NotFoundError(
            "Invalid data locator: '%s'" % loc_s)
    if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
        # The well-known empty block needs no file on disk.
        return ''
    # BUGFIX: read in binary mode to match local_store_put()'s binary
    # write; text mode ('r') would translate line endings on platforms
    # that do so, corrupting binary blocks.
    with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
        return f.read()
def is_cached(self, locator):
    """Return the block cache's reservation for locator.

    BUGFIX: the original body referenced the undefined name
    'expect_hash', so every call raised NameError; pass the method's
    own locator parameter through instead.  Returns whatever
    block_cache.reserve_cache returns (a (slot, is_new) pair).
    """
    return self.block_cache.reserve_cache(locator)