import cStringIO
import datetime
import hashlib
import logging
import math
import os
import pycurl
import Queue
import re
import socket
import ssl
import threading
import timer

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util

_logger = logging.getLogger('arvados.keep')
global_client_object = None


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
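
# A minimal usage sketch for KeepLocator (the md5/size are real values for
# the 3-byte block "foo"; the A-hint signature and timestamp are made up):
#
#   loc = KeepLocator('acbd18db4cc2f85cedef654fccc4a4d8+3'
#                     '+A0123456789abcdef0123456789abcdef01234567@565884ff')
#   loc.md5sum                 # 'acbd18db4cc2f85cedef654fccc4a4d8'
#   loc.size                   # 3
#   loc.permission_expired()   # False until the hex timestamp passes
#   str(loc)                   # reassembles the locator string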
109 """Simple interface to a global KeepClient object.
111 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
112 own API client. The global KeepClient will build an API client from the
113 current Arvados configuration, which may not match the one you built.
118 def global_client_object(cls):
119 global global_client_object
120 # Previously, KeepClient would change its behavior at runtime based
121 # on these configuration settings. We simulate that behavior here
122 # by checking the values and returning a new KeepClient if any of
124 key = (config.get('ARVADOS_API_HOST'),
125 config.get('ARVADOS_API_TOKEN'),
126 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
127 config.get('ARVADOS_KEEP_PROXY'),
128 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
129 os.environ.get('KEEP_LOCAL_STORE'))
130 if (global_client_object is None) or (cls._last_key != key):
131 global_client_object = KeepClient()
133 return global_client_object
136 def get(locator, **kwargs):
137 return Keep.global_client_object().get(locator, **kwargs)
140 def put(data, **kwargs):
141 return Keep.global_client_object().put(data, **kwargs)
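
# As the docstring above says, prefer constructing a client explicitly over
# the deprecated global interface. A sketch, assuming a configured Arvados
# environment:
#
#   kc = KeepClient(api_client=arvados.api('v1'))
#   locator = kc.put('foo')
#   data = kc.get(locator)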


class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content
            # is None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in xrange(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in xrange(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
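
# Typical reader pattern (this is how KeepClient.get uses the cache): reserve
# a slot, and only the caller that created it fetches the block; concurrent
# readers block in slot.get() until set() is called. A sketch, where
# fetch_block() stands in for a real fetch:
#
#   slot, first = cache.reserve_cache(locator_md5)
#   if first:
#       slot.set(fetch_block(locator_md5))
#       cache.cap_cache()
#   data = slot.get()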


class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       64 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        64 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 64, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 64, 32768)

    class ThreadLimiter(object):
        """Limit the number of threads writing to Keep at once.

        This ensures that only a number of writer threads that could
        potentially achieve the desired replication level run at once.
        Once the desired replication level is achieved, queued threads
        are instructed not to run.

        Should be used in a "with" block.
        """
        def __init__(self, want_copies, max_service_replicas):
            self._started = 0
            self._want_copies = want_copies
            self._done = 0
            self._response = None
            self._start_lock = threading.Condition()
            if (not max_service_replicas) or (max_service_replicas >= want_copies):
                max_threads = 1
            else:
                max_threads = math.ceil(float(want_copies) / max_service_replicas)
            _logger.debug("Limiter max threads is %d", max_threads)
            self._todo_lock = threading.Semaphore(max_threads)
            self._done_lock = threading.Lock()
            self._local = threading.local()

        def __enter__(self):
            self._start_lock.acquire()
            if getattr(self._local, 'sequence', None) is not None:
                # If the calling thread has used set_sequence(N), then
                # we wait here until N other threads have started.
                while self._started < self._local.sequence:
                    self._start_lock.wait()
            self._todo_lock.acquire()
            self._started += 1
            self._start_lock.notifyAll()
            self._start_lock.release()
            return self

        def __exit__(self, type, value, traceback):
            self._todo_lock.release()

        def set_sequence(self, sequence):
            self._local.sequence = sequence

        def shall_i_proceed(self):
            """
            Return true if the current thread should write to Keep.
            Return false otherwise.
            """
            with self._done_lock:
                return (self._done < self._want_copies)

        def save_response(self, response_body, replicas_stored):
            """
            Records a response body (a locator, possibly signed) returned by
            the Keep server, and the number of replicas it stored.
            """
            with self._done_lock:
                self._done += replicas_stored
                self._response = response_body

        def response(self):
            """Return the body from the response to a PUT request."""
            with self._done_lock:
                return self._response

        def done(self):
            """Return the total number of replicas successfully stored."""
            with self._done_lock:
                return self._done
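
    # A sketch of how writer threads use the limiter (see KeepWriterThread
    # below for the real call sites):
    #
    #   limiter = KeepClient.ThreadLimiter(want_copies, max_service_replicas)
    #   with limiter:
    #       if limiter.shall_i_proceed():
    #           ...  # PUT to one service, then call
    #           ...  # limiter.save_response(body, replicas_stored)
    #   limiter.done()      # replicas stored so far
    #   limiter.response()  # most recent response body (a signed locator)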

    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible results.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(False)
            except Queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, False)
            except Exception:
                pass

        @staticmethod
        def _socket_open(family, socktype, protocol, address=None):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(family, socktype, protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            return s

        def get(self, locator, timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: GET %s", url)
            curl = self._get_user_agent()
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    self._lastheadername = None
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k, v) for k, v in self.get_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {'error': e}
                ok = False
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            try:
                self._headers = {}
                self._lastheadername = None
                body_reader = cStringIO.StringIO(body)
                response_body = cStringIO.StringIO()
                curl.setopt(pycurl.NOSIGNAL, 1)
                curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                curl.setopt(pycurl.URL, url.encode('utf-8'))
                # Using UPLOAD tells cURL to wait for a "go ahead" from the
                # Keep server (in the form of a HTTP/1.1 "100 Continue"
                # response) instead of sending the request body immediately.
                # This allows the server to reject the request if the request
                # is invalid or the server is read-only, without waiting for
                # the client to send the entire block.
                curl.setopt(pycurl.UPLOAD, True)
                curl.setopt(pycurl.INFILESIZE, len(body))
                curl.setopt(pycurl.READFUNCTION, body_reader.read)
                curl.setopt(pycurl.HTTPHEADER, [
                    '{}: {}'.format(k, v) for k, v in self.put_headers.iteritems()])
                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                self._setcurltimeouts(curl, timeout)
                try:
                    curl.perform()
                except Exception as e:
                    raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {'error': e}
                ok = False
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
            return ok

        def _setcurltimeouts(self, curl, timeouts):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
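
        # For example, the default server tuple (2, 64, 32768) becomes:
        #   CONNECTTIMEOUT_MS = 2000   (give up connecting after 2 seconds)
        #   LOW_SPEED_TIME    = 64     (abort a transfer that stays below...)
        #   LOW_SPEED_LIMIT   = 32768  (...32768 bytes/sec for 64 seconds)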

        def _headerfunction(self, header_line):
            header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._lastheadername and header_line.startswith((' ', '\t')):
                # Continuation of a folded header line.
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
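
        # Illustration: for a response whose header block is
        #   HTTP/1.1 200 OK
        #   X-Keep-Replicas-Stored: 2
        # self._headers ends up as {'x-status-line': 'HTTP/1.1 200 OK',
        # 'x-keep-replicas-stored': '2'}, which is what KeepWriterThread
        # reads below.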

    class KeepWriterThread(threading.Thread):
        """
        Write a blob of data to the given Keep server. On success, call
        save_response() of the given ThreadLimiter to save the returned
        locator.
        """
        def __init__(self, keep_service, **kwargs):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.service = keep_service
            self.args = kwargs
            self._success = False

        def success(self):
            return self._success

        def run(self):
            limiter = self.args['thread_limiter']
            sequence = self.args['thread_sequence']
            if sequence is not None:
                limiter.set_sequence(sequence)
            with limiter:
                if not limiter.shall_i_proceed():
                    # My turn arrived, but the job has been done without
                    # me.
                    return
                self.run_with_limiter(limiter)

        def run_with_limiter(self, limiter):
            if self.service.finished():
                return
            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          len(self.args['data']),
                          self.args['service_root'])
            self._success = bool(self.service.put(
                self.args['data_hash'],
                self.args['data'],
                timeout=self.args.get('timeout', None)))
            result = self.service.last_result()
            if self._success:
                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                              str(threading.current_thread()),
                              self.args['data_hash'],
                              len(self.args['data']),
                              self.args['service_root'])
                # Tick the 'done' counter for the number of replicas
                # reported stored by the server, for the case that
                # we're talking to a proxy or other backend that
                # stores multiple copies for us.
                try:
                    replicas_stored = int(result['headers']['x-keep-replicas-stored'])
                except (KeyError, ValueError):
                    replicas_stored = 1
                limiter.save_response(result['body'].strip(), replicas_stored)
            elif result.get('status_code', None):
                _logger.debug("Request fail: PUT %s => %s %s",
                              self.args['data_hash'],
                              result['status_code'],
                              result['body'])

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services. If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy. Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
          KeepClient does not use a proxy, pass in an empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 64, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 64, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests. It is an error to specify both
          api_client and api_token. If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory. If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable. If you want to ensure KeepClient does not
          use local storage, pass in an empty string. This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called. Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = Queue.LifoQueue()

        if local_store:
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                if not proxy.endswith('/'):
                    proxy += '/'
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': 'proxy',
                    'service_type': 'proxy',
                    '_service_root': proxy,
                }]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise. The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
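
    # With the default (2, 64, 32768) tuple, only the connection timeout
    # scales with retries: attempt 0 -> (2, 64, 32768), attempt 1 ->
    # (4, 64, 32768), attempt 2 -> (8, 64, 32768), and so on.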

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
                (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.itervalues():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.itervalues()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]
            # For disk type services, max_replicas_per_service is 1.
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
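
    # This is rendezvous (highest-random-weight) hashing: every client
    # computes the same weight for a given (block, service) pair and probes
    # services in descending weight order, so placement stays stable without
    # a central directory. A worked sketch with a made-up service UUID:
    #
    #   _service_weight('acbd18db4cc2f85cedef654fccc4a4d8',
    #                   'zzzzz-bi6l4-0123456789abcde')
    #   == hashlib.md5('acbd18db4cc2f85cedef654fccc4a4d8'
    #                  + '0123456789abcde').hexdigest()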

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # as well.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool, **headers)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc):
        """Fetch a block only if it is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))
        locator = KeepLocator(loc_s)
        slot, first = self.block_cache.reserve_cache(locator.md5sum)
        if not first:
            # Another reader is already fetching this block; wait for it.
            return slot.get()

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               len(hint) == 29 and
                               self._gateway_services.get(hint[2:])
                               )])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool)
            for root in hint_roots
        }

        # See #3147 for a discussion of the loop implementation. Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        sorted_roots = []
        blob = None
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False)
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        slot.set(blob)
        self.block_cache.cap_cache()
        if loop.success():
            return blob

        # No luck; pick the most informative error to raise.
        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        """
        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replication'] = str(copies)
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            thread_limiter = KeepClient.ThreadLimiter(
                copies, self.max_replicas_per_service)
            threads = []
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                t = KeepClient.KeepWriterThread(
                    ks,
                    data=data,
                    data_hash=data_hash,
                    service_root=service_root,
                    thread_limiter=thread_limiter,
                    timeout=self.current_timeout(num_retries-tries_left),
                    thread_sequence=len(threads))
                t.start()
                threads.append(t)
            for t in threads:
                t.join()
            loop.save_result((thread_limiter.done() >= copies, len(threads)))

        if loop.success():
            return thread_limiter.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, thread_limiter.done()), service_errors, label="service")
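
    # Write sketch, assuming a configured cluster: the returned string is the
    # response body, normally a signed locator usable with get():
    #
    #   signed_loc = kc.put('foo', copies=2)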

    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return ''
        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
            return f.read()

    def is_cached(self, locator):
        return self.block_cache.reserve_cache(locator)