# Logger shared by everything in this module.
_logger = logging.getLogger('arvados.keep')

# Lazily-built singleton KeepClient used by the deprecated `Keep` class
# methods; rebuilt by Keep.global_client_object() when configuration changes.
global_client_object = None
28 import arvados.config as config
30 import arvados.retry as retry
class KeepLocator(object):
    """Parse and reassemble one Keep locator string.

    A locator looks like "md5sum[+size[+hints...]]".  A hint starting
    with 'A' carries a permission signature ("Asignature@hexstamp");
    other hints are kept verbatim in self.hints.
    """
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self.size = None
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # Locator had no size component; leave self.size as None.
            pass
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("unrecognized hint data {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        # Reassemble the locator, skipping components that are unset.
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} must be a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        # Expiry is stored as a datetime (or None when no hint was given).
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        # `value` is a hex Unix timestamp of 1-8 digits.
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the "A..." permission hint string, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        # Convert the expiry datetime back to a zero-padded hex timestamp.
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Parse an "Asignature@hexstamp" hint into sig and expiry."""
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except (IndexError, ValueError):
            # BUGFIX: a hint without '@' fails the tuple unpacking with
            # ValueError, which the old `except IndexError` never caught,
            # so callers got a cryptic unpacking error instead of this one.
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        """True if the permission hint has expired (as of `as_of_dt`, default now)."""
        if self.perm_expiry is None:
            # Unsigned locators never expire.
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
111 """Simple interface to a global KeepClient object.
113 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
114 own API client. The global KeepClient will build an API client from the
115 current Arvados configuration, which may not match the one you built.
@classmethod
def global_client_object(cls):
    global global_client_object
    # Previously, KeepClient would change its behavior at runtime based
    # on these configuration settings.  We simulate that behavior here
    # by rebuilding the shared client whenever any of them differs from
    # what the current client was built with.
    cache_key = (config.get('ARVADOS_API_HOST'),
                 config.get('ARVADOS_API_TOKEN'),
                 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
                 config.get('ARVADOS_KEEP_PROXY'),
                 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
                 os.environ.get('KEEP_LOCAL_STORE'))
    if global_client_object is None or cls._last_key != cache_key:
        global_client_object = KeepClient()
        cls._last_key = cache_key
    return global_client_object
@staticmethod
def get(locator, **kwargs):
    """Fetch `locator` through the shared global KeepClient (deprecated path)."""
    client = Keep.global_client_object()
    return client.get(locator, **kwargs)
@staticmethod
def put(data, **kwargs):
    """Store `data` through the shared global KeepClient (deprecated path)."""
    client = Keep.global_client_object()
    return client.put(data, **kwargs)
