import cStringIO
import datetime
import hashlib
import logging
import math
import os
import pycurl
import Queue
import re
import socket
import ssl
import sys
import threading
import timer
import urlparse

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util

_logger = logging.getLogger('arvados.keep')
global_client_object = None
# Monkey patch TCP constants when not available (macOS). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102

class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)
    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum
    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)
    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except (IndexError, ValueError):
            raise ValueError("bad permission hint {}".format(s))
    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # perm_expiry is a UTC datetime, so compare against UTC now.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
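
# A quick usage sketch (the hash, size, signature, and timestamp below are
# made-up illustrative values, not real signed tokens):
#
#   loc = KeepLocator(
#       "d41d8cd98f00b204e9800998ecf8427e+0"
#       "+A1f39de2b75b47d4e5a2e0b3bc1b4a1e0e4b8f3a1@565f7800")
#   loc.md5sum                 # 'd41d8cd98f00b204e9800998ecf8427e'
#   loc.size                   # 0
#   loc.permission_expired()   # True once 0x565f7800 (UTC) has passed
#   str(loc)                   # reassembles the locator, hints included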
122 """Simple interface to a global KeepClient object.
124 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
125 own API client. The global KeepClient will build an API client from the
126 current Arvados configuration, which may not match the one you built.
131 def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings. We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed since the last call.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object
    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)
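
# Deprecated module-level convenience, shown here only as a sketch of the
# legacy call pattern; new code should construct a KeepClient directly:
#
#   loc = Keep.put("hello")   # returns a signed locator string
#   data = Keep.get(loc)      # round-trips the same bytes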

class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)
    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in xrange(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])
    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in xrange(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i > 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)
    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
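
    # Sketch of the intended reserve/fill pattern (fetch_block is a
    # hypothetical callable standing in for a real Keep request):
    #
    #   slot, first = cache.reserve_cache(md5sum)
    #   if first:
    #       slot.set(fetch_block(md5sum))  # other readers unblock here
    #   data = slot.get()                  # waits until set() is called
    #   cache.cap_cache()                  # evict if over cache_max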

class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val

class KeepClient(object):

    # Default Keep server connection timeout: 2 seconds
    # Default Keep server read timeout: 256 seconds
    # Default Keep server bandwidth minimum: 32768 bytes per second
    # Default Keep proxy connection timeout: 20 seconds
    # Default Keep proxy read timeout: 256 seconds
    # Default Keep proxy bandwidth minimum: 32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible results.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )
        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None, **headers):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
287 """Is it worth attempting a request?"""
291 """Did the request succeed or encounter permanent failure?"""
292 return self._result['error'] == False or not self._usable
294 def last_result(self):
        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(False)
            except Queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                self._user_agent_pool.put(ua, False)
            except Exception:
                pass  # drop the agent if the pool won't take it back
        @staticmethod
        def _socket_open(family, socktype, protocol, address=None):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(family, socktype, protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # TCP_KEEPIDLE is not defined on macOS, where setting it would
            # raise an "invalid protocol" error. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            return s
        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result['headers'].get('content-length'))
                return True
            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']
        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = cStringIO.StringIO(body)
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True
        def _setcurltimeouts(self, curl, timeouts):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
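            # For example, timeouts=(2, 256, 32768) aborts a connection
            # attempt after 2 seconds, and aborts an established transfer
            # whose rate stays below 32768 bytes/sec for 256 consecutive
            # seconds (cURL's LOW_SPEED_LIMIT / LOW_SPEED_TIME semantics).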

        def _headerfunction(self, header_line):
            header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                # Continuation of a folded header from the previous line.
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
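            # e.g. a folded header arriving as "x-keep-foo: a" followed by a
            # continuation line " b" ends up as {'x-keep-foo': 'a b'}, and
            # the status line is kept under the synthetic key 'x-status-line'.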

    class KeepWriterQueue(Queue.Queue):
        def __init__(self, copies):
            Queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.successful_copies = 0
            self.response = None
            self.successful_copies_lock = threading.Lock()
            self.pending_tries = copies
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr):
            with self.successful_copies_lock:
                self.successful_copies += replicas_nr
                self.response = response

        def write_fail(self, ks, status_code):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.successful_copies_lock:
                return self.wanted_copies - self.successful_copies

    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
            self.total_task_nr = 0
            self.wanted_copies = copies
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(float(copies) / max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()
            with self.queue.pending_tries_notification:
                self.queue.pending_tries_notification.notify_all()
            for worker in self.workers:
                worker.join()

        def response(self):
            return self.queue.response

    class KeepWriterThread(threading.Thread):
        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while not self.queue.empty():
                if self.queue.pending_copies() > 0:
                    # Avoid overreplication, wait for some needed re-attempt
                    with self.queue.pending_tries_notification:
                        if self.queue.pending_tries <= 0:
                            self.queue.pending_tries_notification.wait()
                            continue # try again when awake
                        self.queue.pending_tries -= 1

                    # Get to work
                    try:
                        service, service_root = self.queue.get_nowait()
                    except Queue.Empty:
                        continue
                    if service.finished():
                        self.queue.task_done()
                        continue
                    success = bool(service.put(self.data_hash,
                                               self.data,
                                               timeout=self.timeout))
                    result = service.last_result()
                    if success:
                        _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                                      str(threading.current_thread()),
                                      self.data_hash,
                                      len(self.data),
                                      service_root)
                        try:
                            replicas_stored = int(result['headers']['x-keep-replicas-stored'])
                        except (KeyError, ValueError):
                            replicas_stored = 1

                        self.queue.write_success(result['body'].strip(), replicas_stored)
                    else:
                        if result.get('status_code', None):
                            _logger.debug("Request fail: PUT %s => %s %s",
                                          self.data_hash,
                                          result['status_code'],
                                          result['body'])
                        self.queue.write_fail(service, result.get('status_code', None)) # Schedule a re-attempt with next service
                    # Mark as done so the queue can be join()ed
                    self.queue.task_done()
                else:
                    # Remove the task from the queue anyways
                    try:
                        self.queue.get_nowait()
                        # Mark as done so the queue can be join()ed
                        self.queue.task_done()
                    except Queue.Empty:
                        continue

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services. If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy. Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests. It is an error to specify both
          api_client and api_token. If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory. If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable. If you want to ensure KeepClient does not
          use local storage, pass in an empty string. This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called. Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = Queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        if local_store:
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urlparse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise. The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
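        # e.g. with DEFAULT_TIMEOUT = (2, 256, 32768): attempt 0 allows 2s
        # to connect, attempt 1 allows 4s, attempt 2 allows 8s, while the
        # read timeout (256s) and minimum bandwidth (32768 B/s) stay fixed.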

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.itervalues():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.itervalues()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]
            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
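    # e.g. for block hash "acbd18db4cc2f85cedef654fccc4a4d8" and a service
    # UUID ending in "000000000000001", the weight is
    # md5("acbd18db4cc2f85cedef654fccc4a4d8" + "000000000000001").
    # Sorting services by these digests gives each block a stable,
    # service-specific probe order (rendezvous hashing).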

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # as a list.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    **headers)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc):
        """Fetch a block only if it is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    @retry.retry_method
    def head(self, loc_s, num_retries=None):
        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)

    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)

    def _get_or_head(self, loc_s, method="GET", num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
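        # e.g. (sketch; the locator is md5("foo") plus size, and assumes the
        # block was previously stored with put()):
        #   kc.get("acbd18db4cc2f85cedef654fccc4a4d8+3")   -> "foo"
        #   kc.head("acbd18db4cc2f85cedef654fccc4a4d8+3")  -> True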
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        locator = KeepLocator(loc_s)
        if method == "GET":
            slot, first = self.block_cache.reserve_cache(locator.md5sum)
            if not first:
                self.hits_counter.add(1)
                v = slot.get()
                return v

        self.misses_counter.add(1)
        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               len(hint) == 29 and
                               self._gateway_services.get(hint[2:])
                               )])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter)
            for root in hint_roots
        }

        # See #3147 for a discussion of the loop implementation. Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        blob = None
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False)
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        if method == "GET":
            slot.set(blob)
            self.block_cache.cap_cache()
        if loop.success():
            if method == "HEAD":
                return True
            else:
                return blob

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        """
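        # e.g. (sketch): kc.put("foo", copies=2) returns a signed locator
        # like "acbd18db4cc2f85cedef654fccc4a4d8+3+A<signature>@<expiry>",
        # suitable for passing straight back to kc.get().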
        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replicas'] = str(copies)
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done = 0
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left))
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            done += writer_pool.done()
            loop.save_result((done >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, writer_pool.done()), service_errors, label="service")

    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return ''
        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
            return f.read()

    def is_cached(self, locator):
        # Look up the cache slot without reserving a new one.
        return self.block_cache.get(locator) is not None