# Logger used by every class and function in this module.
25 _logger = logging.getLogger('arvados.keep')
# Cached KeepClient shared through Keep.global_client_object(); unset until
# that method first builds one.
26 global_client_object = None
29 import arvados.config as config
31 import arvados.retry as retry
class KeepLocator(object):
    """Parsed representation of a Keep block locator string.

    A locator is a '+'-separated string: a 32-digit MD5 hex digest,
    optionally followed by the block size and then any number of hints.
    A hint starting with 'A' is a permission hint of the form
    ``A<40-digit hex signature>@<hex Unix timestamp>``; every other hint is
    kept verbatim in ``self.hints``.
    """
    # Naive datetime for the Unix epoch in UTC.  Permission expiry times are
    # stored and compared as naive UTC datetimes.
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    # A hint is one capital letter followed by one or more allowed characters.
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        """Parse locator_str into digest, size, permission data and hints.

        Raises ValueError if the digest, the size, or any hint is malformed.
        """
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # A bare digest is a valid locator; it simply has no size.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("unrecognized hint data {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        """Rebuild the locator string, leaving out components that are None."""
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.  The value is stored
        # on the instance under '_<name>'.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} must be a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        """Permission expiry as a naive UTC datetime, or None if unset."""
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        # value is the hex timestamp text taken from the permission hint.
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the 'A<sig>@<hex expiry>' hint, or None if either part is unset."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Set perm_sig and perm_expiry from a hint string such as 'A<sig>@<ts>'.

        Raises ValueError if the hint has no '@' separator, or if either
        half is rejected by its property setter.
        """
        sig, separator, expiry = s[1:].partition('@')
        if not separator:
            # Check explicitly so a hint without '@' gets this message
            # instead of a tuple-unpacking error.
            raise ValueError("bad permission hint {}".format(s))
        self.perm_sig, self.perm_expiry = sig, expiry

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission hint has expired.

        as_of_dt is a naive UTC datetime to compare against; it defaults to
        the current time.  A locator without an expiry never expires.
        """
        if self.perm_expiry is None:
            return False
        if as_of_dt is None:
            # perm_expiry is built with utcfromtimestamp(), so "now" has to be
            # UTC as well; local time would be off by the zone offset.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
112 """Simple interface to a global KeepClient object.
114 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
115 own API client. The global KeepClient will build an API client from the
116 current Arvados configuration, which may not match the one you built.
# Return the shared KeepClient, building a new one when none exists yet or
# when the settings tuple computed below differs from cls._last_key.
121 def global_client_object(cls):
122 global global_client_object
123 # Previously, KeepClient would change its behavior at runtime based
124 # on these configuration settings. We simulate that behavior here
125 # by checking the values and returning a new KeepClient if any of
127 key = (config.get('ARVADOS_API_HOST'),
128 config.get('ARVADOS_API_TOKEN'),
129 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
130 config.get('ARVADOS_KEEP_PROXY'),
131 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
132 os.environ.get('KEEP_LOCAL_STORE'))
133 if (global_client_object is None) or (cls._last_key != key):
134 global_client_object = KeepClient()
136 return global_client_object
# Fetch a block through the shared KeepClient; keyword arguments are passed
# through unchanged.
139 def get(locator, **kwargs):
140 return Keep.global_client_object().get(locator, **kwargs)
# Store data through the shared KeepClient; keyword arguments are passed
# through unchanged.
143 def put(data, **kwargs):
144 return Keep.global_client_object().put(data, **kwargs)
# In-memory cache of block contents, kept as a list of CacheSlot objects
# (self._cache) and guarded by self._cache_lock.
146 class KeepBlockCache(object):
147 # Default RAM cache is 256MiB
148 def __init__(self, cache_max=(256 * 1024 * 1024)):
149 self.cache_max = cache_max
151 self._cache_lock = threading.Lock()
# One cache entry: the locator it was reserved for, plus an Event that
# cap_cache() consults to tell finished slots from pending ones.
153 class CacheSlot(object):
154 def __init__(self, locator):
155 self.locator = locator
156 self.ready = threading.Event()
163 def set(self, value):
# NOTE(review): `is None` is the idiomatic identity test; `== None` can be
# misled by a custom __eq__ on the content object.
168 if self.content == None:
171 return len(self.content)
174 '''Cap the cache size to self.cache_max'''
175 self._cache_lock.acquire()
177 # Select all slots except those where ready.is_set() and content is
178 # None (that means there was an error reading the block).
179 self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
# Total size of the slots that survived the filter above.
180 sm = sum([slot.size() for slot in self._cache])
181 while len(self._cache) > 0 and sm > self.cache_max:
# Scan from the back of the list: reserve_cache() inserts and re-inserts
# slots at the front, so the back holds the least recently reserved ones.
182 for i in xrange(len(self._cache)-1, -1, -1):
183 if self._cache[i].ready.is_set():
# Recompute the total before the while condition is tested again.
186 sm = sum([slot.size() for slot in self._cache])
188 self._cache_lock.release()
190 def reserve_cache(self, locator):
191 '''Reserve a cache slot for the specified locator,
192 or return the existing slot.'''
193 self._cache_lock.acquire()
195 # Test if the locator is already in the cache
196 for i in xrange(0, len(self._cache)):
197 if self._cache[i].locator == locator:
200 # move it to the front
202 self._cache.insert(0, n)
205 # Add a new cache slot for the locator
206 n = KeepBlockCache.CacheSlot(locator)
207 self._cache.insert(0, n)
210 self._cache_lock.release()
212 class KeepClient(object):
class ThreadLimiter(object):
    """Admission gate for writer threads.

    At most `todo` threads may be inside the "with" block at once.  Each
    thread asks shall_i_proceed() after entering; once the number of
    replicas reported through save_response() reaches `todo`, the answer
    becomes False and the remaining threads should return without doing
    any work.

    Should be used in a "with" block.
    """

    def __init__(self, todo):
        # Semaphore bounds concurrency; the lock protects the counters below.
        self._todo_lock = threading.Semaphore(todo)
        self._done_lock = threading.Lock()
        self._todo = todo
        self._done = 0
        self._response = None

    def __enter__(self):
        self._todo_lock.acquire()
        return self

    def __exit__(self, type, value, traceback):
        self._todo_lock.release()

    def shall_i_proceed(self):
        """Tell the calling thread whether more replicas are still wanted.

        Returns True if the thread should go ahead, False if it should stop.
        """
        with self._done_lock:
            still_needed = self._done < self._todo
        return still_needed

    def save_response(self, response_body, replicas_stored):
        """Record a successful write.

        response_body is the locator (possibly signed) returned by the Keep
        server.  Only the latest one is kept, on the assumption that any
        locator returned by a successful request is valid.
        replicas_stored is added to the running success count.
        """
        with self._done_lock:
            self._done = self._done + replicas_stored
            self._response = response_body

    def response(self):
        """Return the most recently saved PUT response body."""
        with self._done_lock:
            latest = self._response
        return latest

    def done(self):
        """Return the number of successes reported so far."""
        with self._done_lock:
            count = self._done
        return count
270 class KeepService(object):
271 # Make requests to a single Keep service, and track results.
# Exception types that get() and put() catch and log as a failed request.
272 HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
273 socket.error, ssl.SSLError)
# Extra keyword arguments become HTTP headers: they are used as-is for PUT
# and merged over the default Accept header for GET.
275 def __init__(self, root, **headers):
277 self.last_result = None
# Tri-state: None until a response has been checked, then whatever
# retry.check_http_response_success() returned for the latest response.
278 self.success_flag = None
279 self.get_headers = {'Accept': 'application/octet-stream'}
280 self.get_headers.update(headers)
281 self.put_headers = headers
284 return self.success_flag is not False
287 return self.success_flag is not None
def last_status(self):
    """Return the HTTP status of the most recent response as an int.

    Returns None when there is no usable status: no response recorded yet
    (last_result is None), an empty result, a response object without a
    `status` attribute, or a status that is not a number.
    """
    try:
        return int(self.last_result[0].status)
    except (AttributeError, IndexError, TypeError, ValueError):
        # TypeError covers last_result still being None, which is its
        # initial value and also its value after a failed connection.
        return None
# Request one block from this service and verify the body against the MD5
# digest in the locator.
295 def get(self, http, locator):
296 # http is an httplib2.Http object.
297 # locator is a KeepLocator object.
298 url = self.root + str(locator)
299 _logger.debug("Request: GET %s", url)
301 with timer.Timer() as t:
302 result = http.request(url.encode('utf-8'), 'GET',
303 headers=self.get_headers)
304 except self.HTTP_ERRORS as e:
305 _logger.debug("Request fail: GET %s => %s: %s",
306 url, type(e), str(e))
# Record the raw response and whether it counts as a success, so that
# last_status() and the success_flag checks reflect this request.
309 self.last_result = result
310 self.success_flag = retry.check_http_response_success(result)
# NOTE(review): the throughput figure divides by t.secs; confirm the Timer
# can never report 0 seconds for a very fast response.
312 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
313 self.last_status(), len(content), t.msecs,
314 (len(content)/(1024.0*1024))/t.secs)
# A successful response is still checked against the locator's digest.
315 if self.success_flag:
316 resp_md5 = hashlib.md5(content).hexdigest()
317 if resp_md5 == locator.md5sum:
319 _logger.warning("Checksum fail: md5(%s) = %s",
# Send body to this service under the name hash_s and return the success
# flag computed from the response.
323 def put(self, http, hash_s, body):
324 url = self.root + hash_s
325 _logger.debug("Request: PUT %s", url)
327 result = http.request(url.encode('utf-8'), 'PUT',
328 headers=self.put_headers, body=body)
329 except self.HTTP_ERRORS as e:
330 _logger.debug("Request fail: PUT %s => %s: %s",
331 url, type(e), str(e))
# Record the raw response so last_status() can report on this request.
334 self.last_result = result
335 self.success_flag = retry.check_http_response_success(result)
336 return self.success_flag
# Thread that PUTs one block to one KeepService, coordinated with its sibling
# threads through a shared ThreadLimiter.
339 class KeepWriterThread(threading.Thread):
341 Write a blob of data to the given Keep server. On success, call
342 save_response() of the given ThreadLimiter to save the returned
# keep_service is the KeepService to write to.  The keyword arguments read
# later in this class are 'thread_limiter', 'data_hash', 'data',
# 'service_root' and the optional 'timeout'.
345 def __init__(self, keep_service, **kwargs):
346 super(KeepClient.KeepWriterThread, self).__init__()
347 self.service = keep_service
349 self._success = False
# Entering the limiter waits for a free slot; the thread then asks whether
# more replicas are still wanted before doing any work.
355 with self.args['thread_limiter'] as limiter:
356 if not limiter.shall_i_proceed():
357 # My turn arrived, but the job has been done without
360 self.run_with_limiter(limiter)
# Perform the PUT and, on success, report the response body and replica count
# to the limiter.
362 def run_with_limiter(self, limiter):
363 if self.service.finished():
365 _logger.debug("KeepWriterThread %s proceeding %s %s",
366 str(threading.current_thread()),
367 self.args['data_hash'],
368 self.args['service_root'])
369 h = httplib2.Http(timeout=self.args.get('timeout', None))
370 self._success = bool(self.service.put(
371 h, self.args['data_hash'], self.args['data']))
372 status = self.service.last_status()
374 resp, body = self.service.last_result
375 _logger.debug("KeepWriterThread %s succeeded %s %s",
376 str(threading.current_thread()),
377 self.args['data_hash'],
378 self.args['service_root'])
379 # Tick the 'done' counter for the number of replicas
380 # reported stored by the server, for the case that
381 # we're talking to a proxy or other backend that
382 # stores to multiple copies for us.
384 replicas_stored = int(resp['x-keep-replicas-stored'])
385 except (KeyError, ValueError):
387 limiter.save_response(body.strip(), replicas_stored)
# A response arrived but was not a success: log its status and body.
388 elif status is not None:
389 _logger.debug("Request fail: PUT %s => %s %s",
390 self.args['data_hash'], status,
391 self.service.last_result[1])
# Resolve the proxy, API token and local-store settings (explicit argument
# first, then configuration or environment), then configure the client for
# local-store, proxy, or API-discovered operation.
394 def __init__(self, api_client=None, proxy=None, timeout=300,
395 api_token=None, local_store=None, block_cache=None,
397 """Initialize a new KeepClient.
400 * api_client: The API client to use to find Keep services. If not
401 provided, KeepClient will build one from available Arvados
403 * proxy: If specified, this KeepClient will send requests to this
404 Keep proxy. Otherwise, KeepClient will fall back to the setting
405 of the ARVADOS_KEEP_PROXY configuration setting. If you want to
406 ensure KeepClient does not use a proxy, pass in an empty string.
407 * timeout: The timeout for all HTTP requests, in seconds. Default
409 * api_token: If you're not using an API client, but only talking
410 directly to a Keep proxy, this parameter specifies an API token
411 to authenticate Keep requests. It is an error to specify both
412 api_client and api_token. If you specify neither, KeepClient
413 will use one available from the Arvados configuration.
414 * local_store: If specified, this KeepClient will bypass Keep
415 services, and save data to the named directory. If unspecified,
416 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
417 environment variable. If you want to ensure KeepClient does not
418 use local storage, pass in an empty string. This is primarily
419 intended to mock a server for testing.
420 * num_retries: The default number of times to retry failed requests.
421 This will be used as the default num_retries value when get() and
422 put() are called. Default 0.
424 self.lock = threading.Lock()
426 proxy = config.get('ARVADOS_KEEP_PROXY')
427 if api_token is None:
428 if api_client is None:
429 api_token = config.get('ARVADOS_API_TOKEN')
431 api_token = api_client.api_token
432 elif api_client is not None:
434 "can't build KeepClient with both API client and token")
435 if local_store is None:
436 local_store = os.environ.get('KEEP_LOCAL_STORE')
# A caller-supplied cache is used when truthy; otherwise a default-sized
# KeepBlockCache is created.
438 self.block_cache = block_cache if block_cache else KeepBlockCache()
# Local-store mode: get/put are rebound on this instance to the
# filesystem-backed implementations.
441 self.local_store = local_store
442 self.get = self.local_store_get
443 self.put = self.local_store_put
445 self.timeout = timeout
446 self.num_retries = num_retries
# Proxy mode: the proxy is the only service root, and the roots are marked
# static so build_service_roots() leaves them alone.
448 if not proxy.endswith('/'):
450 self.api_token = api_token
451 self.service_roots = [proxy]
452 self.using_proxy = True
453 self.static_service_roots = True
# Discovery mode: service roots start unset and are filled in later by
# build_service_roots().
455 # It's important to avoid instantiating an API client
456 # unless we actually need one, for testing's sake.
457 if api_client is None:
458 api_client = arvados.api('v1')
459 self.api_client = api_client
460 self.api_token = api_client.api_token
461 self.service_roots = None
462 self.using_proxy = None
463 self.static_service_roots = False
# Fill in self.service_roots and self.using_proxy from the API server's list
# of Keep services.  The guard below applies when the roots are static, or
# when they are already known and force_rebuild is false.
465 def build_service_roots(self, force_rebuild=False):
466 if (self.static_service_roots or
467 (self.service_roots and not force_rebuild)):
# NOTE(review): the broad `except Exception` also hides unrelated failures of
# the accessible() call — confirm that is intended.
471 keep_services = self.api_client.keep_services().accessible()
472 except Exception: # API server predates Keep services.
473 keep_services = self.api_client.keep_disks().list()
475 keep_services = keep_services.execute().get('items')
476 if not keep_services:
477 raise arvados.errors.NoKeepServersError()
479 self.using_proxy = any(ks.get('service_type') == 'proxy'
480 for ks in keep_services)
# One URL per service; set() removes duplicates and sorted() puts the
# result in a deterministic order.
482 roots = ("{}://[{}]:{:d}/".format(
483 'https' if ks['service_ssl_flag'] else 'http',
486 for ks in keep_services)
487 self.service_roots = sorted(set(roots))
488 _logger.debug(str(self.service_roots))
# Build the probe sequence (pseq): the known service roots reordered using
# digits taken from `hash`.
490 def shuffled_service_roots(self, hash, force_rebuild=False):
491 self.build_service_roots(force_rebuild)
493 # Build an ordering with which to query the Keep servers based on the
494 # contents of the hash.
495 # "hash" is a hex-encoded number at least 8 digits
498 # seed used to calculate the next keep server from 'pool'
499 # to be added to 'pseq'
502 # Keep servers still to be added to the ordering
503 pool = self.service_roots[:]
505 # output probe sequence
508 # iterate while there are servers left to be assigned
511 # ran out of digits in the seed
512 if len(pseq) < len(hash) / 4:
513 # the number of servers added to the probe sequence is less
514 # than the number of 4-digit slices in 'hash' so refill the
515 # seed with the last 4 digits and then append the contents
517 seed = hash[-4:] + hash
519 # refill the seed with the contents of 'hash'
522 # Take the next 8 hex digits (32 bits) and interpret as an integer,
523 # then modulus with the size of the remaining pool to get the next
525 probe = int(seed[0:8], 16) % len(pool)
527 # Append the selected server to the probe sequence and remove it
529 pseq += [pool[probe]]
530 pool = pool[:probe] + pool[probe+1:]
532 # Remove the digits just used from the seed
534 _logger.debug(str(pseq))
# Extra keyword arguments are HTTP headers handed to each newly created
# KeepService.
538 def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
539 # roots_map is a dictionary, mapping Keep service root strings
540 # to KeepService objects. Poll for Keep services, and add any
541 # new ones to roots_map. Return the current list of local
# The Authorization header is only filled in when the caller did not pass one.
543 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
544 local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
# Existing entries are kept, so per-service state survives across calls.
545 for root in local_roots:
546 if root not in roots_map:
547 roots_map[root] = self.KeepService(root, **headers)
551 def _check_loop_result(result):
552 # KeepClient RetryLoops should save results as a 2-tuple: the
553 # actual result of the request, and the number of servers available
554 # to receive the request this round.
555 # This method returns True if there's a real result, False if
556 # there are no more servers available, otherwise None.
557 if isinstance(result, Exception):
559 result, tried_server_count = result
560 if (result is not None) and (result is not False):
562 elif tried_server_count < 1:
563 _logger.info("No more Keep services to try; giving up")
569 def get(self, loc_s, num_retries=None):
570 """Get data from Keep.
572 This method fetches one or more blocks of data from Keep. It
573 sends a request each Keep service registered with the API
574 server (or the proxy provided when this client was
575 instantiated), then each service named in location hints, in
576 sequence. As soon as one service provides the data, it's
580 * loc_s: A string of one or more comma-separated locators to fetch.
581 This method returns the concatenation of these blocks.
582 * num_retries: The number of times to retry GET requests to
583 *each* Keep server if it returns temporary failures, with
584 exponential backoff. Note that, in each loop, the method may try
585 to fetch data from every available Keep service, along with any
586 that are named in location hints in the locator. The default value
587 is set when the KeepClient is initialized.
# NOTE(review): the recursive call below does not forward num_retries, so
# each comma-separated locator is fetched with the default — confirm intended.
590 return ''.join(self.get(x) for x in loc_s.split(','))
591 locator = KeepLocator(loc_s)
592 expect_hash = locator.md5sum
# The block cache is keyed by the bare MD5 digest, not the full locator.
594 slot, first = self.block_cache.reserve_cache(expect_hash)
599 # See #3147 for a discussion of the loop implementation. Highlights:
600 # * Refresh the list of Keep services after each failure, in case
601 # it's being updated.
602 # * Retry until we succeed, we're out of retries, or every available
603 # service has returned permanent failure.
# A 'K@<name>' hint maps to the service URL http://keep.<name>.arvadosapi.com/.
604 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
605 for hint in locator.hints if hint.startswith('K@')]
606 # Map root URLs to their KeepService objects.
607 roots_map = {root: self.KeepService(root) for root in hint_roots}
609 loop = retry.RetryLoop(num_retries, self._check_loop_result,
611 for tries_left in loop:
# force_rebuild is requested whenever tries_left is below num_retries —
# presumably every attempt after the first; verify against RetryLoop.
613 local_roots = self.map_new_services(
614 roots_map, expect_hash,
615 force_rebuild=(tries_left < num_retries))
616 except Exception as error:
617 loop.save_result(error)
620 # Query KeepService objects that haven't returned
621 # permanent failure, in our specified shuffle order.
622 services_to_try = [roots_map[root]
623 for root in (local_roots + hint_roots)
624 if roots_map[root].usable()]
625 http = httplib2.Http(timeout=self.timeout)
626 for keep_service in services_to_try:
627 blob = keep_service.get(http, locator)
# Saved in the 2-tuple form that _check_loop_result() expects.
630 loop.save_result((blob, len(services_to_try)))
632 # Always cache the result, then return it if we succeeded.
634 self.block_cache.cap_cache()
638 # No servers fulfilled the request. Count how many responded
639 # "not found;" if the ratio is high enough (currently 75%), report
640 # Not Found; otherwise a generic error.
641 # Q: Including 403 is necessary for the Keep tests to continue
642 # passing, but maybe they should expect KeepReadError instead?
643 not_founds = sum(1 for ks in roots_map.values()
644 if ks.last_status() in set([403, 404, 410]))
645 if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
646 raise arvados.errors.NotFoundError(loc_s)
648 raise arvados.errors.KeepReadError(loc_s)
651 def put(self, data, copies=2, num_retries=None):
652 """Save data in Keep.
654 This method will get a list of Keep services from the API server, and
655 send the data to each one simultaneously in a new thread. Once the
656 uploads are finished, if enough copies are saved, this method returns
657 the most recent HTTP response body. If requests fail to upload
658 enough copies, this method raises KeepWriteError.
661 * data: The string of data to upload.
662 * copies: The number of copies that the user requires be saved.
664 * num_retries: The number of times to retry PUT requests to
665 *each* Keep server if it returns temporary failures, with
666 exponential backoff. The default value is set when the
667 KeepClient is initialized.
# Blocks are addressed by the MD5 digest of their content.
669 data_hash = hashlib.md5(data).hexdigest()
675 # Tell the proxy how many copies we want it to store
676 headers['X-Keep-Desired-Replication'] = str(copies)
# One limiter is shared by all rounds, so its replica count accumulates
# across retries.
678 thread_limiter = KeepClient.ThreadLimiter(copies)
679 loop = retry.RetryLoop(num_retries, self._check_loop_result,
681 for tries_left in loop:
683 local_roots = self.map_new_services(
684 roots_map, data_hash,
685 force_rebuild=(tries_left < num_retries), **headers)
686 except Exception as error:
687 loop.save_result(error)
# NOTE(review): dict.iteritems() exists only on Python 2.
691 for service_root, ks in roots_map.iteritems():
694 t = KeepClient.KeepWriterThread(
698 service_root=service_root,
699 thread_limiter=thread_limiter,
700 timeout=self.timeout)
# The round counts as a success once the limiter has recorded at least
# `copies` replicas; len(threads) is presumably the number of writers
# started this round — confirm.
705 loop.save_result((thread_limiter.done() >= copies, len(threads)))
708 return thread_limiter.response()
709 raise arvados.errors.KeepWriteError(
710 "Write fail for %s: wanted %d but wrote %d" %
711 (data_hash, copies, thread_limiter.done()))
713 # Local storage methods need no-op num_retries arguments to keep
714 # integration tests happy. With better isolation they could
715 # probably be removed again.
def local_store_put(self, data, num_retries=0):
    """Save data as a file in the local store directory.

    The block is written to '<md5>.tmp' and then renamed to '<md5>', so a
    partially written file never appears under the final name.

    * data: The block contents to store.
    * num_retries: Accepted for signature compatibility with put(); unused.

    Returns the locator string '<md5>+<size>'.
    """
    md5 = hashlib.md5(data).hexdigest()
    locator = '%s+%d' % (md5, len(data))
    final_path = os.path.join(self.local_store, md5)
    tmp_path = final_path + '.tmp'
    # Binary mode: block data is arbitrary bytes, and a text-mode handle may
    # translate newlines (or reject bytes outright on Python 3).
    with open(tmp_path, 'wb') as f:
        f.write(data)
    os.rename(tmp_path, final_path)
    return locator
725 def local_store_get(self, loc_s, num_retries=0):
727 locator = KeepLocator(loc_s)
729 raise arvados.errors.NotFoundError(
730 "Invalid data locator: '%s'" % loc_s)
731 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
733 with open(os.path.join(self.local_store, locator.md5sum), 'r') as f: