16 import arvados.config as config
18 import arvados.retry as retry
21 _logger = logging.getLogger('arvados.keep')
22 global_client_object = None
25 class KeepLocator(object):
26 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
27 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
29 def __init__(self, locator_str):
32 self._perm_expiry = None
33 pieces = iter(locator_str.split('+'))
34 self.md5sum = next(pieces)
36 self.size = int(next(pieces))
40 if self.HINT_RE.match(hint) is None:
41 raise ValueError("invalid hint format: {}".format(hint))
42 elif hint.startswith('A'):
43 self.parse_permission_hint(hint)
45 self.hints.append(hint)
49 str(s) for s in [self.md5sum, self.size,
50 self.permission_hint()] + self.hints
54 if self.size is not None:
55 return "%s+%i" % (self.md5sum, self.size)
59 def _make_hex_prop(name, length):
60 # Build and return a new property with the given name that
61 # must be a hex string of the given length.
62 data_name = '_{}'.format(name)
64 return getattr(self, data_name)
65 def setter(self, hex_str):
66 if not arvados.util.is_hex(hex_str, length):
67 raise ValueError("{} is not a {}-digit hex string: {}".
68 format(name, length, hex_str))
69 setattr(self, data_name, hex_str)
70 return property(getter, setter)
72 md5sum = _make_hex_prop('md5sum', 32)
73 perm_sig = _make_hex_prop('perm_sig', 40)
76 def perm_expiry(self):
77 return self._perm_expiry
80 def perm_expiry(self, value):
81 if not arvados.util.is_hex(value, 1, 8):
83 "permission timestamp must be a hex Unix timestamp: {}".
85 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
87 def permission_hint(self):
88 data = [self.perm_sig, self.perm_expiry]
91 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
92 return "A{}@{:08x}".format(*data)
94 def parse_permission_hint(self, s):
96 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
98 raise ValueError("bad permission hint {}".format(s))
100 def permission_expired(self, as_of_dt=None):
101 if self.perm_expiry is None:
103 elif as_of_dt is None:
104 as_of_dt = datetime.datetime.now()
105 return self.perm_expiry <= as_of_dt
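# Illustrative sketch (not part of the library): how KeepLocator parses a
# signed locator string. The hash, signature, and timestamp below are
# made-up example values; the A-hint carries a 40-digit hex signature and
# a hex Unix timestamp, as described above.
def _example_keep_locator():
    loc = KeepLocator(
        'd41d8cd98f00b204e9800998ecf8427e+0'
        '+A0123456789abcdef0123456789abcdef01234567@56a8c0fe')
    assert loc.md5sum == 'd41d8cd98f00b204e9800998ecf8427e'
    assert loc.size == 0
    # permission_expired() compares the hint's hex timestamp against "now".
    return loc.permission_expired()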
109 """Simple interface to a global KeepClient object.
111 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
112 own API client. The global KeepClient will build an API client from the
113 current Arvados configuration, which may not match the one you built.
118 def global_client_object(cls):
119 global global_client_object
120 # Previously, KeepClient would change its behavior at runtime based
121 # on these configuration settings. We simulate that behavior here
122         # by checking the values and returning a new KeepClient if any of them have changed.
124 key = (config.get('ARVADOS_API_HOST'),
125 config.get('ARVADOS_API_TOKEN'),
126 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
127 config.get('ARVADOS_KEEP_PROXY'),
128 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
129 os.environ.get('KEEP_LOCAL_STORE'))
130 if (global_client_object is None) or (cls._last_key != key):
131 global_client_object = KeepClient()
133 return global_client_object
136 def get(locator, **kwargs):
137 return Keep.global_client_object().get(locator, **kwargs)
140 def put(data, **kwargs):
141 return Keep.global_client_object().put(data, **kwargs)
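# Sketch of the pattern the deprecation note above recommends: build a
# KeepClient around an explicit API client rather than relying on the
# global Keep wrapper. Assumes the usual ARVADOS_API_HOST/TOKEN settings
# are available to arvados.api(); the literal data is just an example.
def _example_explicit_keep_client():
    api = arvados.api('v1')
    keep = KeepClient(api_client=api)
    locator = keep.put('example data', copies=2)   # possibly signed locator
    return keep.get(locator)                       # the stored bytes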
143 class KeepBlockCache(object):
144 # Default RAM cache is 256MiB
145 def __init__(self, cache_max=(256 * 1024 * 1024)):
146 self.cache_max = cache_max
148 self._cache_lock = threading.Lock()
150 class CacheSlot(object):
151 def __init__(self, locator):
152 self.locator = locator
153 self.ready = threading.Event()
160 def set(self, value):
165 if self.content is None:
168 return len(self.content)
171 '''Cap the cache size to self.cache_max'''
172 with self._cache_lock:
173 # Select all slots except those where ready.is_set() and content is
174 # None (that means there was an error reading the block).
175 self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
176 sm = sum([slot.size() for slot in self._cache])
177 while len(self._cache) > 0 and sm > self.cache_max:
178 for i in xrange(len(self._cache)-1, -1, -1):
179 if self._cache[i].ready.is_set():
182 sm = sum([slot.size() for slot in self._cache])
184 def _get(self, locator):
185 # Test if the locator is already in the cache
186 for i in xrange(0, len(self._cache)):
187 if self._cache[i].locator == locator:
190 # move it to the front
192 self._cache.insert(0, n)
196 def get(self, locator):
197 with self._cache_lock:
198 return self._get(locator)
200 def reserve_cache(self, locator):
201 '''Reserve a cache slot for the specified locator,
202 or return the existing slot.'''
203 with self._cache_lock:
204 n = self._get(locator)
208 # Add a new cache slot for the locator
209 n = KeepBlockCache.CacheSlot(locator)
210 self._cache.insert(0, n)
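# Minimal sketch of the cache protocol KeepClient.get() relies on: the
# first caller to reserve a locator gets first=True and must fill the slot;
# other callers wait on slot.ready. The locator below is a made-up example.
def _example_block_cache():
    cache = KeepBlockCache(cache_max=64 * 1024 * 1024)
    slot, first = cache.reserve_cache('d41d8cd98f00b204e9800998ecf8427e')
    if first:
        slot.set('block body goes here')   # stores the content and sets ready
    slot.ready.wait()                      # late arrivals block here
    cache.cap_cache()                      # evict ready slots if over cache_max
    return slot.content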
214 class Counter(object):
215 def __init__(self, v=0):
216 self._lk = threading.Lock()
228 class KeepClient(object):
230 # Default Keep server connection timeout: 2 seconds
231 # Default Keep server read timeout: 64 seconds
232 # Default Keep server bandwidth minimum: 32768 bytes per second
233 # Default Keep proxy connection timeout: 20 seconds
234 # Default Keep proxy read timeout: 64 seconds
235 # Default Keep proxy bandwidth minimum: 32768 bytes per second
236 DEFAULT_TIMEOUT = (2, 64, 32768)
237 DEFAULT_PROXY_TIMEOUT = (20, 64, 32768)
239 class ThreadLimiter(object):
240 """Limit the number of threads writing to Keep at once.
242 This ensures that only a number of writer threads that could
243 potentially achieve the desired replication level run at once.
244 Once the desired replication level is achieved, queued threads
245 are instructed not to run.
247 Should be used in a "with" block.
249 def __init__(self, want_copies, max_service_replicas):
251 self._want_copies = want_copies
253 self._response = None
254 self._start_lock = threading.Condition()
255 if (not max_service_replicas) or (max_service_replicas >= want_copies):
258 max_threads = math.ceil(float(want_copies) / max_service_replicas)
259 _logger.debug("Limiter max threads is %d", max_threads)
260 self._todo_lock = threading.Semaphore(max_threads)
261 self._done_lock = threading.Lock()
262 self._local = threading.local()
265 self._start_lock.acquire()
266 if getattr(self._local, 'sequence', None) is not None:
267 # If the calling thread has used set_sequence(N), then
268 # we wait here until N other threads have started.
269 while self._started < self._local.sequence:
270 self._start_lock.wait()
271 self._todo_lock.acquire()
273 self._start_lock.notifyAll()
274 self._start_lock.release()
277 def __exit__(self, type, value, traceback):
278 self._todo_lock.release()
280 def set_sequence(self, sequence):
281 self._local.sequence = sequence
283 def shall_i_proceed(self):
285 Return true if the current thread should write to Keep.
286 Return false otherwise.
288 with self._done_lock:
289 return (self._done < self._want_copies)
291 def save_response(self, response_body, replicas_stored):
293 Records a response body (a locator, possibly signed) returned by
294 the Keep server, and the number of replicas it stored.
296 with self._done_lock:
297 self._done += replicas_stored
298 self._response = response_body
301 """Return the body from the response to a PUT request."""
302 with self._done_lock:
303 return self._response
306 """Return the total number of replicas successfully stored."""
307 with self._done_lock:
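        # Usage sketch for this limiter (commented out -- KeepClient.put()
        # drives the real thing through KeepWriterThread; names such as
        # `service`, `body_locator` and `replicas` are placeholders):
        #
        #   limiter = KeepClient.ThreadLimiter(want_copies=2, max_service_replicas=1)
        #   limiter.set_sequence(0)            # this thread's start order
        #   with limiter:                      # blocks until a write slot is free
        #       if limiter.shall_i_proceed():  # skip if enough copies already stored
        #           if service.put(data_hash, body):
        #               limiter.save_response(body_locator, replicas)
        #   copies_written = limiter.done()
        #   signed_locator = limiter.response()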
310 class KeepService(object):
311 """Make requests to a single Keep service, and track results.
313 A KeepService is intended to last long enough to perform one
314 transaction (GET or PUT) against one Keep service. This can
315 involve calling either get() or put() multiple times in order
316 to retry after transient failures. However, calling both get()
317 and put() on a single instance -- or using the same instance
318         to access two different Keep services -- will not produce sensible results.
325 arvados.errors.HttpError,
328 def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
330 download_counter=None, **headers):
332 self._user_agent_pool = user_agent_pool
333 self._result = {'error': None}
336 self.get_headers = {'Accept': 'application/octet-stream'}
337 self.get_headers.update(headers)
338 self.put_headers = headers
339 self.upload_counter = upload_counter
340 self.download_counter = download_counter
343 """Is it worth attempting a request?"""
347 """Did the request succeed or encounter permanent failure?"""
348 return self._result['error'] == False or not self._usable
350 def last_result(self):
353 def _get_user_agent(self):
355 return self._user_agent_pool.get(False)
359 def _put_user_agent(self, ua):
362 self._user_agent_pool.put(ua, False)
367 def _socket_open(family, socktype, protocol, address=None):
368 """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
369 s = socket.socket(family, socktype, protocol)
370 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
371 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
372 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
375 def get(self, locator, timeout=None):
376 # locator is a KeepLocator object.
377 url = self.root + str(locator)
378 _logger.debug("Request: GET %s", url)
379 curl = self._get_user_agent()
381 with timer.Timer() as t:
383 response_body = cStringIO.StringIO()
384 curl.setopt(pycurl.NOSIGNAL, 1)
385 curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
386 curl.setopt(pycurl.URL, url.encode('utf-8'))
387 curl.setopt(pycurl.HTTPHEADER, [
388 '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
389 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
390 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
391 self._setcurltimeouts(curl, timeout)
394 except Exception as e:
395 raise arvados.errors.HttpError(0, str(e))
397 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
398 'body': response_body.getvalue(),
399 'headers': self._headers,
402 ok = retry.check_http_response_success(self._result['status_code'])
404 self._result['error'] = arvados.errors.HttpError(
405 self._result['status_code'],
406 self._headers.get('x-status-line', 'Error'))
407 except self.HTTP_ERRORS as e:
412             self._usable = ok != False  # still usable if ok is True or None
413 if self._result.get('status_code', None):
414 # The client worked well enough to get an HTTP status
415 # code, so presumably any problems are just on the
416 # server side and it's OK to reuse the client.
417 self._put_user_agent(curl)
419                 # Don't return this client to the pool, in case it's broken.
423 _logger.debug("Request fail: GET %s => %s: %s",
424 url, type(self._result['error']), str(self._result['error']))
426 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
427 self._result['status_code'],
428 len(self._result['body']),
430 (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
431 if self.download_counter:
432 self.download_counter.add(len(self._result['body']))
433 resp_md5 = hashlib.md5(self._result['body']).hexdigest()
434 if resp_md5 != locator.md5sum:
435 _logger.warning("Checksum fail: md5(%s) = %s",
437 self._result['error'] = arvados.errors.HttpError(
440 return self._result['body']
442 def put(self, hash_s, body, timeout=None):
443 url = self.root + hash_s
444 _logger.debug("Request: PUT %s", url)
445 curl = self._get_user_agent()
447 with timer.Timer() as t:
449 body_reader = cStringIO.StringIO(body)
450 response_body = cStringIO.StringIO()
451 curl.setopt(pycurl.NOSIGNAL, 1)
452 curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
453 curl.setopt(pycurl.URL, url.encode('utf-8'))
454 # Using UPLOAD tells cURL to wait for a "go ahead" from the
455 # Keep server (in the form of a HTTP/1.1 "100 Continue"
456 # response) instead of sending the request body immediately.
457 # This allows the server to reject the request if the request
458 # is invalid or the server is read-only, without waiting for
459 # the client to send the entire block.
460 curl.setopt(pycurl.UPLOAD, True)
461 curl.setopt(pycurl.INFILESIZE, len(body))
462 curl.setopt(pycurl.READFUNCTION, body_reader.read)
463 curl.setopt(pycurl.HTTPHEADER, [
464 '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
465 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
466 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
467 self._setcurltimeouts(curl, timeout)
470 except Exception as e:
471 raise arvados.errors.HttpError(0, str(e))
473 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
474 'body': response_body.getvalue(),
475 'headers': self._headers,
478 ok = retry.check_http_response_success(self._result['status_code'])
480 self._result['error'] = arvados.errors.HttpError(
481 self._result['status_code'],
482 self._headers.get('x-status-line', 'Error'))
483 except self.HTTP_ERRORS as e:
488 self._usable = ok != False # still usable if ok is True or None
489 if self._result.get('status_code', None):
490 # Client is functional. See comment in get().
491 self._put_user_agent(curl)
495 _logger.debug("Request fail: PUT %s => %s: %s",
496 url, type(self._result['error']), str(self._result['error']))
498 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
499 self._result['status_code'],
502 (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
503 if self.upload_counter:
504 self.upload_counter.add(len(body))
507 def _setcurltimeouts(self, curl, timeouts):
510 elif isinstance(timeouts, tuple):
511 if len(timeouts) == 2:
512 conn_t, xfer_t = timeouts
513 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
515 conn_t, xfer_t, bandwidth_bps = timeouts
517 conn_t, xfer_t = (timeouts, timeouts)
518 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
519 curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
520 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
521 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
523 def _headerfunction(self, header_line):
524 header_line = header_line.decode('iso-8859-1')
525 if ':' in header_line:
526 name, value = header_line.split(':', 1)
527 name = name.strip().lower()
528 value = value.strip()
530 name = self._lastheadername
531 value = self._headers[name] + ' ' + header_line.strip()
532 elif header_line.startswith('HTTP/'):
533 name = 'x-status-line'
536 _logger.error("Unexpected header line: %s", header_line)
538 self._lastheadername = name
539 self._headers[name] = value
540 # Returning None implies all bytes were written
543 class KeepWriterThread(threading.Thread):
545 Write a blob of data to the given Keep server. On success, call
546         save_response() of the given ThreadLimiter to save the returned locator.
549 def __init__(self, keep_service, **kwargs):
550 super(KeepClient.KeepWriterThread, self).__init__()
551 self.service = keep_service
553 self._success = False
559 limiter = self.args['thread_limiter']
560 sequence = self.args['thread_sequence']
561 if sequence is not None:
562 limiter.set_sequence(sequence)
564 if not limiter.shall_i_proceed():
565                 # My turn arrived, but the job has been done without me.
568 self.run_with_limiter(limiter)
570 def run_with_limiter(self, limiter):
571 if self.service.finished():
573 _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
574 str(threading.current_thread()),
575 self.args['data_hash'],
576 len(self.args['data']),
577 self.args['service_root'])
578 self._success = bool(self.service.put(
579 self.args['data_hash'],
581 timeout=self.args.get('timeout', None)))
582 result = self.service.last_result()
584 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
585 str(threading.current_thread()),
586 self.args['data_hash'],
587 len(self.args['data']),
588 self.args['service_root'])
589                 # Tick the 'done' counter for the number of replicas
590                 # reported stored by the server, for the case where
591                 # we're talking to a proxy or other backend that
592                 # stores multiple copies for us.
594 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
595 except (KeyError, ValueError):
597 limiter.save_response(result['body'].strip(), replicas_stored)
598 elif result.get('status_code', None):
599 _logger.debug("Request fail: PUT %s => %s %s",
600 self.args['data_hash'],
601 result['status_code'],
605 def __init__(self, api_client=None, proxy=None,
606 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
607 api_token=None, local_store=None, block_cache=None,
608 num_retries=0, session=None):
609 """Initialize a new KeepClient.
613 The API client to use to find Keep services. If not
614           provided, KeepClient will build one from available Arvados configuration.
618 If specified, this KeepClient will send requests to this Keep
619 proxy. Otherwise, KeepClient will fall back to the setting of the
620 ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
621 KeepClient does not use a proxy, pass in an empty string.
624 The initial timeout (in seconds) for HTTP requests to Keep
625 non-proxy servers. A tuple of three floats is interpreted as
626 (connection_timeout, read_timeout, minimum_bandwidth). A connection
627 will be aborted if the average traffic rate falls below
628 minimum_bandwidth bytes per second over an interval of read_timeout
629 seconds. Because timeouts are often a result of transient server
630 load, the actual connection timeout will be increased by a factor
631 of two on each retry.
632 Default: (2, 64, 32768).
635 The initial timeout (in seconds) for HTTP requests to
636 Keep proxies. A tuple of three floats is interpreted as
637 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
638           described above for adjusting connection timeouts on retry also applies.
640 Default: (20, 64, 32768).
643 If you're not using an API client, but only talking
644 directly to a Keep proxy, this parameter specifies an API token
645 to authenticate Keep requests. It is an error to specify both
646 api_client and api_token. If you specify neither, KeepClient
647 will use one available from the Arvados configuration.
650 If specified, this KeepClient will bypass Keep
651 services, and save data to the named directory. If unspecified,
652 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
653 environment variable. If you want to ensure KeepClient does not
654 use local storage, pass in an empty string. This is primarily
655 intended to mock a server for testing.
658 The default number of times to retry failed requests.
659 This will be used as the default num_retries value when get() and
660 put() are called. Default 0.
662 self.lock = threading.Lock()
664 proxy = config.get('ARVADOS_KEEP_PROXY')
665 if api_token is None:
666 if api_client is None:
667 api_token = config.get('ARVADOS_API_TOKEN')
669 api_token = api_client.api_token
670 elif api_client is not None:
672 "can't build KeepClient with both API client and token")
673 if local_store is None:
674 local_store = os.environ.get('KEEP_LOCAL_STORE')
676 self.block_cache = block_cache if block_cache else KeepBlockCache()
677 self.timeout = timeout
678 self.proxy_timeout = proxy_timeout
679 self._user_agent_pool = Queue.LifoQueue()
680 self.upload_counter = Counter()
681 self.download_counter = Counter()
682 self.put_counter = Counter()
683 self.get_counter = Counter()
684 self.hits_counter = Counter()
685 self.misses_counter = Counter()
688 self.local_store = local_store
689 self.get = self.local_store_get
690 self.put = self.local_store_put
692 self.num_retries = num_retries
693 self.max_replicas_per_service = None
695 if not proxy.endswith('/'):
697 self.api_token = api_token
698 self._gateway_services = {}
699 self._keep_services = [{
701 'service_type': 'proxy',
702 '_service_root': proxy,
704 self._writable_services = self._keep_services
705 self.using_proxy = True
706 self._static_services_list = True
708 # It's important to avoid instantiating an API client
709 # unless we actually need one, for testing's sake.
710 if api_client is None:
711 api_client = arvados.api('v1')
712 self.api_client = api_client
713 self.api_token = api_client.api_token
714 self._gateway_services = {}
715 self._keep_services = None
716 self._writable_services = None
717 self.using_proxy = None
718 self._static_services_list = False
720 def current_timeout(self, attempt_number):
721 """Return the appropriate timeout to use for this client.
723 The proxy timeout setting if the backend service is currently a proxy,
724 the regular timeout setting otherwise. The `attempt_number` indicates
725 how many times the operation has been tried already (starting from 0
726 for the first try), and scales the connection timeout portion of the
727 return value accordingly.
730 # TODO(twp): the timeout should be a property of a
731 # KeepService, not a KeepClient. See #4488.
732 t = self.proxy_timeout if self.using_proxy else self.timeout
734 return (t[0] * (1 << attempt_number), t[1])
736 return (t[0] * (1 << attempt_number), t[1], t[2])
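        # Worked example of the scaling above, using DEFAULT_TIMEOUT = (2, 64, 32768):
        #   current_timeout(0) -> (2, 64, 32768)    first attempt
        #   current_timeout(1) -> (4, 64, 32768)    connection timeout doubled
        #   current_timeout(2) -> (8, 64, 32768)    2 * (1 << 2)
        # Only the connection timeout grows with retries; the read timeout and
        # minimum-bandwidth values are passed through unchanged.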
737 def _any_nondisk_services(self, service_list):
738 return any(ks.get('service_type', 'disk') != 'disk'
739 for ks in service_list)
741 def build_services_list(self, force_rebuild=False):
742 if (self._static_services_list or
743 (self._keep_services and not force_rebuild)):
747 keep_services = self.api_client.keep_services().accessible()
748 except Exception: # API server predates Keep services.
749 keep_services = self.api_client.keep_disks().list()
751 # Gateway services are only used when specified by UUID,
752             # so there's nothing to gain by filtering them by service_type.
754 self._gateway_services = {ks['uuid']: ks for ks in
755 keep_services.execute()['items']}
756 if not self._gateway_services:
757 raise arvados.errors.NoKeepServersError()
759 # Precompute the base URI for each service.
760 for r in self._gateway_services.itervalues():
761 host = r['service_host']
762 if not host.startswith('[') and host.find(':') >= 0:
763 # IPv6 URIs must be formatted like http://[::1]:80/...
764 host = '[' + host + ']'
765 r['_service_root'] = "{}://{}:{:d}/".format(
766 'https' if r['service_ssl_flag'] else 'http',
770 _logger.debug(str(self._gateway_services))
771 self._keep_services = [
772 ks for ks in self._gateway_services.itervalues()
773 if not ks.get('service_type', '').startswith('gateway:')]
774 self._writable_services = [ks for ks in self._keep_services
775 if not ks.get('read_only')]
777 # For disk type services, max_replicas_per_service is 1
778 # It is unknown (unlimited) for other service types.
779 if self._any_nondisk_services(self._writable_services):
780 self.max_replicas_per_service = None
782 self.max_replicas_per_service = 1
784 def _service_weight(self, data_hash, service_uuid):
785 """Compute the weight of a Keep service endpoint for a data
786 block with a known hash.
788 The weight is md5(h + u) where u is the last 15 characters of
789 the service endpoint's UUID.
791 return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
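        # Worked sketch of the weighting (the UUID below is a made-up example):
        #   data_hash    = 'd41d8cd98f00b204e9800998ecf8427e'
        #   service uuid = 'zzzzz-bi6l4-000000000000001'
        #   weight       = hashlib.md5(data_hash + '000000000000001').hexdigest()
        # weighted_service_roots() sorts services by this weight (heaviest
        # first), so every client computes the same probe order for a given block.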
793 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
794 """Return an array of Keep service endpoints, in the order in
795 which they should be probed when reading or writing data with
796 the given hash+hints.
798 self.build_services_list(force_rebuild)
801 # Use the services indicated by the given +K@... remote
802         # service hints, if any are present and can be resolved to a URI.
804 for hint in locator.hints:
805 if hint.startswith('K@'):
808 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
809 elif len(hint) == 29:
810 svc = self._gateway_services.get(hint[2:])
812 sorted_roots.append(svc['_service_root'])
814 # Sort the available local services by weight (heaviest first)
815 # for this locator, and return their service_roots (base URIs)
817 use_services = self._keep_services
819 use_services = self._writable_services
820 self.using_proxy = self._any_nondisk_services(use_services)
821 sorted_roots.extend([
822 svc['_service_root'] for svc in sorted(
825 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
826 _logger.debug("{}: {}".format(locator, sorted_roots))
829 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
830 # roots_map is a dictionary, mapping Keep service root strings
831 # to KeepService objects. Poll for Keep services, and add any
832         # new ones to roots_map. Return the current list of local service_roots.
834 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
835 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
836 for root in local_roots:
837 if root not in roots_map:
838 roots_map[root] = self.KeepService(
839 root, self._user_agent_pool,
840 upload_counter=self.upload_counter,
841 download_counter=self.download_counter,
846 def _check_loop_result(result):
847 # KeepClient RetryLoops should save results as a 2-tuple: the
848 # actual result of the request, and the number of servers available
849 # to receive the request this round.
850 # This method returns True if there's a real result, False if
851 # there are no more servers available, otherwise None.
852 if isinstance(result, Exception):
854 result, tried_server_count = result
855 if (result is not None) and (result is not False):
857 elif tried_server_count < 1:
858 _logger.info("No more Keep services to try; giving up")
863 def get_from_cache(self, loc):
864         """Fetch a block only if it is in the cache, otherwise return None."""
865 slot = self.block_cache.get(loc)
866 if slot is not None and slot.ready.is_set():
872 def get(self, loc_s, num_retries=None):
873 """Get data from Keep.
875 This method fetches one or more blocks of data from Keep. It
876         sends a request to each Keep service registered with the API
877 server (or the proxy provided when this client was
878 instantiated), then each service named in location hints, in
879 sequence. As soon as one service provides the data, it's
883 * loc_s: A string of one or more comma-separated locators to fetch.
884 This method returns the concatenation of these blocks.
885 * num_retries: The number of times to retry GET requests to
886 *each* Keep server if it returns temporary failures, with
887 exponential backoff. Note that, in each loop, the method may try
888 to fetch data from every available Keep service, along with any
889 that are named in location hints in the locator. The default value
890 is set when the KeepClient is initialized.
893 return ''.join(self.get(x) for x in loc_s.split(','))
895 self.get_counter.add(1)
897 locator = KeepLocator(loc_s)
898 slot, first = self.block_cache.reserve_cache(locator.md5sum)
900 self.hits_counter.add(1)
904 self.misses_counter.add(1)
906 # If the locator has hints specifying a prefix (indicating a
907 # remote keepproxy) or the UUID of a local gateway service,
908 # read data from the indicated service(s) instead of the usual
909 # list of local disk services.
910 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
911 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
912 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
913 for hint in locator.hints if (
914 hint.startswith('K@') and
916 self._gateway_services.get(hint[2:])
918 # Map root URLs to their KeepService objects.
920 root: self.KeepService(root, self._user_agent_pool,
921 upload_counter=self.upload_counter,
922 download_counter=self.download_counter)
923 for root in hint_roots
926 # See #3147 for a discussion of the loop implementation. Highlights:
927 # * Refresh the list of Keep services after each failure, in case
928 # it's being updated.
929 # * Retry until we succeed, we're out of retries, or every available
930 # service has returned permanent failure.
934 loop = retry.RetryLoop(num_retries, self._check_loop_result,
936 for tries_left in loop:
938 sorted_roots = self.map_new_services(
940 force_rebuild=(tries_left < num_retries),
942 except Exception as error:
943 loop.save_result(error)
946 # Query KeepService objects that haven't returned
947 # permanent failure, in our specified shuffle order.
948 services_to_try = [roots_map[root]
949 for root in sorted_roots
950 if roots_map[root].usable()]
951 for keep_service in services_to_try:
952 blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
955 loop.save_result((blob, len(services_to_try)))
957 # Always cache the result, then return it if we succeeded.
959 self.block_cache.cap_cache()
963 # Q: Including 403 is necessary for the Keep tests to continue
964 # passing, but maybe they should expect KeepReadError instead?
965 not_founds = sum(1 for key in sorted_roots
966 if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
967 service_errors = ((key, roots_map[key].last_result()['error'])
968 for key in sorted_roots)
970 raise arvados.errors.KeepReadError(
971 "failed to read {}: no Keep services available ({})".format(
972 loc_s, loop.last_result()))
973 elif not_founds == len(sorted_roots):
974 raise arvados.errors.NotFoundError(
975 "{} not found".format(loc_s), service_errors)
977 raise arvados.errors.KeepReadError(
978 "failed to read {}".format(loc_s), service_errors, label="service")
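        # Usage sketch for get() (hypothetical locators): a comma-separated
        # string fetches each block and returns their concatenation, with
        # repeat reads served from the block cache.
        #   keep = KeepClient(api_client=arvados.api('v1'))
        #   data = keep.get('d41d8cd98f00b204e9800998ecf8427e+0,'
        #                   'acbd18db4cc2f85cedef654fccc4a4d8+3',
        #                   num_retries=3)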
981 def put(self, data, copies=2, num_retries=None):
982 """Save data in Keep.
984 This method will get a list of Keep services from the API server, and
985 send the data to each one simultaneously in a new thread. Once the
986 uploads are finished, if enough copies are saved, this method returns
987 the most recent HTTP response body. If requests fail to upload
988 enough copies, this method raises KeepWriteError.
991 * data: The string of data to upload.
992 * copies: The number of copies that the user requires be saved.
994 * num_retries: The number of times to retry PUT requests to
995 *each* Keep server if it returns temporary failures, with
996 exponential backoff. The default value is set when the
997 KeepClient is initialized.
1000 if isinstance(data, unicode):
1001 data = data.encode("ascii")
1002 elif not isinstance(data, str):
1003 raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
1005 self.put_counter.add(1)
1007 data_hash = hashlib.md5(data).hexdigest()
1008 loc_s = data_hash + '+' + str(len(data))
1011 locator = KeepLocator(loc_s)
1014 # Tell the proxy how many copies we want it to store
1015 headers['X-Keep-Desired-Replication'] = str(copies)
1017 loop = retry.RetryLoop(num_retries, self._check_loop_result,
1019 for tries_left in loop:
1021 sorted_roots = self.map_new_services(
1023 force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
1024 except Exception as error:
1025 loop.save_result(error)
1028 thread_limiter = KeepClient.ThreadLimiter(
1029 copies, self.max_replicas_per_service)
1031 for service_root, ks in [(root, roots_map[root])
1032 for root in sorted_roots]:
1035 t = KeepClient.KeepWriterThread(
1038 data_hash=data_hash,
1039 service_root=service_root,
1040 thread_limiter=thread_limiter,
1041 timeout=self.current_timeout(num_retries-tries_left),
1042 thread_sequence=len(threads))
1047 loop.save_result((thread_limiter.done() >= copies, len(threads)))
1050 return thread_limiter.response()
1052 raise arvados.errors.KeepWriteError(
1053 "failed to write {}: no Keep services available ({})".format(
1054 data_hash, loop.last_result()))
1056 service_errors = ((key, roots_map[key].last_result()['error'])
1057 for key in sorted_roots
1058 if roots_map[key].last_result()['error'])
1059 raise arvados.errors.KeepWriteError(
1060 "failed to write {} (wanted {} copies but wrote {})".format(
1061 data_hash, copies, thread_limiter.done()), service_errors, label="service")
1063 def local_store_put(self, data, copies=1, num_retries=None):
1064 """A stub for put().
1066 This method is used in place of the real put() method when
1067 using local storage (see constructor's local_store argument).
1069 copies and num_retries arguments are ignored: they are here
1070        only for the sake of offering the same call signature as put().
1073 Data stored this way can be retrieved via local_store_get().
1075 md5 = hashlib.md5(data).hexdigest()
1076 locator = '%s+%d' % (md5, len(data))
1077 with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
1079 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1080 os.path.join(self.local_store, md5))
1083 def local_store_get(self, loc_s, num_retries=None):
1084 """Companion to local_store_put()."""
1086 locator = KeepLocator(loc_s)
1088 raise arvados.errors.NotFoundError(
1089 "Invalid data locator: '%s'" % loc_s)
1090 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1092 with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1095 def is_cached(self, locator):
1096        return self.block_cache.reserve_cache(locator)
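# Illustrative sketch (not part of the library API): exercising the local
# storage mode described in local_store_put()/local_store_get(). The
# directory path is a made-up example; with local_store set, get()/put()
# are rebound to the local_store_* methods in __init__ above.
def _example_local_store_roundtrip(storage_dir='/tmp/keep-local-store'):
    if not os.path.isdir(storage_dir):
        os.makedirs(storage_dir)
    keep = KeepClient(local_store=storage_dir)
    locator = keep.put('example data')     # e.g. '<md5>+12'
    return keep.get(locator)               # 'example data'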