from __future__ import absolute_import
import arvados.config as config
import arvados.retry as retry

# Module-level logger shared by every class in this file.
_logger = logging.getLogger('arvados.keep')
# Lazily-constructed singleton used by the deprecated Keep.global_client_object().
global_client_object = None
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    # Add each constant only if the socket module didn't already define it,
    # so a newer Python that ships them is left untouched.
    for _tcp_name, _tcp_value in (('TCP_KEEPALIVE', 0x010),
                                  ('TCP_KEEPINTVL', 0x101),
                                  ('TCP_KEEPCNT', 0x102)):
        if not hasattr(socket, _tcp_name):
            setattr(socket, _tcp_name, _tcp_value)
class KeepLocator(object):
    # Zero point used to convert permission-expiry datetimes to and from
    # integer Unix timestamps (see permission_hint()).
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    # A valid hint is a capital letter followed by letters, digits, '@', '_', '-'.
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        # Parse an "md5sum+size+hint+..." locator string into attributes.
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
            # Second '+'-separated field is the block size in bytes.
            self.size = int(next(pieces))
            # Remaining fields are hints; 'A...' hints carry a permission
            # signature and are parsed separately, others are kept verbatim.
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
                self.hints.append(hint)

            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints

        # Short form of the locator: hash+size only, no hints.
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
            # Getter simply reads the backing attribute.
            return getattr(self, data_name)
        def setter(self, hex_str):
            # Reject anything that is not exactly `length` hex digits.
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    # Validated hex-string properties backed by _md5sum / _perm_sig.
    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)
    def perm_expiry(self):
        # Expiry as a datetime, or None if no permission hint was parsed.
        return self._perm_expiry

    def perm_expiry(self, value):
        # `value` is a hex Unix timestamp (1 to 8 hex digits).
        if not arvados.util.is_hex(value, 1, 8):
                "permission timestamp must be a hex Unix timestamp: {}".
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        # Rebuild the "A<sig>@<hex timestamp>" hint from the parsed fields.
        data = [self.perm_sig, self.perm_expiry]
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        # Split "A<sig>@<hex timestamp>" into signature and expiry parts.
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
            raise ValueError("bad permission hint {}".format(s))
114 def permission_expired(self, as_of_dt=None):
115 if self.perm_expiry is None:
117 elif as_of_dt is None:
118 as_of_dt = datetime.datetime.now()
119 return self.perm_expiry <= as_of_dt
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
    own API client. The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """

    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings. We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them change.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        # Rebuild the singleton when the configuration fingerprint changes.
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
        return global_client_object

    def get(locator, **kwargs):
        # Deprecated module-level convenience wrapper around KeepClient.get().
        return Keep.global_client_object().get(locator, **kwargs)

    def put(data, **kwargs):
        # Deprecated module-level convenience wrapper around KeepClient.put().
        return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        # cache_max: soft limit, in bytes, enforced by cap_cache().
        self.cache_max = cache_max
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        # One cached block: content is filled in later; readers block on
        # `ready` until set() is called.
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()

        def set(self, value):
            # Store the block body and wake any waiting readers.

            # A slot whose content is None contributes nothing to cache size.
            if self.content is None:
                return len(self.content)
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            # Evict ready slots from the least-recently-used end (the back of
            # the list) until the total size fits under cache_max.
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in xrange(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                sm = sum([slot.size() for slot in self._cache])
    def _get(self, locator):
        # Caller must hold self._cache_lock.
        # Test if the locator is already in the cache
        for i in xrange(0, len(self._cache)):
            if self._cache[i].locator == locator:
                # move it to the front
                # (list order doubles as the LRU order used by cap_cache)
                self._cache.insert(0, n)

    def get(self, locator):
        # Thread-safe wrapper around _get().
        with self._cache_lock:
            return self._get(locator)
    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
class Counter(object):
    # Thread-safe running total used for transfer statistics.
    def __init__(self, v=0):
        # _lk guards updates to the counter value.
        self._lk = threading.Lock()
class KeepClient(object):

    # Default Keep server connection timeout: 2 seconds
    # Default Keep server read timeout: 256 seconds
    # Default Keep server bandwidth minimum: 32768 bytes per second
    # Default Keep proxy connection timeout: 20 seconds
    # Default Keep proxy read timeout: 256 seconds
    # Default Keep proxy bandwidth minimum: 32768 bytes per second
    # Each tuple is (connect_timeout_s, read_timeout_s, min_bandwidth_bps);
    # see _setcurltimeouts() for how the values are applied to pycurl.
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

            # Errors treated as (possibly transient) HTTP-level failures.
            arvados.errors.HttpError,

        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
                     download_counter=None, **headers):
            # root: base URI of the Keep service this instance talks to.
            # user_agent_pool: shared LIFO pool of reusable pycurl handles.
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter

            """Is it worth attempting a request?"""

            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            # Most recent response dict (status_code/body/headers/error).

        def _get_user_agent(self):
            # Reuse a pooled curl handle when one is available.
                return self._user_agent_pool.get(block=False)

        def _put_user_agent(self, ua):
            # Return a still-healthy curl handle to the shared pool.
                self._user_agent_pool.put(ua, block=False)
        def _socket_open(family, socktype, protocol, address=None):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(family, socktype, protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            # TCP_KEEPINTVL exists everywhere (monkey-patched above on darwin).
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            # Returns the block body on GET, True on HEAD, or None on failure;
            # details of the attempt are kept in self._result.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
                with timer.Timer() as t:
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                        # HEAD: ask the server for headers only.
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout)
                    except Exception as e:
                        # Wrap transport-level errors so retry logic sees HttpError.
                        raise arvados.errors.HttpError(0, str(e))
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,

                ok = retry.check_http_response_success(self._result['status_code'])
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
            # ok may be True, False, or None (no response); only a definite
            # False marks this service unusable.
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
                # Don't return this client to the pool, in case it's
                # broken.
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                _logger.info("HEAD %s: %s bytes",
                         self._result['status_code'],
                         self._result.get('content-length'))

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            # Verify the body against the hash embedded in the locator.
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                self._result['error'] = arvados.errors.HttpError(
            return self._result['body']
        def put(self, hash_s, body, timeout=None):
            # Upload `body` under the given hash; truthiness of the return
            # indicates success, details are kept in self._result.
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
                with timer.Timer() as t:
                    body_reader = cStringIO.StringIO(body)
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    except Exception as e:
                        # Wrap transport-level errors so retry logic sees HttpError.
                        raise arvados.errors.HttpError(0, str(e))
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,

                ok = retry.check_http_response_success(self._result['status_code'])
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
        def _setcurltimeouts(self, curl, timeouts):
            # Accepts None (no timeouts), a scalar, a (conn, xfer) pair, or a
            # (conn, xfer, bandwidth) triple; missing bandwidth falls back to
            # the class-wide default.
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                    conn_t, xfer_t, bandwidth_bps = timeouts
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            # Abort if throughput stays under bandwidth_bps for xfer_t seconds.
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
        def _headerfunction(self, header_line):
            # pycurl callback: accumulate response headers into self._headers.
            header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                # "Name: value" header; header names are case-insensitive.
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
                # Continuation line: append to the previous header's value.
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                # Status line, stored under a synthetic header name.
                name = 'x-status-line'
                _logger.error("Unexpected header line: %s", header_line)
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
    class KeepWriterQueue(Queue.Queue):
        # Work queue shared by KeepWriterThreads; tracks how many replica
        # copies have been written and how many attempts are still pending.
        def __init__(self, copies):
            Queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.successful_copies = 0
            # Guards successful_copies/response updates.
            self.successful_copies_lock = threading.Lock()
            self.pending_tries = copies
            # Signalled whenever progress is made or a retry becomes available.
            self.pending_tries_notification = threading.Condition()
511 def write_success(self, response, replicas_nr):
512 with self.successful_copies_lock:
513 self.successful_copies += replicas_nr
514 self.response = response
515 with self.pending_tries_notification:
516 self.pending_tries_notification.notify_all()
518 def write_fail(self, ks):
519 with self.pending_tries_notification:
520 self.pending_tries += 1
521 self.pending_tries_notification.notify()
523 def pending_copies(self):
524 with self.successful_copies_lock:
525 return self.wanted_copies - self.successful_copies
        def get_next_task(self):
            # Block until there is a (service, service_root) pair worth
            # trying, or raise Queue.Empty when no more work is needed.
            with self.pending_tries_notification:
                    if self.pending_copies() < 1:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                        # Claim one retry slot before handing out the task.
                        self.pending_tries -= 1
                        return service, service_root
                        self.pending_tries_notification.notify_all()
                        # Nothing to do yet; wait for success/failure news.
                        self.pending_tries_notification.wait()
    class KeepWriterThreadPool(object):
        # Fans a single block PUT out to several KeepWriterThreads until the
        # wanted number of replica copies has been stored.
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
            self.total_task_nr = 0
            self.wanted_copies = copies
            if (not max_service_replicas) or (max_service_replicas >= copies):
                # One service can store all copies; a single thread suffices.
                num_threads = int(math.ceil(float(copies) / max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.queue = KeepClient.KeepWriterQueue(copies)
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            # Enqueue one (KeepService, root URI) upload attempt.
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

            # Number of replica copies successfully stored so far.
            return self.queue.successful_copies

            for worker in self.workers:
            # Wait for finished work

            # Most recent successful PUT response body.
            return self.queue.response
    class KeepWriterThread(threading.Thread):
        # Sentinel raised by do_task() for failures already logged there.
        TaskFailed = RuntimeError()

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.data_hash = data_hash
                    # Pull the next upload attempt off the shared queue.
                    service, service_root = self.queue.get_next_task()
                    locator, copies = self.do_task(service, service_root)
                except Exception as e:
                    if e is not self.TaskFailed:
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                    self.queue.write_success(locator, copies)
                    self.queue.task_done()

        def do_task(self, service, service_root):
            # Run one PUT against one service; return (locator, replicas) on
            # success, raise TaskFailed otherwise.
            success = bool(service.put(self.data_hash,
                                       timeout=self.timeout))
            result = service.last_result()
                if result.get('status_code', None):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  result['status_code'],
                raise self.TaskFailed

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                # Server reports how many replicas it stored for this PUT.
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):

            return result['body'].strip(), replicas_stored
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        The API client to use to find Keep services. If not
        provided, KeepClient will build one from available Arvados
        configuration.

        If specified, this KeepClient will send requests to this Keep
        proxy. Otherwise, KeepClient will fall back to the setting of the
        ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
        If you want KeepClient not to use a proxy, pass in an empty
        string.

        The initial timeout (in seconds) for HTTP requests to Keep
        non-proxy servers. A tuple of three floats is interpreted as
        (connection_timeout, read_timeout, minimum_bandwidth). A connection
        will be aborted if the average traffic rate falls below
        minimum_bandwidth bytes per second over an interval of read_timeout
        seconds. Because timeouts are often a result of transient server
        load, the actual connection timeout will be increased by a factor
        of two on each retry.
        Default: (2, 256, 32768).

        The initial timeout (in seconds) for HTTP requests to
        Keep proxies. A tuple of three floats is interpreted as
        (connection_timeout, read_timeout, minimum_bandwidth). The behavior
        described above for adjusting connection timeouts on retry also
        applies.
        Default: (20, 256, 32768).

        If you're not using an API client, but only talking
        directly to a Keep proxy, this parameter specifies an API token
        to authenticate Keep requests. It is an error to specify both
        api_client and api_token. If you specify neither, KeepClient
        will use one available from the Arvados configuration.

        If specified, this KeepClient will bypass Keep
        services, and save data to the named directory. If unspecified,
        KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
        environment variable. If you want to ensure KeepClient does not
        use local storage, pass in an empty string. This is primarily
        intended to mock a server for testing.

        The default number of times to retry failed requests.
        This will be used as the default num_retries value when get() and
        put() are called. Default 0.
        """
        self.lock = threading.Lock()
        # ARVADOS_KEEP_SERVICES takes precedence over ARVADOS_KEEP_PROXY.
        if config.get('ARVADOS_KEEP_SERVICES'):
            proxy = config.get('ARVADOS_KEEP_SERVICES')
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
                api_token = api_client.api_token
        elif api_client is not None:
            # Supplying both is ambiguous; refuse rather than guess.
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = Queue.LifoQueue()
        # Transfer and cache statistics, all thread-safe Counters.
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

            # Local-store mode: swap get/put for the filesystem stubs below.
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
            self.num_retries = num_retries
            self.max_replicas_per_service = None
                # Proxy mode: build a static service list from the URI(s).
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        # Sanity-check the URI before using it.
                        url = urlparse.urlparse(proxy_uris[i])
                        if not (url.scheme and url.netloc):
                            raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise. The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
            # Connection timeout doubles on each retry; other fields pass through.
            return (t[0] * (1 << attempt_number), t[1])
            return (t[0] * (1 << attempt_number), t[1], t[2])
787 def _any_nondisk_services(self, service_list):
788 return any(ks.get('service_type', 'disk') != 'disk'
789 for ks in service_list)
    def build_services_list(self, force_rebuild=False):
        # Populate _keep_services/_writable_services from the API server;
        # a no-op when a static (proxy) list is in use or the cached list
        # is still acceptable.
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.itervalues():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',

            _logger.debug(str(self._gateway_services))
            # Gateway-only services are excluded from ordinary reads/writes.
            self._keep_services = [
                ks for ks in self._gateway_services.itervalues()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
                self.max_replicas_per_service = 1
834 def _service_weight(self, data_hash, service_uuid):
835 """Compute the weight of a Keep service endpoint for a data
836 block with a known hash.
838 The weight is md5(h + u) where u is the last 15 characters of
839 the service endpoint's UUID.
841 return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                        # 5-character hint: a remote cluster's public keepproxy.
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    # 27-character hint: UUID of a local gateway service.
                    svc = self._gateway_services.get(hint[2:])
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # as a list.
        use_services = self._keep_services
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # service roots.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                # First time we've seen this root: build a KeepService for it.
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
        result, tried_server_count = result
        if (result is not None) and (result is not False):
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
    def get_from_cache(self, loc):
        """Fetch a block only if is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        # Only a slot whose content has fully arrived counts as a hit.
        if slot is not None and slot.ready.is_set():

    def head(self, loc_s, num_retries=None):
        # HEAD: check block existence without transferring the body.
        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)

    def get(self, loc_s, num_retries=None):
        # GET: fetch and return the block body.
        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
    def _get_or_head(self, loc_s, method="GET", num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's
        returned.

        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
            # Multiple locators: fetch each one recursively and concatenate.
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        locator = KeepLocator(loc_s)
            slot, first = self.block_cache.reserve_cache(locator.md5sum)
                self.hits_counter.add(1)

        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               self._gateway_services.get(hint[2:])
        # Map root URLs to their KeepService objects.
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter)
            for root in hint_roots

        # See #3147 for a discussion of the loop implementation. Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
        for tries_left in loop:
                sorted_roots = self.map_new_services(
                    force_rebuild=(tries_left < num_retries),
            except Exception as error:
                loop.save_result(error)

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
            self.block_cache.cap_cache()
            if method == "HEAD":

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
            # No services were even reachable.
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        """

        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        # Locator the caller will get back: hash plus block size.
        loc_s = data_hash + '+' + str(len(data))
        locator = KeepLocator(loc_s)

        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replicas'] = str(copies)
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
        for tries_left in loop:
                sorted_roots = self.map_new_services(
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)

            # `done` tracks copies written so far; only ask the pool for the rest.
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left))
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                writer_pool.add_task(ks, service_root)
            done += writer_pool.done()
            loop.save_result((done >= copies, writer_pool.total_task_nr))

            return writer_pool.response()
            # No services were even reachable.
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, writer_pool.done()), service_errors, label="service")
    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        # Write to a temp file, then rename so readers never see partial data.
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
            locator = KeepLocator(loc_s)
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        # The well-known empty block is returned without touching disk.
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1151 def is_cached(self, locator):
1152 return self.block_cache.reserve_cache(expect_hash)