import cStringIO, datetime, hashlib, logging, math, os
import pycurl, Queue, re, socket, ssl, threading

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
import timer

_logger = logging.getLogger('arvados.keep')
global_client_object = None

class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except IndexError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
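
# Illustrative sketch (not part of the original module): parsing a signed
# locator with KeepLocator.  The hash below is md5("bar")+size; the A-hint
# signature and timestamp are made-up values of the right shape.
#
#   loc = KeepLocator('37b51d194a7513e45b56f6524f2d51f2+3'
#                     '+A27f999a81e0a555a27eee3b44f68a3eef18e8e82@5826180f')
#   loc.md5sum       # '37b51d194a7513e45b56f6524f2d51f2'
#   loc.size         # 3
#   loc.stripped()   # '37b51d194a7513e45b56f6524f2d51f2+3'
#   loc.permission_expired()  # compares the hint timestamp to now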

class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)
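
# Illustrative sketch (not part of the original module): per the deprecation
# note above, prefer a private KeepClient tied to your own API client over
# the global Keep interface.
#
#   api = arvados.api('v1')          # or an api_client you already built
#   kc = KeepClient(api_client=api)
#   loc = kc.put('foo')              # store one block
#   data = kc.get(loc)               # read it back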

class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in xrange(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in xrange(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
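
# Illustrative sketch (not part of the original module): the reserve/fill
# protocol used by KeepClient.  reserve_cache() returns (slot, first); the
# first caller fetches the block and set()s it, later callers block in
# slot.get() until the content is ready.
#
#   cache = KeepBlockCache()
#   slot, first = cache.reserve_cache(loc.md5sum)
#   if first:
#       slot.set(fetched_bytes)   # fetched_bytes: data you retrieved
#       cache.cap_cache()
#   else:
#       data = slot.get()         # waits for the fetching thread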

class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val

class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)

    class ThreadLimiter(object):
        """Limit the number of threads writing to Keep at once.

        This ensures that only a number of writer threads that could
        potentially achieve the desired replication level run at once.
        Once the desired replication level is achieved, queued threads
        are instructed not to run.

        Should be used in a "with" block.
        """
        def __init__(self, want_copies, max_service_replicas):
            self._started = 0
            self._want_copies = want_copies
            self._done = 0
            self._thread_failures = 0
            self._response = None
            self._start_lock = threading.Condition()
            if (not max_service_replicas) or (max_service_replicas >= want_copies):
                max_threads = 1
            else:
                max_threads = math.ceil(float(want_copies) / max_service_replicas)
            _logger.debug("Limiter max threads is %d", max_threads)
            self._todo_lock = threading.Semaphore(max_threads)
            self._done_lock = threading.Lock()
            self._thread_failures_lock = threading.Lock()
            self._local = threading.local()

        def __enter__(self):
            self._start_lock.acquire()
            if getattr(self._local, 'sequence', None) is not None:
                # If the calling thread has used set_sequence(N), then
                # we wait here until N other threads have started.
                while self._started < self._local.sequence:
                    self._start_lock.wait()
            self._todo_lock.acquire()
            self._started += 1
            self._start_lock.notifyAll()
            self._start_lock.release()
            return self

        def __exit__(self, type, value, traceback):
            with self._thread_failures_lock:
                if self._thread_failures > 0:
                    self._thread_failures -= 1
                    self._todo_lock.release()
                else:
                    # If work is finished, release all pending threads
                    if not self.shall_i_proceed():
                        self._todo_lock.release()

        def set_sequence(self, sequence):
            self._local.sequence = sequence

        def shall_i_proceed(self):
            """
            Return true if the current thread should write to Keep.
            Return false otherwise.
            """
            with self._done_lock:
                return (self._done < self._want_copies)

        def save_response(self, response_body, replicas_stored):
            """
            Records a response body (a locator, possibly signed) returned by
            the Keep server, and the number of replicas it stored.
            """
            if replicas_stored == 0:
                # Failure notification, should start a new thread to try to reach full replication
                with self._thread_failures_lock:
                    self._thread_failures += 1
            else:
                with self._done_lock:
                    self._done += replicas_stored
                    self._response = response_body

        def response(self):
            """Return the body from the response to a PUT request."""
            with self._done_lock:
                return self._response

        def done(self):
            """Return the total number of replicas successfully stored."""
            with self._done_lock:
                return self._done
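
    # Illustrative sketch (not part of the original module): how the writer
    # threads use ThreadLimiter.  With want_copies=2 and one replica per
    # disk service, at most two writers run concurrently.
    #
    #   limiter = KeepClient.ThreadLimiter(want_copies=2,
    #                                      max_service_replicas=1)
    #   # in each writer thread:
    #   with limiter:
    #       if limiter.shall_i_proceed():
    #           ...  # PUT the block, then:
    #           limiter.save_response(response_body, replicas_stored)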

    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible results.
        """
        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None, **headers):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(False)
            except Queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, False)
            except:
                pass  # Too bad.

        @staticmethod
        def _socket_open(family, socktype, protocol, address=None):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(family, socktype, protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            return s

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k, v) for k, v in self.get_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result.get('content-length'))
                return True
            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = cStringIO.StringIO(body)
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k, v) for k, v in self.put_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
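
        # Illustrative note (not part of the original module): the three
        # accepted timeout forms, per the logic above.
        #
        #   self._setcurltimeouts(curl, (2, 256, 32768))  # conn, read, bps
        #   self._setcurltimeouts(curl, (2, 256))         # default bandwidth
        #   self._setcurltimeouts(curl, 300)               # one number used for
        #                                                  # both conn and read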

        def _headerfunction(self, header_line):
            header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
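
    # Illustrative sketch (not part of the original module): one GET
    # transaction against a single Keep service, the way KeepClient's retry
    # loop drives it.  The root URL and pool variable are hypothetical.
    #
    #   ks = KeepClient.KeepService('http://keep0.example:25107/',
    #                               user_agent_pool, **headers)
    #   blob = ks.get(locator, timeout=(2, 256, 32768))
    #   if blob is None and not ks.usable():
    #       ...  # permanent failure; skip this service on later rounds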

    class KeepWriterQueue(Queue.Queue):
        def __init__(self, copies):
            super(KeepClient.KeepWriterQueue, self).__init__()
            self.wanted_copies = copies
            self.successful_copies = 0
            self.successful_copies_lock = threading.Lock()
            self.retries = copies
            self.retries_notification = threading.Condition()

        def write_success(self, replicas_nr):
            with self.successful_copies_lock:
                self.successful_copies += replicas_nr

        def write_fail(self, ks, status_code):
            with self.retries_notification:
                self.retries += 1
                self.retries_notification.notify()

        def pending_copies(self):
            return self.wanted_copies - self.successful_copies

    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, num_threads, copies=2):
            self.wanted_copies = copies
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies)
            # Create workers
            for _ in range(num_threads):
                self.workers.append(KeepClient.KeepWriterThreadNew(self.queue, data, data_hash))

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))

        def successful_copies(self):
            return self.queue.successful_copies

        def join(self):
            for worker in self.workers:
                worker.start()
            self.queue.join()

    class KeepWriterThreadNew(threading.Thread):
        def __init__(self, queue, data, data_hash):
            super(KeepClient.KeepWriterThreadNew, self).__init__()
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while not self.queue.empty():
                if self.queue.pending_copies() > 0:
                    # Avoid overreplication, wait for some needed retry
                    with self.queue.retries_notification:
                        if not self.queue.retries > 0:
                            self.queue.retries_notification.wait()
                            continue # try again when awake
                        self.queue.retries -= 1

                    # Get to work
                    service, service_root = self.queue.get()

                    success = bool(service.put(self.data_hash,
                                               self.data))
                    result = service.last_result()
                    if success:
                        _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                                      str(threading.current_thread()),
                                      self.data_hash,
                                      len(self.data),
                                      service_root)
                        try:
                            replicas_stored = int(result['headers']['x-keep-replicas-stored'])
                        except (KeyError, ValueError):
                            replicas_stored = 1
                        self.queue.write_success(replicas_stored)
                    else:
                        if result.get('status_code', None):
                            _logger.debug("Request fail: PUT %s => %s %s",
                                          self.data_hash,
                                          result['status_code'],
                                          result['body'])
                        self.queue.write_fail(service, result.get('status_code', None)) # Schedule a retry
                else:
                    # Remove the task from the queue anyway
                    self.queue.get()
                # Mark as done so the queue can be join()ed
                self.queue.task_done()

    class KeepWriterThread(threading.Thread):
        """
        Write a blob of data to the given Keep server. On success, call
        save_response() of the given ThreadLimiter to save the returned
        locator.
        """
        def __init__(self, keep_service, **kwargs):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.service = keep_service
            self.args = kwargs
            self._success = False

        def success(self):
            return self._success

        def run(self):
            limiter = self.args['thread_limiter']
            sequence = self.args['thread_sequence']
            if sequence is not None:
                limiter.set_sequence(sequence)
            with limiter:
                if not limiter.shall_i_proceed():
                    # My turn arrived, but the job has been done without
                    # me.
                    return
                self.run_with_limiter(limiter)

        def run_with_limiter(self, limiter):
            if self.service.finished():
                return
            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          len(self.args['data']),
                          self.args['service_root'])
            self._success = bool(self.service.put(
                self.args['data_hash'],
                self.args['data'],
                timeout=self.args.get('timeout', None)))
            result = self.service.last_result()
            if self._success:
                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                              str(threading.current_thread()),
                              self.args['data_hash'],
                              len(self.args['data']),
                              self.args['service_root'])
                # Tick the 'done' counter for the number of replicas
                # reported stored by the server, for the case that
                # we're talking to a proxy or other backend that
                # stores multiple copies for us.
                try:
                    replicas_stored = int(result['headers']['x-keep-replicas-stored'])
                except (KeyError, ValueError):
                    replicas_stored = 1
                limiter.save_response(result['body'].strip(), replicas_stored)
            elif result.get('status_code', None):
                _logger.debug("Request fail: PUT %s => %s %s",
                              self.args['data_hash'],
                              result['status_code'],
                              result['body'])
            if not self._success:
                # Notify the failure so that the Thread limiter allows
                # a new one to be allocated.
                limiter.save_response(None, 0)

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
          KeepClient does not use a proxy, pass in an empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth).  A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds.  Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth).  The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = Queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        if local_store:
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                if not proxy.endswith('/'):
                    proxy += '/'
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': 'proxy',
                    'service_type': 'proxy',
                    '_service_root': proxy,
                    }]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
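
    # Illustrative note (not part of the original module): only the
    # connection timeout grows with the attempt number; the read timeout
    # and bandwidth floor stay fixed.  With the default (2, 256, 32768):
    #
    #   self.current_timeout(0)  # -> (2, 256, 32768)
    #   self.current_timeout(2)  # -> (8, 256, 32768)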

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
                (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.itervalues():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.itervalues()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]
            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
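
    # Illustrative note (not part of the original module): sorting services
    # by this weight, heaviest first, as weighted_service_roots() does below,
    # is rendezvous (highest-random-weight) hashing: every client computes
    # the same per-block probe order without any shared state, e.g.
    #
    #   sorted(service_uuids,
    #          key=lambda u: self._service_weight(data_hash, u),
    #          reverse=True)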

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # as well.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    **headers)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc):
        """Fetch a block only if it is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    @retry.retry_method
    def head(self, loc_s, num_retries=None):
        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)

    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)

    def _get_or_head(self, loc_s, method="GET", num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        locator = KeepLocator(loc_s)
        if method == "GET":
            slot, first = self.block_cache.reserve_cache(locator.md5sum)
            if not first:
                self.hits_counter.add(1)
                v = slot.get()
                return v

        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               len(hint) == 29 and
                               self._gateway_services.get(hint[2:])
                               )])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter)
            for root in hint_roots
        }

        # See #3147 for a discussion of the loop implementation.  Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        sorted_roots = []
        blob = None
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False)
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        if method == "GET":
            slot.set(blob)
            self.block_cache.cap_cache()
        if loop.success():
            if method == "HEAD":
                return True
            else:
                return blob

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")
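
    # Illustrative sketch (not part of the original module): a typical read.
    # The locator below is md5("foo")+size, unsigned for brevity.
    #
    #   kc = KeepClient(api_client=api)
    #   data = kc.get('acbd18db4cc2f85cedef654fccc4a4d8+3', num_retries=3)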

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        """

        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replication'] = str(copies)
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done = 0
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            thread_limiter = KeepClient.ThreadLimiter(
                copies - done, self.max_replicas_per_service)
            threads = []
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                t = KeepClient.KeepWriterThread(
                    ks,
                    data=data,
                    data_hash=data_hash,
                    service_root=service_root,
                    thread_limiter=thread_limiter,
                    timeout=self.current_timeout(num_retries-tries_left),
                    thread_sequence=len(threads))
                t.start()
                threads.append(t)
            for t in threads:
                t.join()
            done += thread_limiter.done()
            loop.save_result((done >= copies, len(threads)))

        if loop.success():
            return thread_limiter.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, thread_limiter.done()), service_errors, label="service")
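
    # Illustrative sketch (not part of the original module): a write/read
    # round trip.  put() returns a (possibly signed) locator that can be
    # handed straight back to get().
    #
    #   kc = KeepClient(api_client=api)
    #   loc = kc.put('foo', copies=2, num_retries=3)
    #   assert kc.get(loc) == 'foo'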

    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator
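
    # Illustrative sketch (not part of the original module): exercising the
    # local-store stubs, as test suites typically do.  The directory is
    # hypothetical.
    #
    #   kc = KeepClient(local_store='/tmp/keep-mock')  # or $KEEP_LOCAL_STORE
    #   loc = kc.put('foo')          # actually calls local_store_put()
    #   data = kc.get(loc)           # actually calls local_store_get()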

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return ''
        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
            return f.read()

    def is_cached(self, locator):
        return self.block_cache.reserve_cache(locator)