19 import arvados.config as config
21 import arvados.retry as retry
# Module-level logger shared by all Keep client code in this file.
_logger = logging.getLogger('arvados.keep')
# Cached KeepClient used by the deprecated Keep.global_client_object()
# interface; rebuilt when the relevant configuration values change.
global_client_object = None
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    # Keep any constant the socket module already defines; otherwise fall
    # back to the values published in Apple's xnu kernel headers.
    socket.TCP_KEEPALIVE = getattr(socket, 'TCP_KEEPALIVE', 0x010)
    socket.TCP_KEEPINTVL = getattr(socket, 'TCP_KEEPINTVL', 0x101)
    socket.TCP_KEEPCNT = getattr(socket, 'TCP_KEEPCNT', 0x102)
class KeepLocator(object):
    # Reference epoch used to convert permission-expiry datetimes back into
    # Unix timestamps in permission_hint().
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    # Hints are a capital letter followed by alphanumerics, e.g.
    # "A<signature>@<expiry>" permission hints or "K@<site>" remote hints.
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        """Parse a Keep locator string "<md5sum>+<size>[+hint...]"."""
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        # NOTE(review): surrounding framing (try/except and the loop over
        # the remaining hint pieces binding `hint`) appears elided in this
        # view -- confirm against the full source.
        self.size = int(next(pieces))
        if self.HINT_RE.match(hint) is None:
            raise ValueError("invalid hint format: {}".format(hint))
        elif hint.startswith('A'):
            # "A" hints carry the permission signature and expiry timestamp.
            self.parse_permission_hint(hint)
        self.hints.append(hint)

        # NOTE(review): the enclosing method header for the following join
        # expression (string form of the locator) is elided in this view.
        str(s) for s in [self.md5sum, self.size,
                         self.permission_hint()] + self.hints
        # NOTE(review): method header elided; returns the bare
        # "<md5>+<size>" form of the locator.
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        # NOTE(review): the "def getter(self):" header appears elided here.
        return getattr(self, data_name)
        def setter(self, hex_str):
            # Validate before storing so md5sum/perm_sig are always
            # well-formed hex strings of the expected width.
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    # Validated hex-string properties backed by _md5sum / _perm_sig.
    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    # NOTE(review): @property decorator appears elided in this view.
    def perm_expiry(self):
        return self._perm_expiry

    # NOTE(review): @perm_expiry.setter decorator appears elided here.
    def perm_expiry(self, value):
        # Expiry arrives as a hex Unix timestamp (1-8 hex digits).
        if not arvados.util.is_hex(value, 1, 8):
            # NOTE(review): the "raise ValueError(" line appears elided.
            "permission timestamp must be a hex Unix timestamp: {}".
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Render the stored signature/expiry back into an "A..." hint."""
        data = [self.perm_sig, self.perm_expiry]
        # Convert the expiry datetime back to integer seconds since epoch.
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        # NOTE(review): try/except framing appears elided in this view.
        self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        """Return whether the permission hint has expired as of as_of_dt."""
        if self.perm_expiry is None:
            # NOTE(review): branch body elided (presumably "return False").
        elif as_of_dt is None:
            # NOTE(review): perm_expiry is derived from utcfromtimestamp()
            # but compared against naive local now() -- possible timezone
            # bug; confirm whether utcnow() was intended upstream.
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
123 """Simple interface to a global KeepClient object.
125 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
126 own API client. The global KeepClient will build an API client from the
127 current Arvados configuration, which may not match the one you built.
132 def global_client_object(cls):
133 global global_client_object
134 # Previously, KeepClient would change its behavior at runtime based
135 # on these configuration settings. We simulate that behavior here
136 # by checking the values and returning a new KeepClient if any of
138 key = (config.get('ARVADOS_API_HOST'),
139 config.get('ARVADOS_API_TOKEN'),
140 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
141 config.get('ARVADOS_KEEP_PROXY'),
142 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
143 os.environ.get('KEEP_LOCAL_STORE'))
144 if (global_client_object is None) or (cls._last_key != key):
145 global_client_object = KeepClient()
147 return global_client_object
150 def get(locator, **kwargs):
151 return Keep.global_client_object().get(locator, **kwargs)
154 def put(data, **kwargs):
155 return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        # cache_max: soft cap, in bytes, enforced by cap_cache().
        self.cache_max = cache_max
        # NOTE(review): the self._cache list initialization appears elided.
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        # One entry in the cache: block content plus a "ready" event that
        # readers wait on while the block is being fetched.
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            # NOTE(review): self.content initialization appears elided.

        def set(self, value):
            # NOTE(review): body elided in this view (stores the value and
            # signals self.ready, per callers below).

        # NOTE(review): the following are fragments of size(); the method
        # header and other branch lines are elided in this view.
        if self.content is None:
            return len(self.content)

        # NOTE(review): the cap_cache() method header appears elided here.
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                # Evict from the back (least recently used end) first.
                for i in xrange(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        # NOTE(review): eviction lines (del/break) elided.
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in xrange(0, len(self._cache)):
            if self._cache[i].locator == locator:
                # NOTE(review): hit bookkeeping lines appear elided here.
                # move it to the front
                self._cache.insert(0, n)
                # NOTE(review): return lines appear elided.

    def get(self, locator):
        # Thread-safe wrapper around _get().
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            # NOTE(review): the existing-slot return path appears elided.
            # Add a new cache slot for the locator
            n = KeepBlockCache.CacheSlot(locator)
            self._cache.insert(0, n)
            # NOTE(review): the (slot, is_new) return appears elided.
class Counter(object):
    # Thread-safe integer counter used for transfer statistics.
    # NOTE(review): the value attribute and add()/get() methods are elided
    # from this view.
    def __init__(self, v=0):
        self._lk = threading.Lock()
class KeepClient(object):
    # Client for reading and writing data blocks to a cluster of Keep
    # services, either directly or through a Keep proxy.

    # Default Keep server connection timeout: 2 seconds
    # Default Keep server read timeout: 256 seconds
    # Default Keep server bandwidth minimum: 32768 bytes per second
    # Default Keep proxy connection timeout: 20 seconds
    # Default Keep proxy read timeout: 256 seconds
    # Default Keep proxy bandwidth minimum: 32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible results.
        """

        # NOTE(review): fragment of the HTTP_ERRORS tuple; its opening
        # assignment and other members are elided in this view.
            arvados.errors.HttpError,

        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
                     download_counter=None, **headers):
            # NOTE(review): the mutable default user_agent_pool is shared
            # across instances -- apparently intentional (a process-wide
            # pool of curl handles), but worth confirming. The
            # upload_counter parameter line also appears elided from the
            # signature in this view.
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            # GET requests ask for raw bytes; PUT reuses caller headers as-is.
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
288 """Is it worth attempting a request?"""
292 """Did the request succeed or encounter permanent failure?"""
293 return self._result['error'] == False or not self._usable
295 def last_result(self):
298 def _get_user_agent(self):
300 return self._user_agent_pool.get(block=False)
304 def _put_user_agent(self, ua):
307 self._user_agent_pool.put(ua, block=False)
        def _socket_open(self, *args, **kwargs):
            # Dispatch on arity: pycurl >= 7.21.5 passes (purpose, address);
            # older pycurl passes (family, socktype, protocol[, address]).
            if len(args) + len(kwargs) == 2:
                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
            # NOTE(review): an "else:" line appears elided here.
            return self._socket_open_pycurl_7_19_3(*args, **kwargs)

        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
            # Adapt the old callback signature to the new one by packing
            # the arguments into an address-like namedtuple.
            # NOTE(review): the "purpose=None," argument line appears
            # elided from this call in this view.
            return self._socket_open_pycurl_7_21_5(
                address=collections.namedtuple(
                    'Address', ['family', 'socktype', 'protocol', 'addr'],
                )(family, socktype, protocol, address))

        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            # NOTE(review): trailing lines (presumably "return s") are
            # elided in this view.
        def get(self, locator, method="GET", timeout=None):
            # Fetch (GET) or probe (HEAD) one block from this Keep service.
            # NOTE(review): several framing lines of this method (try/except
            # around curl.perform(), the self._result assignment, and the
            # success/checksum branches) are elided in this view.
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            with timer.Timer() as t:
                response_body = cStringIO.StringIO()
                curl.setopt(pycurl.NOSIGNAL, 1)
                curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                curl.setopt(pycurl.URL, url.encode('utf-8'))
                curl.setopt(pycurl.HTTPHEADER, [
                    '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                # NOTE(review): the 'if method == "HEAD":' guard for the
                # next line appears elided in this view.
                curl.setopt(pycurl.NOBODY, True)
                self._setcurltimeouts(curl, timeout)
            except Exception as e:
                # Wrap transport-level failures as HttpError for callers.
                raise arvados.errors.HttpError(0, str(e))
                # NOTE(review): the "self._result = {" opener is elided.
                'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                'body': response_body.getvalue(),
                'headers': self._headers,
            ok = retry.check_http_response_success(self._result['status_code'])
                self._result['error'] = arvados.errors.HttpError(
                    self._result['status_code'],
                    self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
            # None means "undetermined"; only an explicit False marks the
            # service unusable for further requests.
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
                # Don't return this client to the pool, in case it's
                # broken.
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result.get('content-length'))
                _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                             self._result['status_code'],
                             len(self._result['body']),
                             (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            # Verify the payload against the locator's md5 before returning.
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                self._result['error'] = arvados.errors.HttpError(
            return self._result['body']
        def put(self, hash_s, body, timeout=None):
            # Upload one block (body) under its content hash (hash_s).
            # NOTE(review): try/except framing around curl.perform(), the
            # self._result assignment, and the return value are elided in
            # this view.
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            with timer.Timer() as t:
                body_reader = cStringIO.StringIO(body)
                response_body = cStringIO.StringIO()
                curl.setopt(pycurl.NOSIGNAL, 1)
                curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                curl.setopt(pycurl.URL, url.encode('utf-8'))
                # Using UPLOAD tells cURL to wait for a "go ahead" from the
                # Keep server (in the form of a HTTP/1.1 "100 Continue"
                # response) instead of sending the request body immediately.
                # This allows the server to reject the request if the request
                # is invalid or the server is read-only, without waiting for
                # the client to send the entire block.
                curl.setopt(pycurl.UPLOAD, True)
                curl.setopt(pycurl.INFILESIZE, len(body))
                curl.setopt(pycurl.READFUNCTION, body_reader.read)
                curl.setopt(pycurl.HTTPHEADER, [
                    '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                self._setcurltimeouts(curl, timeout)
            except Exception as e:
                raise arvados.errors.HttpError(0, str(e))
                # NOTE(review): the "self._result = {" opener is elided.
                'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                'body': response_body.getvalue(),
                'headers': self._headers,
            ok = retry.check_http_response_success(self._result['status_code'])
                self._result['error'] = arvados.errors.HttpError(
                    self._result['status_code'],
                    self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
        def _setcurltimeouts(self, curl, timeouts):
            # Apply connect/transfer/bandwidth limits to a curl handle.
            # Accepts None (elided guard), a scalar, a 2-tuple
            # (connect, transfer), or a 3-tuple (connect, transfer,
            # bandwidth_bps).
            # NOTE(review): the leading "if not timeouts: return" guard and
            # the else branch headers appear elided in this view.
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                conn_t, xfer_t, bandwidth_bps = timeouts
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            # Abort if average rate stays below bandwidth_bps for xfer_t
            # seconds (curl LOW_SPEED_* semantics).
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
        def _headerfunction(self, header_line):
            # pycurl header callback: accumulate response headers into
            # self._headers, lower-casing names and joining continuation
            # lines.
            header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
                # NOTE(review): the continuation-line branch header (elif
                # for lines starting with whitespace) appears elided here.
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                # Status line is stored under a synthetic header name.
                name = 'x-status-line'
                # NOTE(review): the value assignment and the trailing else
                # branch header appear elided here.
                _logger.error("Unexpected header line: %s", header_line)
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
    class KeepWriterQueue(Queue.Queue):
        # Work queue shared by KeepWriterThreads: tracks how many copies
        # have been written and coordinates retries via a Condition.
        def __init__(self, copies):
            Queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.successful_copies = 0
            # NOTE(review): the self.response initialization appears elided
            # in this view (write_success() assigns it later).
            self.successful_copies_lock = threading.Lock()
            self.pending_tries = copies
            self.pending_tries_notification = threading.Condition()
523 def write_success(self, response, replicas_nr):
524 with self.successful_copies_lock:
525 self.successful_copies += replicas_nr
526 self.response = response
527 with self.pending_tries_notification:
528 self.pending_tries_notification.notify_all()
530 def write_fail(self, ks):
531 with self.pending_tries_notification:
532 self.pending_tries += 1
533 self.pending_tries_notification.notify()
535 def pending_copies(self):
536 with self.successful_copies_lock:
537 return self.wanted_copies - self.successful_copies
        def get_next_task(self):
            # Hand the next (service, root) pair to a writer thread, or
            # block until a retry becomes available / the goal is met.
            # NOTE(review): the enclosing "while True:" loop header and
            # several branch lines are elided in this view.
            with self.pending_tries_notification:
                if self.pending_copies() < 1:
                    # This notify_all() is unnecessary --
                    # write_success() already called notify_all()
                    # when pending<1 became true, so it's not
                    # possible for any other thread to be in
                    # wait() now -- but it's cheap insurance
                    # against deadlock so we do it anyway:
                    self.pending_tries_notification.notify_all()
                    # Drain the queue and then raise Queue.Empty
                elif self.pending_tries > 0:
                    service, service_root = self.get_nowait()
                    if service.finished():
                        # NOTE(review): the skip-finished-service lines
                        # (presumably task_done()/continue) are elided.
                    self.pending_tries -= 1
                    return service, service_root
                    # NOTE(review): the trailing "else:" header is elided;
                    # with no work available, wake peers and wait.
                    self.pending_tries_notification.notify_all()
                    self.pending_tries_notification.wait()
    class KeepWriterThreadPool(object):
        # Fans a single block out to multiple Keep services in parallel
        # until the requested number of copies is stored.
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
            self.total_task_nr = 0
            self.wanted_copies = copies
            if (not max_service_replicas) or (max_service_replicas >= copies):
                # NOTE(review): the single-thread branch body appears
                # elided in this view.
            num_threads = int(math.ceil(float(copies) / max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            # NOTE(review): self.workers list initialization appears elided.
            self.queue = KeepClient.KeepWriterQueue(copies)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            # Enqueue one (service, root) write attempt and count it.
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        # NOTE(review): the done() method header appears elided here; it
        # reports how many copies were written successfully.
            return self.queue.successful_copies

        # NOTE(review): join() framing appears elided: start each worker,
        # then wait for the queue to drain.
            for worker in self.workers:
            # Wait for finished work

        # NOTE(review): the response() method header appears elided here.
            return self.queue.response
    class KeepWriterThread(threading.Thread):
        # Sentinel exception instance used to signal a failed PUT without
        # logging a full traceback (compared by identity in run()).
        TaskFailed = RuntimeError()

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            # NOTE(review): queue/data attribute assignments appear elided
            # in this view.
            self.data_hash = data_hash

        # NOTE(review): the run() method header and its loop/try framing
        # appear elided in this view.
            service, service_root = self.queue.get_next_task()
            locator, copies = self.do_task(service, service_root)
            except Exception as e:
                # TaskFailed is an expected outcome; anything else is a bug
                # worth a full traceback.
                if e is not self.TaskFailed:
                    _logger.exception("Exception in KeepWriterThread")
                self.queue.write_fail(service)
                self.queue.write_success(locator, copies)
                self.queue.task_done()

        def do_task(self, service, service_root):
            # Attempt one PUT; returns (locator, replicas_stored) or raises
            # TaskFailed. NOTE(review): several argument/branch lines are
            # elided in this view.
            success = bool(service.put(self.data_hash,
                                       timeout=self.timeout))
            result = service.last_result()
            if result.get('status_code', None):
                _logger.debug("Request fail: PUT %s => %s %s",
                              result['status_code'],
                raise self.TaskFailed
            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
            # Proxies report how many replicas they stored; default applies
            # when the header is missing or unparsable.
            replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
            return result['body'].strip(), replicas_stored
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services. If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy. Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want KeepClient to not use a proxy, pass in an empty
          string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests. It is an error to specify both
          api_client and api_token. If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory. If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable. If you want to ensure KeepClient does not
          use local storage, pass in an empty string. This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called. Default 0.
        """
        self.lock = threading.Lock()
        # ARVADOS_KEEP_SERVICES takes precedence over ARVADOS_KEEP_PROXY.
        if config.get('ARVADOS_KEEP_SERVICES'):
            proxy = config.get('ARVADOS_KEEP_SERVICES')
            # NOTE(review): the elif/else branch header appears elided here.
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
                # NOTE(review): the else branch header appears elided here.
                api_token = api_client.api_token
        elif api_client is not None:
            # NOTE(review): the "raise arvados.errors.ArgumentError(" line
            # appears elided here.
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = Queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        # NOTE(review): the "if local_store:" branch header appears elided;
        # in local-store mode, get/put are swapped for the local stubs.
        self.local_store = local_store
        self.get = self.local_store_get
        self.put = self.local_store_put
        self.num_retries = num_retries
        self.max_replicas_per_service = None
        # NOTE(review): the "elif proxy:" branch header appears elided.
        proxy_uris = proxy.split()
        for i in range(len(proxy_uris)):
            if not proxy_uris[i].endswith('/'):
                # NOTE(review): the trailing-slash append line is elided.
            # URL validation
            url = urlparse.urlparse(proxy_uris[i])
            if not (url.scheme and url.netloc):
                raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
        self.api_token = api_token
        self._gateway_services = {}
        # Synthesize a static service list entry for each proxy URI.
        self._keep_services = [{
            'uuid': "00000-bi6l4-%015d" % idx,
            'service_type': 'proxy',
            '_service_root': uri,
        } for idx, uri in enumerate(proxy_uris)]
        self._writable_services = self._keep_services
        self.using_proxy = True
        self._static_services_list = True
        # NOTE(review): the final "else:" (discover services via the API
        # server) branch header appears elided here.
        # It's important to avoid instantiating an API client
        # unless we actually need one, for testing's sake.
        if api_client is None:
            api_client = arvados.api('v1')
        self.api_client = api_client
        self.api_token = api_client.api_token
        self._gateway_services = {}
        self._keep_services = None
        self._writable_services = None
        self.using_proxy = None
        self._static_services_list = False
    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise. The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        # NOTE(review): the branch headers selecting between the 2-tuple
        # and 3-tuple forms appear elided in this view. Connection timeout
        # doubles on each retry (1 << attempt_number).
        return (t[0] * (1 << attempt_number), t[1])
        return (t[0] * (1 << attempt_number), t[1], t[2])
799 def _any_nondisk_services(self, service_list):
800 return any(ks.get('service_type', 'disk') != 'disk'
801 for ks in service_list)
    def build_services_list(self, force_rebuild=False):
        # Populate/refresh the Keep service lists from the API server.
        # No-op when a static (proxy) list was configured, or when a list
        # already exists and force_rebuild is false.
        if (self._static_services_list or
                (self._keep_services and not force_rebuild)):
            # NOTE(review): the early return and lock/try framing around
            # the API call appear elided in this view.
            keep_services = self.api_client.keep_services().accessible()
        except Exception: # API server predates Keep services.
            keep_services = self.api_client.keep_disks().list()

        # Gateway services are only used when specified by UUID,
        # so there's nothing to gain by filtering them by
        # service_type here.
        self._gateway_services = {ks['uuid']: ks for ks in
                                  keep_services.execute()['items']}
        if not self._gateway_services:
            raise arvados.errors.NoKeepServersError()

        # Precompute the base URI for each service.
        for r in self._gateway_services.itervalues():
            host = r['service_host']
            if not host.startswith('[') and host.find(':') >= 0:
                # IPv6 URIs must be formatted like http://[::1]:80/...
                host = '[' + host + ']'
            r['_service_root'] = "{}://{}:{:d}/".format(
                'https' if r['service_ssl_flag'] else 'http',
                # NOTE(review): the host and port format arguments appear
                # elided in this view.

        _logger.debug(str(self._gateway_services))
        # Local read candidates exclude gateway-only services.
        self._keep_services = [
            ks for ks in self._gateway_services.itervalues()
            if not ks.get('service_type', '').startswith('gateway:')]
        self._writable_services = [ks for ks in self._keep_services
                                   if not ks.get('read_only')]
        # For disk type services, max_replicas_per_service is 1
        # It is unknown (unlimited) for other service types.
        if self._any_nondisk_services(self._writable_services):
            self.max_replicas_per_service = None
            # NOTE(review): the else branch header appears elided here.
            self.max_replicas_per_service = 1
    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)
        # NOTE(review): the sorted_roots list initialization appears elided
        # in this view.
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # usable service.
        for hint in locator.hints:
            if hint.startswith('K@'):
                # NOTE(review): the 5-character site-prefix branch (append
                # of the remote proxy URL) is partially elided here.
                    "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    # 29-char hints name a gateway service by UUID.
                    svc = self._gateway_services.get(hint[2:])
                    # NOTE(review): the "if svc:" guard appears elided.
                    sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs).
        # NOTE(review): the need_writable branch headers appear elided.
        use_services = self._keep_services
        use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                # NOTE(review): the use_services argument and sort options
                # appear elided in this view.
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        # NOTE(review): the "return sorted_roots" line appears elided.
    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    # NOTE(review): the **headers argument and the
                    # "return local_roots" line appear elided in this view.
    # NOTE(review): a @staticmethod decorator appears elided in this view.
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            # NOTE(review): the "return None" (retryable) line appears
            # elided here.
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            # NOTE(review): the "return True" line appears elided here.
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            # NOTE(review): the trailing return lines appear elided.
    def get_from_cache(self, loc):
        """Fetch a block only if is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            # NOTE(review): the return lines (slot content / None) appear
            # elided in this view.

    # NOTE(review): @retry.retry_method decorators appear elided for the
    # two wrappers below.
    def head(self, loc_s, num_retries=None):
        # Probe for a block's existence without transferring its body.
        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)

    def get(self, loc_s, num_retries=None):
        # Fetch a block's content.
        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
    def _get_or_head(self, loc_s, method="GET", num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        # NOTE(review): the multi-locator guard around the next line
        # appears elided in this view.
        return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        locator = KeepLocator(loc_s)
        # NOTE(review): cache reservation/hit/miss framing is partially
        # elided in this view.
        slot, first = self.block_cache.reserve_cache(locator.md5sum)
        self.hits_counter.add(1)
        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               # NOTE(review): the len(hint) == 29 clause
                               # appears elided in this view.
                               self._gateway_services.get(hint[2:])
        # Map root URLs to their KeepService objects.
        # NOTE(review): the dict comprehension opener is elided here.
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter)
            for root in hint_roots

        # See #3147 for a discussion of the loop implementation. Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               # NOTE(review): backoff arguments elided.
        for tries_left in loop:
            # NOTE(review): the "try:" line appears elided here.
            sorted_roots = self.map_new_services(
                # NOTE(review): the roots_map argument appears elided.
                force_rebuild=(tries_left < num_retries),
                need_writable=False)
            except Exception as error:
                loop.save_result(error)
                # NOTE(review): the "continue" line appears elided.

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    # NOTE(review): the "break" line appears elided.
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        # NOTE(review): the slot.set() bookkeeping appears elided here.
        self.block_cache.cap_cache()
        # NOTE(review): the success branch appears elided here.
        if method == "HEAD":
            # NOTE(review): return lines appear elided here.

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        # NOTE(review): the branch header distinguishing "no services
        # available" appears elided here.
        raise arvados.errors.KeepReadError(
            "failed to read {}: no Keep services available ({})".format(
                loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
            # NOTE(review): the final "else:" header appears elided here.
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        """
        # Python 2: accept unicode only if it is ASCII-encodable.
        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        # NOTE(review): the copies<1 early-return guard appears elided.
        locator = KeepLocator(loc_s)

        # NOTE(review): headers dict initialization appears elided here.
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replicas'] = str(copies)
        # NOTE(review): roots_map/done initialization appears elided here.
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               # NOTE(review): backoff arguments elided.
        for tries_left in loop:
            # NOTE(review): the "try:" line appears elided here.
            sorted_roots = self.map_new_services(
                # NOTE(review): the roots_map argument appears elided.
                force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                # NOTE(review): the "continue" line appears elided.

            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left))
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                # NOTE(review): the skip-finished-service guard is elided.
                writer_pool.add_task(ks, service_root)
            # NOTE(review): the writer_pool.join() call appears elided.
            done += writer_pool.done()
            loop.save_result((done >= copies, writer_pool.total_task_nr))

        # NOTE(review): the loop.success() branch headers appear elided in
        # the error-reporting section below.
        return writer_pool.response()
        raise arvados.errors.KeepWriteError(
            "failed to write {}: no Keep services available ({})".format(
                data_hash, loop.last_result()))
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots
                          if roots_map[key].last_result()['error'])
        raise arvados.errors.KeepWriteError(
            "failed to write {} (wanted {} copies but wrote {})".format(
                data_hash, copies, writer_pool.done()), service_errors, label="service")
    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        # Write to a temp file, then rename for an atomic install.
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
            # NOTE(review): the f.write(data) line appears elided here.
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        # NOTE(review): the "return locator" line appears elided here.
    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        # NOTE(review): try/except framing around locator parsing appears
        # elided in this view.
        locator = KeepLocator(loc_s)
        raise arvados.errors.NotFoundError(
            "Invalid data locator: '%s'" % loc_s)
        # The well-known empty block is served without touching disk.
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            # NOTE(review): the "return ''" line appears elided here.
        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
            # NOTE(review): the "return f.read()" line appears elided here.
1163 def is_cached(self, locator):
1164 return self.block_cache.reserve_cache(expect_hash)