import cStringIO
import datetime
import hashlib
import logging
import math
import os
import pycurl
import Queue
import re
import socket
import ssl
import sys
import threading
import timer
import urlparse

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util

_logger = logging.getLogger('arvados.keep')
global_client_object = None

# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except IndexError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt


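# Illustrative example (not from the original file): parsing an unsigned
# locator. The hash below is md5("foo"); a signed locator would additionally
# carry an A<40-hex-digit-signature>@<8-hex-digit-timestamp> hint.
#
#   loc = KeepLocator('acbd18db4cc2f85cedef654fccc4a4d8+3')
#   loc.md5sum                -> 'acbd18db4cc2f85cedef654fccc4a4d8'
#   loc.size                  -> 3
#   loc.permission_expired()  -> False (no A... signature hint present)
#   str(loc)                  -> 'acbd18db4cc2f85cedef654fccc4a4d8+3'
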
122 """Simple interface to a global KeepClient object.
124 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
125 own API client. The global KeepClient will build an API client from the
126 current Arvados configuration, which may not match the one you built.
131 def global_client_object(cls):
132 global global_client_object
133 # Previously, KeepClient would change its behavior at runtime based
134 # on these configuration settings. We simulate that behavior here
135 # by checking the values and returning a new KeepClient if any of
137 key = (config.get('ARVADOS_API_HOST'),
138 config.get('ARVADOS_API_TOKEN'),
139 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
140 config.get('ARVADOS_KEEP_PROXY'),
141 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
142 os.environ.get('KEEP_LOCAL_STORE'))
143 if (global_client_object is None) or (cls._last_key != key):
144 global_client_object = KeepClient()
146 return global_client_object
149 def get(locator, **kwargs):
150 return Keep.global_client_object().get(locator, **kwargs)
153 def put(data, **kwargs):
154 return Keep.global_client_object().put(data, **kwargs)
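# Preferred replacement for the deprecated global object above (sketch;
# assumes a reachable Arvados cluster and standard ARVADOS_API_* settings):
#
#   api = arvados.api('v1')
#   keep = KeepClient(api_client=api)
#   locator = keep.put('foo', copies=2)
#   data = keep.get(locator)
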
class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in xrange(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in xrange(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True


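# Typical CacheSlot usage (sketch, mirroring KeepClient._get_or_head() below):
# reserve a slot, fill it only if this caller created it; other threads block
# in slot.get() until set() is called. fetch_block() is a hypothetical
# stand-in for a real Keep request.
#
#   slot, first = cache.reserve_cache(locator_md5)
#   if first:
#       slot.set(fetch_block(locator_md5))
#       cache.cap_cache()
#   data = slot.get()
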
class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)

    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None, **headers):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter

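        # Lifecycle sketch (illustrative, not from the original source): one
        # instance serves one GET or PUT transaction, possibly with retries.
        #
        #   svc = KeepClient.KeepService(root, pool, Authorization=token)
        #   while not svc.finished():
        #       blob = svc.get(locator, timeout=(2, 256, 32768))
        #       ...
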
287 """Is it worth attempting a request?"""
291 """Did the request succeed or encounter permanent failure?"""
292 return self._result['error'] == False or not self._usable
294 def last_result(self):
297 def _get_user_agent(self):
299 return self._user_agent_pool.get(block=False)
303 def _put_user_agent(self, ua):
306 self._user_agent_pool.put(ua, block=False)
        @staticmethod
        def _socket_open(family, socktype, protocol, address=None):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(family, socktype, protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            return s

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result.get('content-length'))
                return True
            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = cStringIO.StringIO(body)
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))

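        # Accepted timeout forms (sketch): a bare number sets both the
        # connection and transfer timeouts; 2- and 3-tuples follow the shape
        # of DEFAULT_TIMEOUT above:
        #   self._setcurltimeouts(curl, 300)
        #   self._setcurltimeouts(curl, (2, 256))          # default bandwidth
        #   self._setcurltimeouts(curl, (2, 256, 32768))
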
        def _headerfunction(self, header_line):
            header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written

    class KeepWriterQueue(Queue.Queue):
        def __init__(self, copies):
            Queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.successful_copies = 0
            self.response = None
            self.successful_copies_lock = threading.Lock()
            self.pending_tries = copies
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr):
            with self.successful_copies_lock:
                self.successful_copies += replicas_nr
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.successful_copies_lock:
                return self.wanted_copies - self.successful_copies

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise Queue.Empty
                    else:
                        self.pending_tries_notification.wait()

    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
            self.total_task_nr = 0
            self.wanted_copies = copies
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(float(copies) / max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response

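    # Pool usage sketch (mirrors KeepClient.put() below, illustrative names):
    # queue one task per candidate service, then join() starts the workers
    # and blocks until enough copies are written or the queue is exhausted.
    #
    #   pool = KeepClient.KeepWriterThreadPool(data, data_hash, copies=2,
    #                                          max_service_replicas=1)
    #   for root in sorted_roots:
    #       pool.add_task(roots_map[root], root)
    #   pool.join()
    #   assert pool.done() >= 2
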
    class KeepWriterThread(threading.Thread):
        TaskFailed = RuntimeError()

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except Queue.Empty:
                    return
                try:
                    locator, copies = self.do_task(service, service_root)
                except Exception as e:
                    if e is not self.TaskFailed:
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            success = bool(service.put(self.data_hash,
                                       self.data,
                                       timeout=self.timeout))
            result = service.last_result()

            if not success:
                if result.get('status_code', None):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result['status_code'],
                                  result['body'])
                raise self.TaskFailed

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            return result['body'].strip(), replicas_stored

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.
        """
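        # Usage sketch (illustrative, not from the original source; assumes a
        # reachable cluster and standard ARVADOS_API_* settings):
        #   keep = KeepClient(api_client=arvados.api('v1'), num_retries=2)
        #   loc = keep.put('foo', copies=2)
        #   assert keep.get(loc) == 'foo'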
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = Queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        if local_store:
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urlparse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])

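    # For example, with the default DEFAULT_TIMEOUT of (2, 256, 32768) and no
    # proxy in use, only the connection timeout grows with each retry:
    #   current_timeout(0) -> (2, 256, 32768)
    #   current_timeout(2) -> (8, 256, 32768)
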
    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
                (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.itervalues():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.itervalues()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]
            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()

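    # This is rendezvous hashing: every client computes the same
    # locator-dependent weight for each service, so probe order is stable
    # without any coordination. Hypothetical example values:
    #   _service_weight('d41d8cd9...', 'zzzzz-bi6l4-0123456789abcde')
    #   == hashlib.md5('d41d8cd9...' + '0123456789abcde').hexdigest()
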
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # as a list.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    **headers)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc):
        """Fetch a block only if it is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    @retry.retry_method
    def head(self, loc_s, num_retries=None):
        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)

    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)

    def _get_or_head(self, loc_s, method="GET", num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        locator = KeepLocator(loc_s)
        if method == "GET":
            slot, first = self.block_cache.reserve_cache(locator.md5sum)
            if not first:
                self.hits_counter.add(1)
                v = slot.get()
                return v

        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               len(hint) == 29 and
                               self._gateway_services.get(hint[2:])
                               )])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter)
            for root in hint_roots
        }

        # See #3147 for a discussion of the loop implementation.  Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        sorted_roots = []
        blob = None
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False)
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        if method == "GET":
            slot.set(blob)
            self.block_cache.cap_cache()
        if loop.success():
            if method == "HEAD":
                return True
            else:
                return blob

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        """

        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replicas'] = str(copies)
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done = 0
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left))
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            done += writer_pool.done()
            loop.save_result((done >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, writer_pool.done()), service_errors, label="service")

    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return ''
        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
            return f.read()

    def is_cached(self, locator):
        return self.block_cache.reserve_cache(locator)