class KeepBlockCache(object):
    """In-memory cache of Keep block contents, most-recently-used first,
    capped at roughly cache_max bytes."""
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """One cache entry: readers block on `ready` until a writer set()s it."""
        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            # Block until some thread has stored this block's content.
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            # A slot whose content is None occupies no cache space.
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        # BUGFIX: use a `with` block so the lock is released even if a
        # slot's size() raises; the old acquire()/release() pair left the
        # lock held forever on error.  (range replaces the Python-2-only
        # xrange; iteration behavior is identical.)
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                # Evict the least recently used ready slot (the list is
                # kept in most-recently-used-first order).
                for i in range(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            # Test if the locator is already in the cache
            for i in range(0, len(self._cache)):
                if self._cache[i].locator == locator:
                    n = self._cache[i]
                    if i != 0:
                        # move it to the front
                        del self._cache[i]
                        self._cache.insert(0, n)
                    return n, False

            # Add a new cache slot for the locator
            n = KeepBlockCache.CacheSlot(locator)
            self._cache.insert(0, n)
            return n, True
class KeepClient(object):
    """Read (get) and write (put) data blocks to/from Keep services."""

    # Timeouts are (connection_timeout, read_timeout) pairs, in seconds,
    # in the form accepted by the `requests` library.
    # Default Keep server connection timeout: 3 seconds
    # Default Keep server read timeout: 30 seconds
    # Default Keep proxy connection timeout: 20 seconds
    # Default Keep proxy read timeout: 60 seconds
    DEFAULT_TIMEOUT = (3, 30)
    DEFAULT_PROXY_TIMEOUT = (20, 60)
class ThreadLimiter(object):
    """
    Limit the number of threads running at a given time to
    {desired successes} minus {successes reported}.  When successes
    reported == desired, remaining threads are told to quit.

    Should be used in a "with" block.
    """
    def __init__(self, todo):
        self._todo = todo
        self._done = 0
        self._response = None
        self._todo_lock = threading.Semaphore(todo)
        self._done_lock = threading.Lock()

    def __enter__(self):
        self._todo_lock.acquire()
        return self

    def __exit__(self, type, value, traceback):
        self._todo_lock.release()

    def shall_i_proceed(self):
        """
        Return true if the current thread should do stuff, false if it
        should just stop (the work has already been completed).
        """
        with self._done_lock:
            return self._done < self._todo

    def save_response(self, response_body, replicas_stored):
        """
        Record a response body (a locator, possibly signed) returned by
        the Keep server, plus how many replicas it stored.  Saving any
        one response is enough: a locator returned in response to a
        successful request is presumed valid.
        """
        with self._done_lock:
            self._done += replicas_stored
            self._response = response_body

    def response(self):
        """
        Return the body from the response to a PUT request.
        """
        with self._done_lock:
            return self._response

    def done(self):
        """
        Return how many successes were reported.
        """
        with self._done_lock:
            return self._done
class KeepService(object):
    """Make requests to a single Keep service, and track results."""
    HTTP_ERRORS = (requests.exceptions.RequestException,
                   socket.error, ssl.SSLError)

    def __init__(self, root, timeout=None, **headers):
        # NOTE(review): `timeout` is accepted here for interface
        # compatibility, but the per-request timeouts passed to
        # get()/put() are what actually get used -- confirm intent.
        self.root = root
        self.last_result = None
        self.success_flag = None
        self.get_headers = {'Accept': 'application/octet-stream'}
        self.get_headers.update(headers)
        self.put_headers = headers

    def usable(self):
        # A service stays usable until it reports permanent failure
        # (success_flag False); None means "not tried yet".
        return self.success_flag is not False

    def finished(self):
        # True once any request has completed, successfully or not.
        return self.success_flag is not None

    def last_status(self):
        """HTTP status code of the most recent request, or None."""
        try:
            return self.last_result.status_code
        except (AttributeError, IndexError, ValueError):
            return None

    def get(self, locator, timeout=None):
        """Fetch one block; return its verified content, or None on failure.

        locator is a KeepLocator object; the response body is checked
        against locator.md5sum before being returned.
        """
        url = self.root + str(locator)
        _logger.debug("Request: GET %s", url)
        try:
            with timer.Timer() as t:
                result = requests.get(url.encode('utf-8'),
                                      headers=self.get_headers,
                                      timeout=timeout)
        except self.HTTP_ERRORS as e:
            _logger.debug("Request fail: GET %s => %s: %s",
                          url, type(e), str(e))
        else:
            self.last_result = result
            self.success_flag = retry.check_http_response_success(result)
            content = result.content
            _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                         self.last_status(), len(content), t.msecs,
                         (len(content)/(1024.0*1024))/t.secs)
            if self.success_flag:
                resp_md5 = hashlib.md5(content).hexdigest()
                if resp_md5 == locator.md5sum:
                    return content
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
        return None

    def put(self, hash_s, body, timeout=None):
        """PUT one block; return the request's success flag."""
        url = self.root + hash_s
        _logger.debug("Request: PUT %s", url)
        try:
            result = requests.put(url.encode('utf-8'),
                                  data=body,
                                  headers=self.put_headers,
                                  timeout=timeout)
        except self.HTTP_ERRORS as e:
            _logger.debug("Request fail: PUT %s => %s: %s",
                          url, type(e), str(e))
        else:
            self.last_result = result
            self.success_flag = retry.check_http_response_success(result)
        return self.success_flag
