import cStringIO
import datetime
import hashlib
import logging
import os
import pycurl
import Queue
import re
import socket
import ssl
import threading
import timer

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util

_logger = logging.getLogger('arvados.keep')
global_client_object = None

class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except IndexError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
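
# A quick illustration of the locator grammar parsed above. The signature
# and timestamp are made-up example values (a 40-digit hex signature and an
# 8-digit hex Unix timestamp), not real credentials:
#
#   loc = KeepLocator('d41d8cd98f00b204e9800998ecf8427e+0'
#                     '+A27d4b0cbcbfa4b7a4a7e4d1fcbd3e1e2a1f9a2b3@565e8a7c')
#   loc.md5sum               # 'd41d8cd98f00b204e9800998ecf8427e'
#   loc.size                 # 0
#   loc.permission_expired() # compares the A-hint timestamp against now
#   str(loc)                 # reassembles md5+size+hints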
121 """Simple interface to a global KeepClient object.
123 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
124 own API client. The global KeepClient will build an API client from the
125 current Arvados configuration, which may not match the one you built.

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings. We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)
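
# Deprecated convenience usage, assuming Arvados configuration (API host and
# token) is available in the environment; prefer constructing a KeepClient:
#
#   loc = Keep.put('some data')   # returns a locator string
#   data = Keep.get(loc)          # fetches it back via the global client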


class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in xrange(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in xrange(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
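
# The (slot, first) pair returned above lets concurrent readers of the same
# block coordinate: the first caller fetches and fills the slot, later
# callers block on slot.get() until it is ready. A sketch, where the
# hypothetical fetch_block() stands in for a real Keep request:
#
#   slot, first = cache.reserve_cache(locator)
#   if first:
#       content = fetch_block(locator)  # may be None on failure
#       slot.set(content)
#       cache.cap_cache()
#   else:
#       content = slot.get()            # waits for the first caller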


class KeepClient(object):

    # Default Keep server connection timeout: 2 seconds
    # Default Keep server read timeout: 300 seconds
    # Default Keep proxy connection timeout: 20 seconds
    # Default Keep proxy read timeout: 300 seconds
    DEFAULT_TIMEOUT = (2, 300)
    DEFAULT_PROXY_TIMEOUT = (20, 300)

    class ThreadLimiter(object):
        """
        Limit the number of threads running at a given time to
        {desired successes} minus {successes reported}. When successes
        reported == desired, wake up the remaining threads and tell
        them to quit.

        Should be used in a "with" block.
        """
        def __init__(self, todo):
            self._started = 0
            self._todo = todo
            self._done = 0
            self._response = None
            self._start_lock = threading.Condition()
            self._todo_lock = threading.Semaphore(todo)
            self._done_lock = threading.Lock()
            self._local = threading.local()

        def __enter__(self):
            self._start_lock.acquire()
            if getattr(self._local, 'sequence', None) is not None:
                # If the calling thread has used set_sequence(N), then
                # we wait here until N other threads have started.
                while self._started < self._local.sequence:
                    self._start_lock.wait()
            self._todo_lock.acquire()
            self._started += 1
            self._start_lock.notifyAll()
            self._start_lock.release()
            return self

        def __exit__(self, type, value, traceback):
            self._todo_lock.release()

        def set_sequence(self, sequence):
            self._local.sequence = sequence

        def shall_i_proceed(self):
            """
            Return true if the current thread should do stuff. Return
            false if the current thread should just stop.
            """
            with self._done_lock:
                return (self._done < self._todo)

        def save_response(self, response_body, replicas_stored):
            """
            Records a response body (a locator, possibly signed) returned by
            the Keep server. It is not necessary to save more than
            one response, since we presume that any locator returned
            in response to a successful request is valid.
            """
            with self._done_lock:
                self._done += replicas_stored
                self._response = response_body

        def response(self):
            """
            Returns the body from the response to a PUT request.
            """
            with self._done_lock:
                return self._response

        def done(self):
            """
            Return how many successes were reported.
            """
            with self._done_lock:
                return self._done
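
    # ThreadLimiter usage sketch (the writer threads below do essentially
    # this; 'copies' and the PUT itself are stand-ins for the real flow):
    #
    #   limiter = KeepClient.ThreadLimiter(todo=copies)
    #   # in each worker thread:
    #   with limiter:
    #       if limiter.shall_i_proceed():
    #           # ... attempt the PUT, then on success:
    #           limiter.save_response(body, replicas_stored)
    #   # main thread, after join()ing the workers:
    #   if limiter.done() >= copies:
    #       locator = limiter.response()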

    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """
        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(False)
            except Queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                self._user_agent_pool.put(ua, False)
            except Queue.Full:
                pass

        @staticmethod
        def _socket_open(family, socktype, protocol, address=None):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(family, socktype, protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            return s

        def get(self, locator, timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: GET %s", url)
            curl = self._get_user_agent()
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    self._lastheadername = None
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k, v) for k, v in self.get_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
                ok = False
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            try:
                self._headers = {}
                self._lastheadername = None
                body_reader = cStringIO.StringIO(body)
                response_body = cStringIO.StringIO()
                curl.setopt(pycurl.NOSIGNAL, 1)
                curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                curl.setopt(pycurl.URL, url.encode('utf-8'))
                # Using UPLOAD tells cURL to wait for a "go ahead" from the
                # Keep server (in the form of a HTTP/1.1 "100 Continue"
                # response) instead of sending the request body immediately.
                # This allows the server to reject the request if the request
                # is invalid or the server is read-only, without waiting for
                # the client to send the entire block.
                curl.setopt(pycurl.UPLOAD, True)
                curl.setopt(pycurl.INFILESIZE, len(body))
                curl.setopt(pycurl.READFUNCTION, body_reader.read)
                curl.setopt(pycurl.HTTPHEADER, [
                    '{}: {}'.format(k, v) for k, v in self.put_headers.iteritems()])
                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                self._setcurltimeouts(curl, timeout)
                try:
                    curl.perform()
                except Exception as e:
                    raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
                ok = False
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            return True

        def _setcurltimeouts(self, curl, timeouts):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                conn_t, xfer_t = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
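
        # Accepted timeout forms, for reference:
        #   self._setcurltimeouts(curl, (2, 300))  # 2s connect, 300s transfer
        #   self._setcurltimeouts(curl, 20)        # 20s for both
        #   self._setcurltimeouts(curl, None)      # leave cURL defaults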

        def _headerfunction(self, header_line):
            header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._lastheadername and header_line.startswith((' ', '\t')):
                # Continuation line: fold it into the previous header's value.
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written

    class KeepWriterThread(threading.Thread):
        """
        Write a blob of data to the given Keep server. On success, call
        save_response() of the given ThreadLimiter to save the returned
        locator.
        """
        def __init__(self, keep_service, **kwargs):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.service = keep_service
            self.args = kwargs
            self._success = False

        def success(self):
            return self._success

        def run(self):
            limiter = self.args['thread_limiter']
            sequence = self.args['thread_sequence']
            if sequence is not None:
                limiter.set_sequence(sequence)
            with limiter:
                if not limiter.shall_i_proceed():
                    # My turn arrived, but the job has been done without
                    # me.
                    return
                self.run_with_limiter(limiter)

        def run_with_limiter(self, limiter):
            if self.service.finished():
                return
            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          len(self.args['data']),
                          self.args['service_root'])
            self._success = bool(self.service.put(
                self.args['data_hash'],
                self.args['data'],
                timeout=self.args.get('timeout', None)))
            result = self.service.last_result()
            if self._success:
                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                              str(threading.current_thread()),
                              self.args['data_hash'],
                              len(self.args['data']),
                              self.args['service_root'])
                # Tick the 'done' counter for the number of replicas
                # reported stored by the server, for the case that
                # we're talking to a proxy or other backend that
                # stores multiple copies for us.
                try:
                    replicas_stored = int(result['headers']['x-keep-replicas-stored'])
                except (KeyError, ValueError):
                    replicas_stored = 1
                limiter.save_response(result['body'].strip(), replicas_stored)
            elif result.get('status_code', None):
                _logger.debug("Request fail: PUT %s => %s %s",
                              self.args['data_hash'],
                              result['status_code'],
                              result['body'])

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services. If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy. Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
          KeepClient does not use a proxy, pass in an empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers. A tuple of two floats is interpreted as
          (connection_timeout, read_timeout). Because timeouts are often
          a result of transient server load, the actual connection timeout
          will be increased by a factor of two on each retry.

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of two floats is interpreted as
          (connection_timeout, read_timeout). The behavior described
          above for adjusting connection timeouts on retry also applies.

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests. It is an error to specify both
          api_client and api_token. If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory. If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable. If you want to ensure KeepClient does not
          use local storage, pass in an empty string. This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called. Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = Queue.LifoQueue()

        if local_store:
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            if proxy:
                if not proxy.endswith('/'):
                    proxy += '/'
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': 'proxy',
                    '_service_root': proxy,
                    }]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
                self.max_replicas_per_service = 1
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                self.max_replicas_per_service = 1
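
    # Construction sketches (hypothetical hosts and tokens, for illustration):
    #
    #   kc = KeepClient()                    # discover services via arvados.api('v1')
    #   kc = KeepClient(proxy='https://keep.example.arvadosapi.com/',
    #                   api_token='xyzzy')   # talk to a Keep proxy directly
    #   kc = KeepClient(local_store='/tmp/keep-mock')  # testing: plain files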

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise. The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        return (t[0] * (1 << attempt_number), t[1])
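
    # With DEFAULT_TIMEOUT = (2, 300), the connection timeout doubles on each
    # retry while the read timeout stays fixed:
    #   current_timeout(0) == (2, 300)
    #   current_timeout(1) == (4, 300)
    #   current_timeout(2) == (8, 300)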

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            accessible = keep_services.execute().get('items')
            if not accessible:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in accessible:
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
            _logger.debug(str(self._gateway_services))

            self._keep_services = [
                ks for ks in accessible
                if ks.get('service_type') in ['disk', 'proxy']]
            self._writable_services = [
                ks for ks in accessible
                if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
            _logger.debug(str(self._keep_services))

            self.using_proxy = any(ks.get('service_type') == 'proxy'
                                   for ks in self._keep_services)
            # For disk type services, max_replicas_per_service is 1.
            # It is unknown (effectively unlimited) for non-disk services.
            for ks in accessible:
                if ('disk' != ks.get('service_type')) and (not ks.get('read_only')):
                    self.max_replicas_per_service = None

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
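
    # This is rendezvous (highest-random-weight) hashing: every client
    # computes the same per-service weights for a given block, so they all
    # probe services in the same order without coordinating. For example
    # (hypothetical UUIDs):
    #
    #   h = 'acbd18db4cc2f85cedef654fccc4a4d8'
    #   uuids = ['zzzzz-bi6l4-000000000000000', 'zzzzz-bi6l4-000000000000001']
    #   sorted(uuids, reverse=True,
    #          key=lambda u: hashlib.md5(h + u[-15:]).hexdigest())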

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # as a list.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # service roots.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool, **headers)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc):
        """Fetch a block only if it is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))
        locator = KeepLocator(loc_s)
        slot, first = self.block_cache.reserve_cache(locator.md5sum)
        if not first:
            return slot.get()

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               len(hint) == 29 and
                               self._gateway_services.get(hint[2:]))])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool)
            for root in hint_roots}

        # See #3147 for a discussion of the loop implementation. Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        sorted_roots = []
        blob = None
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False)
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        slot.set(blob)
        self.block_cache.cap_cache()
        if loop.success():
            return blob

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        """
        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replication'] = str(copies)

        roots_map = {}
        thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        thread_sequence = 0
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            threads = []
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                t = KeepClient.KeepWriterThread(
                    ks,
                    data=data,
                    data_hash=data_hash,
                    service_root=service_root,
                    thread_limiter=thread_limiter,
                    timeout=self.current_timeout(num_retries-tries_left),
                    thread_sequence=thread_sequence)
                t.start()
                threads.append(t)
                thread_sequence += 1
            for t in threads:
                t.join()
            loop.save_result((thread_limiter.done() >= copies, len(threads)))

        if loop.success():
            return thread_limiter.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, thread_limiter.done()), service_errors, label="service")
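
    # End-to-end sketch (hypothetical cluster; assumes a configured client):
    #
    #   kc = KeepClient(api_client=arvados.api('v1'))
    #   loc = kc.put('foo', copies=2)   # e.g. 'acbd18db...+3+A<sig>@<expiry>'
    #   kc.get(loc)                     # 'foo'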

    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return ''
        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
            return f.read()
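
    # Local-store mode writes each block to <local_store>/<md5>, via a .tmp
    # file and rename so readers never see partial blocks. Sketch (the
    # directory must already exist):
    #
    #   kc = KeepClient(local_store='/tmp/keep-mock')
    #   loc = kc.put('foo')   # writes /tmp/keep-mock/acbd18db...
    #   kc.get(loc)           # 'foo', read straight off disk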

    def is_cached(self, locator):
        return self.block_cache.get(locator) is not None