16 import arvados.config as config
18 import arvados.retry as retry
21 _logger = logging.getLogger('arvados.keep')
22 global_client_object = None
class KeepLocator(object):
    """Parse and manipulate a Keep block locator string.

    A locator looks like "md5sum[+size[+hint...]]".  "A<sig>@<hexts>"
    hints are parsed as permission hints; all other hints are kept
    verbatim in self.hints.
    """
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # Bare "md5sum" locator with no size.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator with all hints removed."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        # Stored as a *naive UTC* datetime.
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the "A<sig>@<hexts>" hint string, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission hint's timestamp has passed.

        perm_expiry is a naive UTC datetime (see the setter), so the
        default comparison point must also be UTC.  Comparing against
        local-time now() -- as the previous code did -- shifted the
        effective expiry by the local UTC offset.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
109 """Simple interface to a global KeepClient object.
111 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
112 own API client. The global KeepClient will build an API client from the
113 current Arvados configuration, which may not match the one you built.
118 def global_client_object(cls):
119 global global_client_object
120 # Previously, KeepClient would change its behavior at runtime based
121 # on these configuration settings. We simulate that behavior here
122 # by checking the values and returning a new KeepClient if any of
124 key = (config.get('ARVADOS_API_HOST'),
125 config.get('ARVADOS_API_TOKEN'),
126 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
127 config.get('ARVADOS_KEEP_PROXY'),
128 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
129 os.environ.get('KEEP_LOCAL_STORE'))
130 if (global_client_object is None) or (cls._last_key != key):
131 global_client_object = KeepClient()
133 return global_client_object
136 def get(locator, **kwargs):
137 return Keep.global_client_object().get(locator, **kwargs)
140 def put(data, **kwargs):
141 return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
    """In-memory LRU cache of Keep data blocks, shared between threads.

    Blocks are held in CacheSlot objects; a reader that finds an
    unfilled slot waits on its Event until the fetching thread calls
    set().
    """
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        # Most-recently-used slot first.
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """Holder for one block: `ready` is set once `content` is filled."""
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            # Block until the fetching thread has called set().
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            sm = sum(slot.size() for slot in self._cache)
            while len(self._cache) > 0 and sm > self.cache_max:
                # Evict the least-recently-used slot whose content has
                # arrived (never one a reader is still waiting on).
                for i in range(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        sm -= self._cache[i].size()
                        del self._cache[i]
                        break
                else:
                    # No evictable (ready) slot exists.  The previous
                    # code would spin forever here; give up instead and
                    # let a later cap_cache() reclaim the space.
                    break

    def _get(self, locator):
        # Test if the locator is already in the cache.  Caller must
        # hold self._cache_lock.
        for i in range(len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # Move it to the front (most recently used).
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        """Return the cache slot for locator, or None.  Thread-safe."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator, reserve=True):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.

        Returns (slot, just_created); (None, None) when the block is
        absent and reserve is false.
        '''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            elif reserve:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
            else:
                return None, None
class Counter(object):
    """An integer counter guarded by a lock.

    NOTE(review): only the beginning of this class is visible in this
    excerpt; the stored value (initialized from `v`) and the accessor
    methods are elided.
    """
    def __init__(self, v=0):
        # Serializes updates coming from multiple transfer threads.
        self._lk = threading.Lock()
231 class KeepClient(object):
    # Default Keep server connection timeout: 2 seconds
    # Default Keep server read timeout: 256 seconds
    # Default Keep server bandwidth minimum: 32768 bytes per second
    # Default Keep proxy connection timeout: 20 seconds
    # Default Keep proxy read timeout: 256 seconds
    # Default Keep proxy bandwidth minimum: 32768 bytes per second
    # Each tuple is (connection_timeout, read_timeout, minimum_bandwidth_bps);
    # see KeepService._setcurltimeouts() for how they map onto curl options.
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
242 class ThreadLimiter(object):
243 """Limit the number of threads writing to Keep at once.
245 This ensures that only a number of writer threads that could
246 potentially achieve the desired replication level run at once.
247 Once the desired replication level is achieved, queued threads
248 are instructed not to run.
250 Should be used in a "with" block.
252 def __init__(self, want_copies, max_service_replicas):
254 self._want_copies = want_copies
256 self._response = None
257 self._start_lock = threading.Condition()
258 if (not max_service_replicas) or (max_service_replicas >= want_copies):
261 max_threads = math.ceil(float(want_copies) / max_service_replicas)
262 _logger.debug("Limiter max threads is %d", max_threads)
263 self._todo_lock = threading.Semaphore(max_threads)
264 self._done_lock = threading.Lock()
265 self._local = threading.local()
268 self._start_lock.acquire()
269 if getattr(self._local, 'sequence', None) is not None:
270 # If the calling thread has used set_sequence(N), then
271 # we wait here until N other threads have started.
272 while self._started < self._local.sequence:
273 self._start_lock.wait()
274 self._todo_lock.acquire()
276 self._start_lock.notifyAll()
277 self._start_lock.release()
280 def __exit__(self, type, value, traceback):
281 self._todo_lock.release()
283 def set_sequence(self, sequence):
284 self._local.sequence = sequence
286 def shall_i_proceed(self):
288 Return true if the current thread should write to Keep.
289 Return false otherwise.
291 with self._done_lock:
292 return (self._done < self._want_copies)
294 def save_response(self, response_body, replicas_stored):
296 Records a response body (a locator, possibly signed) returned by
297 the Keep server, and the number of replicas it stored.
299 with self._done_lock:
300 self._done += replicas_stored
301 self._response = response_body
304 """Return the body from the response to a PUT request."""
305 with self._done_lock:
306 return self._response
309 """Return the total number of replicas successfully stored."""
310 with self._done_lock:
    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        # NOTE(review): this excerpt is heavily elided; several statements
        # inside the methods below (and the start of the HTTP_ERRORS
        # tuple) are not visible here.  Comments describe only the
        # visible code.
            arvados.errors.HttpError,

        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
                     download_counter=None, **headers):
            # NOTE(review): mutable default argument -- this LifoQueue is
            # shared by all instances constructed without an explicit
            # pool; presumably intentional (a process-wide curl-handle
            # pool), but worth confirming.
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            # Extra **headers apply to both directions; GETs also
            # advertise that raw block data is accepted.
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter

            """Is it worth attempting a request?"""

            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):

        def _get_user_agent(self):
                # Reuse a pooled curl handle if one is available.
                return self._user_agent_pool.get(False)

        def _put_user_agent(self, ua):
                self._user_agent_pool.put(ua, False)

        def _socket_open(family, socktype, protocol, address=None):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(family, socktype, protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Start TCP keepalives after 75s idle, then every 75s.
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
                with timer.Timer() as t:
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    # (HEAD-only branch; its conditional is elided here)
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout)
                except Exception as e:
                    # Wrap any curl failure as an HttpError so the retry
                    # machinery treats it uniformly.
                    raise arvados.errors.HttpError(0, str(e))
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                ok = retry.check_http_response_success(self._result['status_code'])
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
            # ok may be True, False, or None; only False (permanent
            # failure) marks this service unusable.
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
                # Don't return this client to the pool, in case it's
                # broken.
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
            _logger.info("HEAD %s: %s bytes",
                         self._result['status_code'],
                         self._headers.get('content-length'))
            # HEAD result: report content-length, falling back to the
            # (normally empty) body when the header is absent.
            content_len = self._headers.get('content-length')
            if content_len is None:
                content_len = self._result['body']
            return str(content_len)
            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            # Verify the payload against the locator's md5 before
            # returning it.
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                self._result['error'] = arvados.errors.HttpError(
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
                with timer.Timer() as t:
                    body_reader = cStringIO.StringIO(body)
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                except Exception as e:
                    raise arvados.errors.HttpError(0, str(e))
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                ok = retry.check_http_response_success(self._result['status_code'])
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))

        def _setcurltimeouts(self, curl, timeouts):
            # Accepts a 2-tuple, a 3-tuple, or a single number applied to
            # both connect and transfer (the None early-return is elided).
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    # Old-style 2-tuple: use the default minimum bandwidth.
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                    conn_t, xfer_t, bandwidth_bps = timeouts
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            # Abort when the average rate stays below bandwidth_bps for
            # xfer_t seconds.
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))

        def _headerfunction(self, header_line):
            # pycurl delivers each response header line as bytes.
            header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                # "Name: value" -- names are normalized to lowercase.
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
                # Continuation line: append to the previous header's value.
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                # Status line of a (possibly intermediate) response.
                name = 'x-status-line'
                _logger.error("Unexpected header line: %s", header_line)
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
    class KeepWriterThread(threading.Thread):
        """
        Write a blob of data to the given Keep server. On success, call
        save_response() of the given ThreadLimiter to save the returned
        locator.
        """
        # NOTE(review): this excerpt is elided; some statements in the
        # methods below (including the run() header and several returns)
        # are not visible here.
        def __init__(self, keep_service, **kwargs):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.service = keep_service
            self._success = False

            # run(): honor the limiter's start ordering, then bail out if
            # enough copies have already been written.
            limiter = self.args['thread_limiter']
            sequence = self.args['thread_sequence']
            if sequence is not None:
                limiter.set_sequence(sequence)
            if not limiter.shall_i_proceed():
                # My turn arrived, but the job has been done without
                # me.
            self.run_with_limiter(limiter)

        def run_with_limiter(self, limiter):
            # Skip services that already succeeded or failed permanently.
            if self.service.finished():
            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          len(self.args['data']),
                          self.args['service_root'])
            self._success = bool(self.service.put(
                self.args['data_hash'],
                timeout=self.args.get('timeout', None)))
            result = self.service.last_result()
                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                              str(threading.current_thread()),
                              self.args['data_hash'],
                              len(self.args['data']),
                              self.args['service_root'])
                # Tick the 'done' counter for the number of replica
                # reported stored by the server, for the case that
                # we're talking to a proxy or other backend that
                # stores to multiple copies for us.
                    replicas_stored = int(result['headers']['x-keep-replicas-stored'])
                except (KeyError, ValueError):
                limiter.save_response(result['body'].strip(), replicas_stored)
            elif result.get('status_code', None):
                _logger.debug("Request fail: PUT %s => %s %s",
                              self.args['data_hash'],
                              result['status_code'],
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services. If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy. Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
          KeepClient does not use a proxy, pass in an empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests. It is an error to specify both
          api_client and api_token. If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory. If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable. If you want to ensure KeepClient does not
          use local storage, pass in an empty string. This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called. Default 0.
        """
        # NOTE(review): several statements below are elided in this
        # excerpt (notably the proxy/local_store conditionals and the
        # ValueError raise); comments describe only the visible code.
        self.lock = threading.Lock()
            # Fall back to the configured proxy when none was given.
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
                api_token = api_client.api_token
        elif api_client is not None:
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = Queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

            # Local-store mode: replace get/put with filesystem stubs.
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        self.num_retries = num_retries
        self.max_replicas_per_service = None
            # Proxy mode: a single static 'proxy' service entry.
            if not proxy.endswith('/'):
            self.api_token = api_token
            self._gateway_services = {}
            self._keep_services = [{
                'service_type': 'proxy',
                '_service_root': proxy,
            self._writable_services = self._keep_services
            self.using_proxy = True
            self._static_services_list = True
            # It's important to avoid instantiating an API client
            # unless we actually need one, for testing's sake.
            if api_client is None:
                api_client = arvados.api('v1')
            self.api_client = api_client
            self.api_token = api_client.api_token
            self._gateway_services = {}
            self._keep_services = None
            self._writable_services = None
            self.using_proxy = None
            self._static_services_list = False
    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise. The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        # Double the connection timeout on each successive attempt;
        # (the length check choosing between the two returns is elided).
            return (t[0] * (1 << attempt_number), t[1])
            return (t[0] * (1 << attempt_number), t[1], t[2])
754 def _any_nondisk_services(self, service_list):
755 return any(ks.get('service_type', 'disk') != 'disk'
756 for ks in service_list)
    def build_services_list(self, force_rebuild=False):
        # Refresh self._keep_services / self._writable_services from the
        # API server, unless a static list was configured (proxy mode)
        # or a cached list exists and force_rebuild is false.
        # NOTE(review): some statements (the early return, the try
        # opener, and the format() arguments) are elided in this excerpt.
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            keep_services = self.api_client.keep_services().accessible()
        except Exception: # API server predates Keep services.
            keep_services = self.api_client.keep_disks().list()

        # Gateway services are only used when specified by UUID,
        # so there's nothing to gain by filtering them by
        # service_type.
        self._gateway_services = {ks['uuid']: ks for ks in
                                  keep_services.execute()['items']}
        if not self._gateway_services:
            raise arvados.errors.NoKeepServersError()

        # Precompute the base URI for each service.
        for r in self._gateway_services.itervalues():
            host = r['service_host']
            if not host.startswith('[') and host.find(':') >= 0:
                # IPv6 URIs must be formatted like http://[::1]:80/...
                host = '[' + host + ']'
            r['_service_root'] = "{}://{}:{:d}/".format(
                'https' if r['service_ssl_flag'] else 'http',

        _logger.debug(str(self._gateway_services))
        # Gateway-type services are addressed only by UUID hint, so keep
        # them out of the general probe list.
        self._keep_services = [
            ks for ks in self._gateway_services.itervalues()
            if not ks.get('service_type', '').startswith('gateway:')]
        self._writable_services = [ks for ks in self._keep_services
                                   if not ks.get('read_only')]

        # For disk type services, max_replicas_per_service is 1
        # It is unknown (unlimited) for other service types.
        if self._any_nondisk_services(self._writable_services):
            self.max_replicas_per_service = None
            self.max_replicas_per_service = 1
    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        # NOTE(review): some statements are elided in this excerpt
        # (e.g. the sorted_roots initialization and parts of the hint
        # handling / sorted() call).
        self.build_services_list(force_rebuild)

        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                    "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    # 29-char hint: K@ plus a gateway service UUID.
                    svc = self._gateway_services.get(hint[2:])
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        use_services = self._keep_services
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                # First time we've seen this service this transaction.
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        # NOTE(review): the return statements themselves are elided in
        # this excerpt.
        if isinstance(result, Exception):
        result, tried_server_count = result
        if (result is not None) and (result is not False):
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
    def get_from_cache(self, loc):
        """Fetch a block only if is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        # Only a slot whose content has already arrived counts as a hit
        # (the return statements are elided in this excerpt).
        if slot is not None and slot.ready.is_set():
889 def head(self, loc_s, num_retries=None):
890 return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
893 def get(self, loc_s, num_retries=None):
894 return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
    def _get_or_head(self, loc_s, method="GET", num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        # NOTE(review): several statements are elided in this excerpt
        # (the multi-locator branch condition, cache-hit return path,
        # parts of the retry loop and the success return); comments
        # below describe only the visible code.
            # Multiple locators: fetch each and concatenate.
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        locator = KeepLocator(loc_s)
        # For GET, reserve a cache slot so concurrent readers of the
        # same block wait on it; for HEAD, only look up an existing one.
        slot, first = self.block_cache.reserve_cache(locator.md5sum, True if method == "GET" else False)
        if not first and slot is not None:
            self.hits_counter.add(1)
            self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               self._gateway_services.get(hint[2:])

        # Map root URLs to their KeepService objects.
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter)
            for root in hint_roots

        # See #3147 for a discussion of the loop implementation. Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
        for tries_left in loop:
                sorted_roots = self.map_new_services(
                    force_rebuild=(tries_left < num_retries),
            except Exception as error:
                loop.save_result(error)

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
            self.block_cache.cap_cache()

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        """
        # NOTE(review): several statements are elided in this excerpt
        # (headers/done/threads initialization, the try opener, thread
        # start/join, and the loop-success condition).
        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        locator = KeepLocator(loc_s)

        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replication'] = str(copies)
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
        for tries_left in loop:
                sorted_roots = self.map_new_services(
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)

            # Only ask for the copies still outstanding from earlier
            # rounds.
            thread_limiter = KeepClient.ThreadLimiter(
                copies - done, self.max_replicas_per_service)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                t = KeepClient.KeepWriterThread(
                    data_hash=data_hash,
                    service_root=service_root,
                    thread_limiter=thread_limiter,
                    timeout=self.current_timeout(num_retries-tries_left),
                    thread_sequence=len(threads))
            done += thread_limiter.done()
            loop.save_result((done >= copies, len(threads)))

            return thread_limiter.response()
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots
                          if roots_map[key].last_result()['error'])
        raise arvados.errors.KeepWriteError(
            "failed to write {} (wanted {} copies but wrote {})".format(
                data_hash, copies, thread_limiter.done()), service_errors, label="service")
    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        # Write to a temp file, then rename into place, so readers never
        # observe a partially written block.  (The write call and the
        # return of `locator` are elided in this excerpt.)
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        # NOTE(review): the try/except around locator parsing and the
        # return statements are elided in this excerpt.
            locator = KeepLocator(loc_s)
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        # The canonical empty block is never stored on disk.
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1125 def is_cached(self, locator):
1126 return self.block_cache.reserve_cache(expect_hash)