class KeepWriterThread(threading.Thread):
    """
    Write a blob of data to the given Keep server.  On success, call
    save_response() of the given ThreadLimiter to save the returned
    locator.
    """
    def __init__(self, keep_service, **kwargs):
        super(KeepClient.KeepWriterThread, self).__init__()
        self.service = keep_service
        self.args = kwargs
        self._success = False

    def success(self):
        return self._success

    def run(self):
        with self.args['thread_limiter'] as limiter:
            if not limiter.shall_i_proceed():
                # My turn arrived, but the job has been done without
                # me.
                return
            self.run_with_limiter(limiter)

    def run_with_limiter(self, limiter):
        # Skip services that have already completed a request.
        if self.service.finished():
            return
        _logger.debug("KeepWriterThread %s proceeding %s %s",
                      str(threading.current_thread()),
                      self.args['data_hash'],
                      self.args['service_root'])
        self._success = bool(self.service.put(
            self.args['data_hash'],
            self.args['data'],
            timeout=self.args.get('timeout', None)))
        status = self.service.last_status()
        if self._success:
            result = self.service.last_result
            _logger.debug("KeepWriterThread %s succeeded %s %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          self.args['service_root'])
            # Tick the 'done' counter for the number of replica
            # reported stored by the server, for the case that
            # we're talking to a proxy or other backend that
            # stores to multiple copies for us.
            try:
                replicas_stored = int(result.headers['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1
            limiter.save_response(result.text.strip(), replicas_stored)
        elif status is not None:
            _logger.debug("Request fail: PUT %s => %s %s",
                          self.args['data_hash'], status,
                          self.service.last_result.text)
def __init__(self, api_client=None, proxy=None,
             timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
             api_token=None, local_store=None, block_cache=None,
             num_retries=0):
    """Initialize a new KeepClient.

    Arguments:
    * api_client: The API client to use to find Keep services.  If not
      provided, KeepClient will build one from available Arvados
      configuration.
    * proxy: If specified, this KeepClient will send requests to this
      Keep proxy.  Otherwise, KeepClient will fall back to the setting
      of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
      ensure KeepClient does not use a proxy, pass in an empty string.
    * timeout: The timeout (in seconds) for HTTP requests to Keep
      non-proxy servers.  A tuple of two floats is interpreted as
      (connection_timeout, read_timeout): see
      http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
      Default: (3, 30).
    * proxy_timeout: The timeout (in seconds) for HTTP requests to
      Keep proxies.  A tuple of two floats is interpreted as
      (connection_timeout, read_timeout).  Default: (20, 60).
    * api_token: If you're not using an API client, but only talking
      directly to a Keep proxy, this parameter specifies an API token
      to authenticate Keep requests.  It is an error to specify both
      api_client and api_token.  If you specify neither, KeepClient
      will use one available from the Arvados configuration.
    * local_store: If specified, this KeepClient will bypass Keep
      services, and save data to the named directory.  If unspecified,
      KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
      environment variable.  If you want to ensure KeepClient does not
      use local storage, pass in an empty string.  This is primarily
      intended to mock a server for testing.
    * num_retries: The default number of times to retry failed requests.
      This will be used as the default num_retries value when get() and
      put() are called.  Default 0.
    """
    self.lock = threading.Lock()
    if proxy is None:
        proxy = config.get('ARVADOS_KEEP_PROXY')
    if api_token is None:
        if api_client is None:
            api_token = config.get('ARVADOS_API_TOKEN')
        else:
            api_token = api_client.api_token
    elif api_client is not None:
        raise ValueError(
            "can't build KeepClient with both API client and token")
    if local_store is None:
        local_store = os.environ.get('KEEP_LOCAL_STORE')

    self.block_cache = block_cache if block_cache else KeepBlockCache()

    if local_store:
        # Local-store mode: bypass Keep entirely and read/write files
        # in `local_store` instead.
        self.local_store = local_store
        self.get = self.local_store_get
        self.put = self.local_store_put
    else:
        self.num_retries = num_retries
        if proxy:
            if not proxy.endswith('/'):
                proxy += '/'
            self.api_token = api_token
            self.service_roots = [proxy]
            self.using_proxy = True
            self.timeout = proxy_timeout
            self.static_service_roots = True
        else:
            # It's important to avoid instantiating an API client
            # unless we actually need one, for testing's sake.
            if api_client is None:
                api_client = arvados.api('v1')
            self.api_client = api_client
            self.api_token = api_client.api_token
            self.service_roots = None
            self.using_proxy = None
            self.timeout = timeout
            self.static_service_roots = False
def build_service_roots(self, force_rebuild=False):
    """Populate self.service_roots from the API server.

    No-op when roots are static (proxy mode) or already built, unless
    force_rebuild is set.
    """
    if (self.static_service_roots or
          (self.service_roots and not force_rebuild)):
        return
    with self.lock:
        try:
            keep_services = self.api_client.keep_services().accessible()
        except Exception:  # API server predates Keep services.
            keep_services = self.api_client.keep_disks().list()

        keep_services = keep_services.execute().get('items')
        if not keep_services:
            raise arvados.errors.NoKeepServersError()

        self.using_proxy = any(ks.get('service_type') == 'proxy'
                               for ks in keep_services)

        # Build "scheme://[host]:port/" for each service; the bracketed
        # host form keeps IPv6 literals valid in URLs.
        roots = ("{}://[{}]:{:d}/".format(
                    'https' if ks['service_ssl_flag'] else 'http',
                    ks['service_host'],
                    ks['service_port'])
                 for ks in keep_services)
        self.service_roots = sorted(set(roots))
        _logger.debug(str(self.service_roots))
def shuffled_service_roots(self, hash, force_rebuild=False):
    """Return the service roots in the probe order determined by `hash`.

    Builds a deterministic pseudo-random ordering of the Keep service
    roots, seeded from the hex digits of `hash` (a hex-encoded number
    of at least 8 digits).
    """
    self.build_service_roots(force_rebuild)

    # seed used to calculate the next keep server from 'pool'
    # to be added to 'pseq'
    seed = hash

    # Keep servers still to be added to the ordering
    pool = self.service_roots[:]

    # output probe sequence
    pseq = []

    # iterate while there are servers left to be assigned
    while len(pool) > 0:
        if len(seed) < 8:
            # ran out of digits in the seed
            if len(pseq) < len(hash) / 4:
                # the number of servers added to the probe sequence is less
                # than the number of 4-digit slices in 'hash' so refill the
                # seed with the last 4 digits and then append the contents
                # of 'hash'.
                seed = hash[-4:] + hash
            else:
                # refill the seed with the contents of 'hash'
                seed += hash

        # Take the next 8 digits (32 bits) and interpret as an integer,
        # then modulus with the size of the remaining pool to get the next
        # server picked.
        probe = int(seed[0:8], 16) % len(pool)

        # Append the selected server to the probe sequence and remove it
        # from the pool.
        pseq += [pool[probe]]
        pool = pool[:probe] + pool[probe+1:]

        # Remove the digits just used from the seed
        seed = seed[8:]
    _logger.debug(str(pseq))
    return pseq
def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
    """Ensure roots_map has a KeepService for every local service root.

    roots_map is a dictionary mapping Keep service root strings to
    KeepService objects.  Polls for Keep services (honoring
    force_rebuild) and returns the current shuffled list of local
    service roots for md5_s.
    """
    headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
    local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
    for root in local_roots:
        if root not in roots_map:
            roots_map[root] = self.KeepService(root, **headers)
    return local_roots
570 def _check_loop_result(result):
571 # KeepClient RetryLoops should save results as a 2-tuple: the
572 # actual result of the request, and the number of servers available
573 # to receive the request this round.
574 # This method returns True if there's a real result, False if
575 # there are no more servers available, otherwise None.
576 if isinstance(result, Exception):
578 result, tried_server_count = result
579 if (result is not None) and (result is not False):
581 elif tried_server_count < 1:
582 _logger.info("No more Keep services to try; giving up")
def get(self, loc_s, num_retries=None):
    """Get data from Keep.

    This method fetches one or more blocks of data from Keep.  It
    sends a request each Keep service registered with the API
    server (or the proxy provided when this client was
    instantiated), then each service named in location hints, in
    sequence.  As soon as one service provides the data, it's
    returned.

    Arguments:
    * loc_s: A string of one or more comma-separated locators to fetch.
      This method returns the concatenation of these blocks.
    * num_retries: The number of times to retry GET requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff.  Note that, in each loop, the method may try
      to fetch data from every available Keep service, along with any
      that are named in location hints in the locator.  The default value
      is set when the KeepClient is initialized.
    """
    if ',' in loc_s:
        # Fetch each locator in the list and concatenate the blocks.
        return ''.join(self.get(x) for x in loc_s.split(','))
    if num_retries is None:
        # NOTE(review): default resolution not visible in source -- confirm.
        num_retries = self.num_retries
    locator = KeepLocator(loc_s)
    expect_hash = locator.md5sum

    slot, first = self.block_cache.reserve_cache(expect_hash)
    if not first:
        # Another thread is (or was) fetching this block; wait for it.
        return slot.get()

    # See #3147 for a discussion of the loop implementation.  Highlights:
    # * Refresh the list of Keep services after each failure, in case
    #   it's being updated.
    # * Retry until we succeed, we're out of retries, or every available
    #   service has returned permanent failure.
    hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                  for hint in locator.hints if hint.startswith('K@')]
    # Map root URLs to their KeepService objects.
    roots_map = {root: self.KeepService(root) for root in hint_roots}
    blob = None
    loop = retry.RetryLoop(num_retries, self._check_loop_result,
                           backoff_start=2)
    for tries_left in loop:
        try:
            local_roots = self.map_new_services(
                roots_map, expect_hash,
                force_rebuild=(tries_left < num_retries))
        except Exception as error:
            loop.save_result(error)
            continue

        # Query KeepService objects that haven't returned
        # permanent failure, in our specified shuffle order.
        services_to_try = [roots_map[root]
                           for root in (local_roots + hint_roots)
                           if roots_map[root].usable()]
        for keep_service in services_to_try:
            blob = keep_service.get(locator, timeout=self.timeout)
            if blob is not None:
                break
        loop.save_result((blob, len(services_to_try)))

    # Always cache the result, then return it if we succeeded.
    slot.set(blob)
    self.block_cache.cap_cache()
    if loop.success():
        return blob

    # No servers fulfilled the request.  Count how many responded
    # "not found;" if the ratio is high enough (currently 75%), report
    # Not Found; otherwise a generic error.
    # Q: Including 403 is necessary for the Keep tests to continue
    # passing, but maybe they should expect KeepReadError instead?
    not_founds = sum(1 for ks in roots_map.values()
                     if ks.last_status() in set([403, 404, 410]))
    if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
        raise arvados.errors.NotFoundError(loc_s)
    else:
        raise arvados.errors.KeepReadError(loc_s)
def put(self, data, copies=2, num_retries=None):
    """Save data in Keep.

    This method will get a list of Keep services from the API server, and
    send the data to each one simultaneously in a new thread.  Once the
    uploads are finished, if enough copies are saved, this method returns
    the most recent HTTP response body.  If requests fail to upload
    enough copies, this method raises KeepWriteError.

    Arguments:
    * data: The string of data to upload.
    * copies: The number of copies that the user requires be saved.
      Default 2.
    * num_retries: The number of times to retry PUT requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff.  The default value is set when the
      KeepClient is initialized.
    """
    if num_retries is None:
        # NOTE(review): default resolution not visible in source -- confirm.
        num_retries = self.num_retries
    data_hash = hashlib.md5(data).hexdigest()
    if copies < 1:
        return data_hash

    headers = {}
    if self.using_proxy:
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replication'] = str(copies)
    roots_map = {}
    thread_limiter = KeepClient.ThreadLimiter(copies)
    loop = retry.RetryLoop(num_retries, self._check_loop_result,
                           backoff_start=2)
    for tries_left in loop:
        try:
            local_roots = self.map_new_services(
                roots_map, data_hash,
                force_rebuild=(tries_left < num_retries), **headers)
        except Exception as error:
            loop.save_result(error)
            continue

        threads = []
        # BUGFIX: dict.items() instead of the Python-2-only iteritems();
        # iteration behavior is identical on Python 2 and it also works
        # on Python 3.
        for service_root, ks in roots_map.items():
            if ks.finished():
                continue
            t = KeepClient.KeepWriterThread(
                ks,
                data=data,
                data_hash=data_hash,
                service_root=service_root,
                thread_limiter=thread_limiter,
                timeout=self.timeout)
            t.start()
            threads.append(t)
        for t in threads:
            t.join()
        loop.save_result((thread_limiter.done() >= copies, len(threads)))

    if loop.success():
        return thread_limiter.response()
    raise arvados.errors.KeepWriteError(
        "Write fail for %s: wanted %d but wrote %d" %
        (data_hash, copies, thread_limiter.done()))
# Local storage methods need no-op num_retries arguments to keep
# integration tests happy. With better isolation they could
# probably be removed again.
def local_store_put(self, data, num_retries=0):
    """Write `data` as a file in self.local_store and return its locator."""
    md5 = hashlib.md5(data).hexdigest()
    locator = '%s+%d' % (md5, len(data))
    tmp_path = os.path.join(self.local_store, md5 + '.tmp')
    with open(tmp_path, 'w') as f:
        f.write(data)
    # Write-then-rename so readers never observe a partially written block.
    os.rename(tmp_path, os.path.join(self.local_store, md5))
    return locator
743 def local_store_get(self, loc_s, num_retries=0):
745 locator = KeepLocator(loc_s)
747 raise arvados.errors.NotFoundError(
748 "Invalid data locator: '%s'" % loc_s)
749 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
751 with open(os.path.join(self.local_store, locator.md5sum), 'r') as f: