# Logger used by the Keep client code in this module.
_logger = logging.getLogger(name='arvados.keep')

# Shared KeepClient reused by the deprecated Keep helpers; it starts out
# unset and is created on first use.
global_client_object = None
28 import arvados.config as config
30 import arvados.retry as retry
33 class KeepLocator(object):
34 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
35 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
37 def __init__(self, locator_str):
40 self._perm_expiry = None
41 pieces = iter(locator_str.split('+'))
42 self.md5sum = next(pieces)
44 self.size = int(next(pieces))
48 if self.HINT_RE.match(hint) is None:
49 raise ValueError("unrecognized hint data {}".format(hint))
50 elif hint.startswith('A'):
51 self.parse_permission_hint(hint)
53 self.hints.append(hint)
57 str(s) for s in [self.md5sum, self.size,
58 self.permission_hint()] + self.hints
61 def _make_hex_prop(name, length):
62 # Build and return a new property with the given name that
63 # must be a hex string of the given length.
64 data_name = '_{}'.format(name)
66 return getattr(self, data_name)
67 def setter(self, hex_str):
68 if not arvados.util.is_hex(hex_str, length):
69 raise ValueError("{} must be a {}-digit hex string: {}".
70 format(name, length, hex_str))
71 setattr(self, data_name, hex_str)
72 return property(getter, setter)
74 md5sum = _make_hex_prop('md5sum', 32)
75 perm_sig = _make_hex_prop('perm_sig', 40)
def perm_expiry(self):
    """Return the stored permission expiry.

    The value is None until a permission hint has been parsed. After that it
    is the naive UTC datetime that the perm_expiry setter built with
    ``datetime.datetime.utcfromtimestamp``.
    """
    expiry = self._perm_expiry
    return expiry
