import cStringIO
import datetime
import hashlib
import logging
import math
import os
import pycurl
import Queue
import re
import socket
import ssl
import threading
import timer

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util

_logger = logging.getLogger('arvados.keep')
global_client_object = None


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        # (Method name reconstructed from context: returns the locator
        # with all hints removed.)
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
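
    # A minimal usage sketch (the locator pieces below are illustrative
    # values, not a real signature):
    #
    #     loc = KeepLocator('acbd18db4cc2f85cedef654fccc4a4d8+3'
    #                       '+A2cf6a4846f2a6f37b0c4ddc1f0734f4f586510d5@56109dcd')
    #     loc.md5sum                 # 'acbd18db4cc2f85cedef654fccc4a4d8'
    #     loc.size                   # 3
    #     loc.permission_expired()   # True once the @-timestamp has passed
    #     str(loc)                   # reassembles the locator string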
109 """Simple interface to a global KeepClient object.
111 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
112 own API client. The global KeepClient will build an API client from the
113 current Arvados configuration, which may not match the one you built.

    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings. We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)
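
    # The recommended, non-deprecated pattern is to build your own client
    # instead of going through the global object above. A minimal sketch,
    # assuming ARVADOS_API_HOST/ARVADOS_API_TOKEN are configured:
    #
    #     api = arvados.api('v1')
    #     kc = KeepClient(api_client=api)
    #     locator = kc.put('hello')   # returns a (possibly signed) locator
    #     data = kc.get(locator)      # round-trips the same bytes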


class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in xrange(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in xrange(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
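
    # A sketch of the intended reserve_cache()/CacheSlot protocol, with a
    # hypothetical fetch_block() standing in for a real Keep read:
    #
    #     cache = KeepBlockCache()
    #     slot, first = cache.reserve_cache(md5sum)
    #     if first:
    #         # This caller must fill the slot; set() wakes any waiters.
    #         slot.set(fetch_block(md5sum))
    #         cache.cap_cache()
    #     else:
    #         content = slot.get()    # blocks until the filler calls set()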


class Counter(object):
    '''Thread-safe counter for request/byte statistics.'''
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):

    # Default Keep server connection timeout: 2 seconds
    # Default Keep server read timeout: 256 seconds
    # Default Keep server bandwidth minimum: 32768 bytes per second
    # Default Keep proxy connection timeout: 20 seconds
    # Default Keep proxy read timeout: 256 seconds
    # Default Keep proxy bandwidth minimum: 32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)

    class ThreadLimiter(object):
        """Limit the number of threads writing to Keep at once.

        This ensures that only a number of writer threads that could
        potentially achieve the desired replication level run at once.
        Once the desired replication level is achieved, queued threads
        are instructed not to run.

        Should be used in a "with" block.
        """
        def __init__(self, want_copies, max_service_replicas):
            self._started = 0
            self._want_copies = want_copies
            self._done = 0
            self._response = None
            self._start_lock = threading.Condition()
            if (not max_service_replicas) or (max_service_replicas >= want_copies):
                max_threads = 1
            else:
                max_threads = math.ceil(float(want_copies) / max_service_replicas)
            _logger.debug("Limiter max threads is %d", max_threads)
            self._todo_lock = threading.Semaphore(max_threads)
            self._done_lock = threading.Lock()
            self._local = threading.local()

        def __enter__(self):
            self._start_lock.acquire()
            if getattr(self._local, 'sequence', None) is not None:
                # If the calling thread has used set_sequence(N), then
                # we wait here until N other threads have started.
                while self._started < self._local.sequence:
                    self._start_lock.wait()
            self._todo_lock.acquire()
            self._started += 1
            self._start_lock.notifyAll()
            self._start_lock.release()
            return self

        def __exit__(self, type, value, traceback):
            self._todo_lock.release()

        def set_sequence(self, sequence):
            self._local.sequence = sequence

        def shall_i_proceed(self):
            """
            Return true if the current thread should write to Keep.
            Return false otherwise.
            """
            with self._done_lock:
                return (self._done < self._want_copies)

        def save_response(self, response_body, replicas_stored):
            """
            Records a response body (a locator, possibly signed) returned by
            the Keep server, and the number of replicas it stored.
            """
            with self._done_lock:
                self._done += replicas_stored
                self._response = response_body
303 """Return the body from the response to a PUT request."""
304 with self._done_lock:
305 return self._response
308 """Return the total number of replicas successfully stored."""
309 with self._done_lock:
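
    # A sketch of the ThreadLimiter protocol as used by KeepWriterThread
    # below (the worker body here is illustrative, not the real thread):
    #
    #     limiter = KeepClient.ThreadLimiter(want_copies=2,
    #                                        max_service_replicas=1)
    #     def worker(service, data_hash, data):
    #         with limiter:
    #             if not limiter.shall_i_proceed():
    #                 return          # enough replicas already stored
    #             if service.put(data_hash, data):
    #                 body = service.last_result()['body'].strip()
    #                 limiter.save_response(body, 1)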

    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None, **headers):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
345 """Is it worth attempting a request?"""
349 """Did the request succeed or encounter permanent failure?"""
350 return self._result['error'] == False or not self._usable
352 def last_result(self):

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(False)
            except Queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, False)
            except:
                pass

        @staticmethod
        def _socket_open(family, socktype, protocol, address=None):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(family, socktype, protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            return s

        def get(self, locator, timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: GET %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
                ok = False
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = cStringIO.StringIO(body)
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
                ok = False
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
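
        # The three accepted forms, with illustrative values:
        #
        #     self._setcurltimeouts(curl, (2, 256, 32768))  # full 3-tuple
        #     self._setcurltimeouts(curl, (2, 256))  # default bandwidth floor
        #     self._setcurltimeouts(curl, 300)       # scalar: conn_t == xfer_t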

        def _headerfunction(self, header_line):
            header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._lastheadername and header_line.startswith((' ', '\t')):
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written

    class KeepWriterThread(threading.Thread):
        """
        Write a blob of data to the given Keep server. On success, call
        save_response() of the given ThreadLimiter to save the returned
        locator.
        """
        def __init__(self, keep_service, **kwargs):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.service = keep_service
            self.args = kwargs
            self._success = False

        def success(self):
            return self._success

        def run(self):
            limiter = self.args['thread_limiter']
            sequence = self.args['thread_sequence']
            if sequence is not None:
                limiter.set_sequence(sequence)
            with limiter:
                if not limiter.shall_i_proceed():
                    # My turn arrived, but the job has been done without
                    # me.
                    return
                self.run_with_limiter(limiter)

        def run_with_limiter(self, limiter):
            if self.service.finished():
                return
            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          len(self.args['data']),
                          self.args['service_root'])
            self._success = bool(self.service.put(
                self.args['data_hash'],
                self.args['data'],
                timeout=self.args.get('timeout', None)))
            result = self.service.last_result()
            if self._success:
                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                              str(threading.current_thread()),
                              self.args['data_hash'],
                              len(self.args['data']),
                              self.args['service_root'])
                # Tick the 'done' counter for the number of replicas the
                # server reports it stored, for the case that we're
                # talking to a proxy or other backend that stores
                # multiple copies for us.
                try:
                    replicas_stored = int(result['headers']['x-keep-replicas-stored'])
                except (KeyError, ValueError):
                    replicas_stored = 1
                limiter.save_response(result['body'].strip(), replicas_stored)
            elif result.get('status_code', None):
                _logger.debug("Request fail: PUT %s => %s %s",
                              self.args['data_hash'],
                              result['status_code'],
                              result['body'])

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
611 """Initialize a new KeepClient.
615 The API client to use to find Keep services. If not
616 provided, KeepClient will build one from available Arvados
620 If specified, this KeepClient will send requests to this Keep
621 proxy. Otherwise, KeepClient will fall back to the setting of the
622 ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
623 KeepClient does not use a proxy, pass in an empty string.
626 The initial timeout (in seconds) for HTTP requests to Keep
627 non-proxy servers. A tuple of three floats is interpreted as
628 (connection_timeout, read_timeout, minimum_bandwidth). A connection
629 will be aborted if the average traffic rate falls below
630 minimum_bandwidth bytes per second over an interval of read_timeout
631 seconds. Because timeouts are often a result of transient server
632 load, the actual connection timeout will be increased by a factor
633 of two on each retry.
634 Default: (2, 256, 32768).
637 The initial timeout (in seconds) for HTTP requests to
638 Keep proxies. A tuple of three floats is interpreted as
639 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
640 described above for adjusting connection timeouts on retry also
642 Default: (20, 256, 32768).
645 If you're not using an API client, but only talking
646 directly to a Keep proxy, this parameter specifies an API token
647 to authenticate Keep requests. It is an error to specify both
648 api_client and api_token. If you specify neither, KeepClient
649 will use one available from the Arvados configuration.
652 If specified, this KeepClient will bypass Keep
653 services, and save data to the named directory. If unspecified,
654 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
655 environment variable. If you want to ensure KeepClient does not
656 use local storage, pass in an empty string. This is primarily
657 intended to mock a server for testing.
660 The default number of times to retry failed requests.
661 This will be used as the default num_retries value when get() and
662 put() are called. Default 0.
        self.lock = threading.Lock()
        if proxy is None:
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = Queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        if local_store:
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                if not proxy.endswith('/'):
                    proxy += '/'
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': 'proxy',
                    'service_type': 'proxy',
                    '_service_root': proxy,
                    }]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise. The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
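
    # For example, with the default non-proxy timeout of (2, 256, 32768),
    # successive attempts double only the connection-timeout term:
    #
    #     self.current_timeout(0)   # (2, 256, 32768)
    #     self.current_timeout(1)   # (4, 256, 32768)
    #     self.current_timeout(2)   # (8, 256, 32768)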

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.itervalues():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.itervalues()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
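
    # This weighting implements rendezvous (highest-random-weight) hashing:
    # every client computes the same per-service weights for a given block,
    # so all clients probe services in the same order. A sketch with
    # hypothetical UUIDs:
    #
    #     h = 'acbd18db4cc2f85cedef654fccc4a4d8'
    #     uuids = ['zzzzz-bi6l4-000000000000000',
    #              'zzzzz-bi6l4-000000000000001']
    #     probe_order = sorted(
    #         uuids, reverse=True,
    #         key=lambda u: hashlib.md5(h + u[-15:]).hexdigest())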

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # as a list.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    **headers)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc):
        """Fetch a block only if it is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        locator = KeepLocator(loc_s)
        slot, first = self.block_cache.reserve_cache(locator.md5sum)
        if not first:
            self.hits_counter.add(1)
            v = slot.get()
            return v

        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               len(hint) == 29 and
                               self._gateway_services.get(hint[2:])
                               )])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter)
            for root in hint_roots
        }

        # See #3147 for a discussion of the loop implementation. Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        blob = None
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False)
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        slot.set(blob)
        self.block_cache.cap_cache()
        if loop.success():
            return blob

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        """
        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replication'] = str(copies)

        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done = 0
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            thread_limiter = KeepClient.ThreadLimiter(
                copies - done, self.max_replicas_per_service)
            threads = []
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                t = KeepClient.KeepWriterThread(
                    ks,
                    data=data,
                    data_hash=data_hash,
                    service_root=service_root,
                    thread_limiter=thread_limiter,
                    timeout=self.current_timeout(num_retries-tries_left),
                    thread_sequence=len(threads))
                t.start()
                threads.append(t)
            for t in threads:
                t.join()
            done += thread_limiter.done()
            loop.save_result((done >= copies, len(threads)))

        if loop.success():
            return thread_limiter.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, thread_limiter.done()), service_errors, label="service")
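
    # A minimal write sketch: ask for two replicas and keep the returned
    # (possibly signed) locator for later reads:
    #
    #     kc = KeepClient()
    #     locator = kc.put('hello', copies=2)
    #
    # KeepWriteError is raised if fewer than `copies` replicas are stored.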

    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return ''
        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
            return f.read()

    def is_cached(self, locator):
        # Check the cache for the given locator without reserving a slot.
        return self.block_cache.get(locator) is not None