import cStringIO
import datetime
import hashlib
import logging
import math
import os
import pycurl
import Queue
import re
import socket
import ssl
import threading
import timer

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util

_logger = logging.getLogger('arvados.keep')
global_client_object = None
class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)
    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum
    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)
    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)
    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)
    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except (IndexError, ValueError):
            raise ValueError("bad permission hint {}".format(s))
    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
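

# Example (illustrative values; the signature below is made up): parsing a
# signed locator yields the hash, size, and permission hint.
#
#   loc = KeepLocator('d41d8cd98f00b204e9800998ecf8427e+0'
#                     '+A27d69c9a60d0e75b5b0c4a6eaaca117cbca1f969@53d00d58')
#   loc.md5sum                # 'd41d8cd98f00b204e9800998ecf8427e'
#   loc.size                  # 0
#   loc.permission_expired()  # True once the @-timestamp has passed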
109 """Simple interface to a global KeepClient object.
111 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
112 own API client. The global KeepClient will build an API client from the
113 current Arvados configuration, which may not match the one you built.
118 def global_client_object(cls):
119 global global_client_object
120 # Previously, KeepClient would change its behavior at runtime based
121 # on these configuration settings. We simulate that behavior here
122 # by checking the values and returning a new KeepClient if any of
124 key = (config.get('ARVADOS_API_HOST'),
125 config.get('ARVADOS_API_TOKEN'),
126 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
127 config.get('ARVADOS_KEEP_PROXY'),
128 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
129 os.environ.get('KEEP_LOCAL_STORE'))
130 if (global_client_object is None) or (cls._last_key != key):
131 global_client_object = KeepClient()
133 return global_client_object
136 def get(locator, **kwargs):
137 return Keep.global_client_object().get(locator, **kwargs)
140 def put(data, **kwargs):
141 return Keep.global_client_object().put(data, **kwargs)
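

# Since this class is deprecated, new code should build its own client
# instead, e.g. (a sketch; argument values are illustrative):
#
#   api = arvados.api('v1')
#   keep = KeepClient(api_client=api, num_retries=3)
#   locator = keep.put('some data')
#   data = keep.get(locator)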


class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in xrange(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in xrange(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
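
    # Sketch of the slot protocol used by KeepClient (names illustrative):
    # the first caller gets first=True and must fill the slot; concurrent
    # callers block in slot.get() until set() is called.
    #
    #   cache = KeepBlockCache()
    #   slot, first = cache.reserve_cache(md5_hex)
    #   if first:
    #       slot.set(fetch_block_somehow(md5_hex))  # hypothetical fetch
    #       cache.cap_cache()
    #   content = slot.get()  # blocks until the slot is ready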


class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)

    class ThreadLimiter(object):
        """Limit the number of threads writing to Keep at once.

        This ensures that only a number of writer threads that could
        potentially achieve the desired replication level run at once.
        Once the desired replication level is achieved, queued threads
        are instructed not to run.

        Should be used in a "with" block.
        """
        def __init__(self, want_copies, max_service_replicas):
            self._started = 0
            self._want_copies = want_copies
            self._done = 0
            self._response = None
            self._start_lock = threading.Condition()
            if (not max_service_replicas) or (max_service_replicas >= want_copies):
                max_threads = 1
            else:
                max_threads = math.ceil(float(want_copies) / max_service_replicas)
            _logger.debug("Limiter max threads is %d", max_threads)
            self._todo_lock = threading.Semaphore(max_threads)
            self._done_lock = threading.Lock()
            self._local = threading.local()

        def __enter__(self):
            self._start_lock.acquire()
            if getattr(self._local, 'sequence', None) is not None:
                # If the calling thread has used set_sequence(N), then
                # we wait here until N other threads have started.
                while self._started < self._local.sequence:
                    self._start_lock.wait()
            self._todo_lock.acquire()
            self._started += 1
            self._start_lock.notifyAll()
            self._start_lock.release()
            return self

        def __exit__(self, type, value, traceback):
            self._todo_lock.release()

        def set_sequence(self, sequence):
            self._local.sequence = sequence

        def shall_i_proceed(self):
            """
            Return true if the current thread should write to Keep.
            Return false otherwise.
            """
            with self._done_lock:
                return (self._done < self._want_copies)

        def save_response(self, response_body, replicas_stored):
            """
            Records a response body (a locator, possibly signed) returned by
            the Keep server, and the number of replicas it stored.
            """
            with self._done_lock:
                self._done += replicas_stored
                self._response = response_body

        def response(self):
            """Return the body from the response to a PUT request."""
            with self._done_lock:
                return self._response

        def done(self):
            """Return the total number of replicas successfully stored."""
            with self._done_lock:
                return self._done
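
    # Usage sketch (variable names illustrative) matching how
    # KeepWriterThread, below, drives the limiter from each writer thread:
    #
    #   limiter = KeepClient.ThreadLimiter(want_copies, max_service_replicas)
    #   ...
    #   with limiter:
    #       if limiter.shall_i_proceed():
    #           # PUT the block, then record the server's response:
    #           limiter.save_response(response_body, replicas_stored)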

    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible results.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None, **headers):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(False)
            except Queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, False)
            except:
                pass

        @staticmethod
        def _socket_open(family, socktype, protocol, address=None):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(family, socktype, protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            return s

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    self._lastheadername = None
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k, v)
                        for k, v in self.get_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']),
                              str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result['headers'].get('content-length'))
                return True
            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         (len(self._result['body'])/(1024.0*1024))/t.secs
                         if t.secs > 0 else 0)
            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    self._lastheadername = None
                    body_reader = cStringIO.StringIO(body)
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k, v)
                        for k, v in self.put_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']),
                              str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
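
        # For example (values illustrative), a call like
        #
        #   self._setcurltimeouts(curl, (2, 256, 32768))
        #
        # sets CONNECTTIMEOUT_MS=2000, LOW_SPEED_TIME=256, and
        # LOW_SPEED_LIMIT=32768, so the transfer aborts if it averages
        # under 32768 bytes/sec over a 256-second window.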

        def _headerfunction(self, header_line):
            header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._lastheadername and header_line.strip():
                # Continuation of the previous header line (folded header).
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written

    class KeepWriterThread(threading.Thread):
        """
        Write a blob of data to the given Keep server. On success, call
        save_response() of the given ThreadLimiter to save the returned
        locator.
        """
        def __init__(self, keep_service, **kwargs):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.service = keep_service
            self.args = kwargs
            self._success = False

        def success(self):
            return self._success

        def run(self):
            limiter = self.args['thread_limiter']
            sequence = self.args['thread_sequence']
            if sequence is not None:
                limiter.set_sequence(sequence)
            with limiter:
                if not limiter.shall_i_proceed():
                    # My turn arrived, but the job has been done without
                    # me.
                    return
                self.run_with_limiter(limiter)

        def run_with_limiter(self, limiter):
            if self.service.finished():
                return
            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          len(self.args['data']),
                          self.args['service_root'])
            self._success = bool(self.service.put(
                self.args['data_hash'],
                self.args['data'],
                timeout=self.args.get('timeout', None)))
            result = self.service.last_result()
            if self._success:
                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                              str(threading.current_thread()),
                              self.args['data_hash'],
                              len(self.args['data']),
                              self.args['service_root'])
                # Tick the 'done' counter for the number of replicas
                # reported stored by the server, for the case that
                # we're talking to a proxy or other backend that
                # stores multiple copies for us.
                try:
                    replicas_stored = int(result['headers']['x-keep-replicas-stored'])
                except (KeyError, ValueError):
                    replicas_stored = 1
                limiter.save_response(result['body'].strip(), replicas_stored)
            elif result.get('status_code', None):
                _logger.debug("Request fail: PUT %s => %s %s",
                              self.args['data_hash'],
                              result['status_code'],
                              result['body'])

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services. If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy. Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
          KeepClient does not use a proxy, pass in an empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests. It is an error to specify both
          api_client and api_token. If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory. If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable. If you want to ensure KeepClient does not
          use local storage, pass in an empty string. This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called. Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = Queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        if local_store:
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                if not proxy.endswith('/'):
                    proxy += '/'
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': 'proxy',
                    'service_type': 'proxy',
                    '_service_root': proxy,
                    }]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
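
    # Construction sketches (argument values illustrative): a client that
    # talks only to a Keep proxy with custom (connect, read, bandwidth)
    # timeouts, and one that writes to a local directory for tests:
    #
    #   kc = KeepClient(api_token=token,
    #                   proxy='https://keep.xxxxx.arvadosapi.com/',
    #                   proxy_timeout=(20, 256, 32768))
    #
    #   kc = KeepClient(local_store='/tmp/keep-mock')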

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise. The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
                (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.itervalues():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.itervalues()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1.
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
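
    # This is rendezvous (highest-random-weight) hashing: every client
    # computes the same weights, so the probe order for a given block is
    # consistent across the cluster. A sketch with hypothetical UUIDs,
    # mirroring the sort in weighted_service_roots() below:
    #
    #   uuids = ['zzzzz-bi6l4-000000000000001', 'zzzzz-bi6l4-000000000000002']
    #   probe_order = sorted(
    #       uuids, reverse=True,
    #       key=lambda u: hashlib.md5(data_hash + u[-15:]).hexdigest())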

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs).
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    **headers)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc):
        """Fetch a block only if it is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    @retry.retry_method
    def head(self, loc_s, num_retries=None):
        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)

    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)

    def _get_or_head(self, loc_s, method="GET", num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        locator = KeepLocator(loc_s)
        if method == "GET":
            slot, first = self.block_cache.reserve_cache(locator.md5sum)
            if not first:
                self.hits_counter.add(1)
                return slot.get()

        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               len(hint) == 29 and
                               self._gateway_services.get(hint[2:])
                               )])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter)
            for root in hint_roots
        }

        # See #3147 for a discussion of the loop implementation. Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        sorted_roots = []
        blob = None
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False)
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        if method == "GET":
            slot.set(blob)
            self.block_cache.cap_cache()
        if loop.success():
            if method == "HEAD":
                return True
            return blob

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        """

        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError(
                "Argument 'data' to KeepClient.put is not type 'str'")

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replication'] = str(copies)
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done = 0
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            thread_limiter = KeepClient.ThreadLimiter(
                copies - done, self.max_replicas_per_service)
            threads = []
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                t = KeepClient.KeepWriterThread(
                    ks,
                    data=data,
                    data_hash=data_hash,
                    service_root=service_root,
                    thread_limiter=thread_limiter,
                    timeout=self.current_timeout(num_retries-tries_left),
                    thread_sequence=len(threads))
                t.start()
                threads.append(t)
            for t in threads:
                t.join()
            done += thread_limiter.done()
            loop.save_result((done >= copies, len(threads)))

        if loop.success():
            return thread_limiter.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, thread_limiter.done()),
                service_errors, label="service")
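
    # Round-trip sketch (assuming a configured cluster; values illustrative):
    #
    #   kc = KeepClient(api_client=arvados.api('v1'))
    #   loc = kc.put('hello', copies=2)  # e.g. '5d41402a...+5+A...@...'
    #   assert kc.get(loc) == 'hello'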

    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return ''
        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
            return f.read()

    def is_cached(self, locator):
        return self.block_cache.reserve_cache(locator)