82 def perm_expiry(self, value):
83 if not arvados.util.is_hex(value, 1, 8):
85 "permission timestamp must be a hex Unix timestamp: {}".
87 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
89 def permission_hint(self):
90 data = [self.perm_sig, self.perm_expiry]
93 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
94 return "A{}@{:08x}".format(*data)
96 def parse_permission_hint(self, s):
98 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
100 raise ValueError("bad permission hint {}".format(s))
102 def permission_expired(self, as_of_dt=None):
103 if self.perm_expiry is None:
105 elif as_of_dt is None:
106 as_of_dt = datetime.datetime.now()
107 return self.perm_expiry <= as_of_dt
111 """Simple interface to a global KeepClient object.
113 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
114 own API client. The global KeepClient will build an API client from the
115 current Arvados configuration, which may not match the one you built.
120 def global_client_object(cls):
121 global global_client_object
122 # Previously, KeepClient would change its behavior at runtime based
123 # on these configuration settings. We simulate that behavior here
124 # by checking the values and returning a new KeepClient if any of
126 key = (config.get('ARVADOS_API_HOST'),
127 config.get('ARVADOS_API_TOKEN'),
128 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
129 config.get('ARVADOS_KEEP_PROXY'),
130 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
131 os.environ.get('KEEP_LOCAL_STORE'))
132 if (global_client_object is None) or (cls._last_key != key):
133 global_client_object = KeepClient()
135 return global_client_object
def get(locator, **kwargs):
    """Fetch ``locator`` through the shared global KeepClient.

    This is a deprecated convenience wrapper. Keyword arguments are passed
    through unchanged to the client's ``get``.
    """
    client = Keep.global_client_object()
    return client.get(locator, **kwargs)
def put(data, **kwargs):
    """Store ``data`` through the shared global KeepClient.

    This is a deprecated convenience wrapper. Keyword arguments are passed
    through unchanged to the client's ``put``, and its result is returned
    as-is.
    """
    client = Keep.global_client_object()
    return client.put(data, **kwargs)
146 class KeepClient(object):
147 class ThreadLimiter(object):
149 Limit the number of threads running at a given time to
150 {desired successes} minus {successes reported}. When successes
151 reported == desired, wake up the remaining threads and tell
154 Should be used in a "with" block.
156 def __init__(self, todo):
159 self._response = None
160 self._todo_lock = threading.Semaphore(todo)
161 self._done_lock = threading.Lock()
164 self._todo_lock.acquire()
def __exit__(self, type, value, traceback):
    # Give back the _todo_lock slot that was acquired on entry to the
    # with-block, so another writer thread waiting on it can proceed.
    # Returning None (falsy) lets any exception from the with-body propagate.
    todo_slots = self._todo_lock
    todo_slots.release()
def shall_i_proceed(self):
    """Tell the calling thread whether it should still do its work.

    Returns True while the reported success count ``self._done`` is below
    the target ``self._todo``. Returns False once enough successes have been
    reported, meaning the thread should just stop. Both counters are read
    while holding ``self._done_lock``.
    """
    with self._done_lock:
        still_needed = self._todo > self._done
    return still_needed
def save_response(self, response_body, replicas_stored):
    """Record one successful PUT.

    ``replicas_stored`` is added to the running success count. The method
    also keeps ``response_body``, the locator (possibly signed) returned by
    the Keep server. A single response is enough, because any locator
    returned by a successful request is presumed valid, so each call simply
    overwrites the previous one. Both updates happen under
    ``self._done_lock``.
    """
    counter_lock = self._done_lock
    with counter_lock:
        self._done += replicas_stored
        self._response = response_body
191 Returns the body from the response to a PUT request.
193 with self._done_lock:
194 return self._response
198 Return how many successes were reported.
200 with self._done_lock:
204 class KeepService(object):
205 # Make requests to a single Keep service, and track results.
206 HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
209 def __init__(self, root, **headers):
211 self.last_result = None
212 self.success_flag = None
213 self.get_headers = {'Accept': 'application/octet-stream'}
214 self.get_headers.update(headers)
215 self.put_headers = headers
218 return self.success_flag is not False
221 return self.success_flag is not None
223 def last_status(self):
225 return int(self.last_result[0].status)
226 except (AttributeError, IndexError, ValueError):
229 def get(self, http, locator):
230 # http is an httplib2.Http object.
231 # locator is a KeepLocator object.
232 url = self.root + str(locator)
233 _logger.debug("Request: GET %s", url)
235 with timer.Timer() as t:
236 result = http.request(url.encode('utf-8'), 'GET',
237 headers=self.get_headers)
238 except self.HTTP_ERRORS as e:
239 _logger.debug("Request fail: GET %s => %s: %s",
240 url, type(e), str(e))
243 self.last_result = result
244 self.success_flag = retry.check_http_response_success(result)
246 _logger.info("%s response: %s bytes in %s msec (%s MiB/sec)",
247 self.last_status(), len(content), t.msecs,
248 (len(content)/(1024*1024))/t.secs)
249 if self.success_flag:
250 resp_md5 = hashlib.md5(content).hexdigest()
251 if resp_md5 == locator.md5sum:
253 _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
256 def put(self, http, hash_s, body):
257 url = self.root + hash_s
258 _logger.debug("Request: PUT %s", url)
260 result = http.request(url.encode('utf-8'), 'PUT',
261 headers=self.put_headers, body=body)
262 except self.HTTP_ERRORS as e:
263 _logger.debug("Request fail: PUT %s => %s: %s",
264 url, type(e), str(e))
267 self.last_result = result
268 self.success_flag = retry.check_http_response_success(result)
269 return self.success_flag
272 class KeepWriterThread(threading.Thread):
274 Write a blob of data to the given Keep server. On success, call
275 save_response() of the given ThreadLimiter to save the returned
278 def __init__(self, keep_service, **kwargs):
279 super(KeepClient.KeepWriterThread, self).__init__()
280 self.service = keep_service
282 self._success = False
288 with self.args['thread_limiter'] as limiter:
289 if not limiter.shall_i_proceed():
290 # My turn arrived, but the job has been done without
293 self.run_with_limiter(limiter)
295 def run_with_limiter(self, limiter):
296 if self.service.finished():
298 _logger.debug("KeepWriterThread %s proceeding %s %s",
299 str(threading.current_thread()),
300 self.args['data_hash'],
301 self.args['service_root'])
302 h = httplib2.Http(timeout=self.args.get('timeout', None))
303 self._success = bool(self.service.put(
304 h, self.args['data_hash'], self.args['data']))
305 status = self.service.last_status()
307 resp, body = self.service.last_result
308 _logger.debug("KeepWriterThread %s succeeded %s %s",
309 str(threading.current_thread()),
310 self.args['data_hash'],
311 self.args['service_root'])
312 # Tick the 'done' counter for the number of replica
313 # reported stored by the server, for the case that
314 # we're talking to a proxy or other backend that
315 # stores to multiple copies for us.
317 replicas_stored = int(resp['x-keep-replicas-stored'])
318 except (KeyError, ValueError):
320 limiter.save_response(body.strip(), replicas_stored)
321 elif status is not None:
322 _logger.debug("Request fail: PUT %s => %s %s",
323 self.args['data_hash'], status,
324 self.service.last_result[1])
327 def __init__(self, api_client=None, proxy=None, timeout=60,
328 api_token=None, local_store=None):
329 """Initialize a new KeepClient.
332 * api_client: The API client to use to find Keep services. If not
333 provided, KeepClient will build one from available Arvados
335 * proxy: If specified, this KeepClient will send requests to this
336 Keep proxy. Otherwise, KeepClient will fall back to the setting
337 of the ARVADOS_KEEP_PROXY configuration setting. If you want to
338 ensure KeepClient does not use a proxy, pass in an empty string.
339 * timeout: The timeout for all HTTP requests, in seconds. Default
341 * api_token: If you're not using an API client, but only talking
342 directly to a Keep proxy, this parameter specifies an API token
343 to authenticate Keep requests. It is an error to specify both
344 api_client and api_token. If you specify neither, KeepClient
345 will use one available from the Arvados configuration.
346 * local_store: If specified, this KeepClient will bypass Keep
347 services, and save data to the named directory. If unspecified,
348 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
349 environment variable. If you want to ensure KeepClient does not
350 use local storage, pass in an empty string. This is primarily
351 intended to mock a server for testing.
353 self.lock = threading.Lock()
355 proxy = config.get('ARVADOS_KEEP_PROXY')
356 if api_token is None:
357 api_token = config.get('ARVADOS_API_TOKEN')
358 elif api_client is not None:
360 "can't build KeepClient with both API client and token")
361 if local_store is None:
362 local_store = os.environ.get('KEEP_LOCAL_STORE')
365 self.local_store = local_store
366 self.get = self.local_store_get
367 self.put = self.local_store_put
369 self.timeout = timeout
370 self.cache_max = 256 * 1024 * 1024 # Cache is 256MiB
372 self._cache_lock = threading.Lock()
374 if not proxy.endswith('/'):
376 self.api_token = api_token
377 self.service_roots = [proxy]
378 self.using_proxy = True
379 self.static_service_roots = True
381 # It's important to avoid instantiating an API client
382 # unless we actually need one, for testing's sake.
383 if api_client is None:
384 api_client = arvados.api('v1')
385 self.api_client = api_client
386 self.api_token = api_client.api_token
387 self.service_roots = None
388 self.using_proxy = None
389 self.static_service_roots = False
391 def build_service_roots(self, force_rebuild=False):
392 if (self.static_service_roots or
393 (self.service_roots and not force_rebuild)):
397 keep_services = self.api_client.keep_services().accessible()
398 except Exception: # API server predates Keep services.
399 keep_services = self.api_client.keep_disks().list()
401 keep_services = keep_services.execute().get('items')
402 if not keep_services:
403 raise arvados.errors.NoKeepServersError()
405 self.using_proxy = (keep_services[0].get('service_type') ==
408 roots = (("http%s://%s:%d/" %
409 ('s' if f['service_ssl_flag'] else '',
412 for f in keep_services)
413 self.service_roots = sorted(set(roots))
414 _logger.debug(str(self.service_roots))
416 def shuffled_service_roots(self, hash, force_rebuild=False):
417 self.build_service_roots(force_rebuild)
419 # Build an ordering with which to query the Keep servers based on the
420 # contents of the hash.
421 # "hash" is a hex-encoded number at least 8 digits
424 # seed used to calculate the next keep server from 'pool'
425 # to be added to 'pseq'
428 # Keep servers still to be added to the ordering
429 pool = self.service_roots[:]
431 # output probe sequence
434 # iterate while there are servers left to be assigned
437 # ran out of digits in the seed
438 if len(pseq) < len(hash) / 4:
439 # the number of servers added to the probe sequence is less
440 # than the number of 4-digit slices in 'hash' so refill the
441 # seed with the last 4 digits and then append the contents
443 seed = hash[-4:] + hash
445 # refill the seed with the contents of 'hash'
448 # Take the next 8 digits (32 bytes) and interpret as an integer,
449 # then modulus with the size of the remaining pool to get the next
451 probe = int(seed[0:8], 16) % len(pool)
453 # Append the selected server to the probe sequence and remove it
455 pseq += [pool[probe]]
456 pool = pool[:probe] + pool[probe+1:]
458 # Remove the digits just used from the seed
460 _logger.debug(str(pseq))
463 class CacheSlot(object):
464 def __init__(self, locator):
465 self.locator = locator
466 self.ready = threading.Event()
473 def set(self, value):
478 if self.content == None:
481 return len(self.content)
484 '''Cap the cache size to self.cache_max'''
485 self._cache_lock.acquire()
487 self._cache = filter(lambda c: not (c.ready.is_set() and c.content == None), self._cache)
488 sm = sum([slot.size() for slot in self._cache])
489 while sm > self.cache_max:
491 sm = sum([slot.size() for a in self._cache])
493 self._cache_lock.release()
495 def reserve_cache(self, locator):
496 '''Reserve a cache slot for the specified locator,
497 or return the existing slot.'''
498 self._cache_lock.acquire()
500 # Test if the locator is already in the cache
501 for i in xrange(0, len(self._cache)):
502 if self._cache[i].locator == locator:
505 # move it to the front
507 self._cache.insert(0, n)
510 # Add a new cache slot for the locator
511 n = KeepClient.CacheSlot(locator)
512 self._cache.insert(0, n)
515 self._cache_lock.release()
517 def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
518 # roots_map is a dictionary, mapping Keep service root strings
519 # to KeepService objects. Poll for Keep services, and add any
520 # new ones to roots_map. Return the current list of local
522 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
523 local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
524 for root in local_roots:
525 if root not in roots_map:
526 roots_map[root] = self.KeepService(root, **headers)
530 def _check_loop_result(result):
531 # KeepClient RetryLoops should save results as a 2-tuple: the
532 # actual result of the request, and the number of servers available
533 # to receive the request this round.
534 # This method returns True if there's a real result, False if
535 # there are no more servers available, otherwise None.
536 if isinstance(result, Exception):
538 result, tried_server_count = result
539 if (result is not None) and (result is not False):
541 elif tried_server_count < 1:
542 _logger.info("No more Keep services to try; giving up")
547 def get(self, loc_s, num_retries=0):
548 """Get data from Keep.
550 This method fetches one or more blocks of data from Keep. It
551 sends a request each Keep service registered with the API
552 server (or the proxy provided when this client was
553 instantiated), then each service named in location hints, in
554 sequence. As soon as one service provides the data, it's
558 * loc_s: A string of one or more comma-separated locators to fetch.
559 This method returns the concatenation of these blocks.
560 * num_retries: The number of times to retry GET requests to
561 *each* Keep server if it returns temporary failures, with
562 exponential backoff. Note that, in each loop, the method may try
563 to fetch data from every available Keep service, along with any
564 that are named in location hints in the locator. Default 0.
567 return ''.join(self.get(x) for x in loc_s.split(','))
568 locator = KeepLocator(loc_s)
569 expect_hash = locator.md5sum
571 slot, first = self.reserve_cache(expect_hash)
576 # See #3147 for a discussion of the loop implementation. Highlights:
577 # * Refresh the list of Keep services after each failure, in case
578 # it's being updated.
579 # * Retry until we succeed, we're out of retries, or every available
580 # service has returned permanent failure.
581 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
582 for hint in locator.hints if hint.startswith('K@')]
583 # Map root URLs their KeepService objects.
584 roots_map = {root: self.KeepService(root) for root in hint_roots}
586 loop = retry.RetryLoop(num_retries, self._check_loop_result,
588 for tries_left in loop:
590 local_roots = self.map_new_services(
591 roots_map, expect_hash,
592 force_rebuild=(tries_left < num_retries))
593 except Exception as error:
594 loop.save_result(error)
597 # Query KeepService objects that haven't returned
598 # permanent failure, in our specified shuffle order.
599 services_to_try = [roots_map[root]
600 for root in (local_roots + hint_roots)
601 if roots_map[root].usable()]
602 http = httplib2.Http(timeout=self.timeout)
603 for keep_service in services_to_try:
604 blob = keep_service.get(http, locator)
607 loop.save_result((blob, len(services_to_try)))
609 # Always cache the result, then return it if we succeeded.
615 # No servers fulfilled the request. Count how many responded
616 # "not found;" if the ratio is high enough (currently 75%), report
617 # Not Found; otherwise a generic error.
618 # Q: Including 403 is necessary for the Keep tests to continue
619 # passing, but maybe they should expect KeepReadError instead?
620 not_founds = sum(1 for ks in roots_map.values()
621 if ks.last_status() in set([403, 404, 410]))
622 if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
623 raise arvados.errors.NotFoundError(loc_s)
625 raise arvados.errors.KeepReadError(loc_s)
627 def put(self, data, copies=2, num_retries=0):
628 """Save data in Keep.
630 This method will get a list of Keep services from the API server, and
631 send the data to each one simultaneously in a new thread. Once the
632 uploads are finished, if enough copies are saved, this method returns
633 the most recent HTTP response body. If requests fail to upload
634 enough copies, this method raises KeepWriteError.
637 * data: The string of data to upload.
638 * copies: The number of copies that the user requires be saved.
640 * num_retries: The number of times to retry PUT requests to
641 *each* Keep server if it returns temporary failures, with
642 exponential backoff. Default 0.
644 data_hash = hashlib.md5(data).hexdigest()
650 # Tell the proxy how many copies we want it to store
651 headers['X-Keep-Desired-Replication'] = str(copies)
653 thread_limiter = KeepClient.ThreadLimiter(copies)
654 loop = retry.RetryLoop(num_retries, self._check_loop_result,
656 for tries_left in loop:
658 local_roots = self.map_new_services(
659 roots_map, data_hash,
660 force_rebuild=(tries_left < num_retries), **headers)
661 except Exception as error:
662 loop.save_result(error)
666 for service_root, ks in roots_map.iteritems():
669 t = KeepClient.KeepWriterThread(
673 service_root=service_root,
674 thread_limiter=thread_limiter,
675 timeout=self.timeout)
680 loop.save_result((thread_limiter.done() >= copies, len(threads)))
683 return thread_limiter.response()
684 raise arvados.errors.KeepWriteError(
685 "Write fail for %s: wanted %d but wrote %d" %
686 (data_hash, copies, thread_limiter.done()))
688 def local_store_put(self, data):
689 md5 = hashlib.md5(data).hexdigest()
690 locator = '%s+%d' % (md5, len(data))
691 with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
693 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
694 os.path.join(self.local_store, md5))
697 def local_store_get(self, loc_s):
699 locator = KeepLocator(loc_s)
701 raise arvados.errors.NotFoundError(
702 "Invalid data locator: '%s'" % loc_s)
703 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
705 with open(os.path.join(self.local_store, locator.md5sum), 'r') as f: