_logger = logging.getLogger('arvados.keep')
# Shared client handed out by the deprecated Keep class; Keep rebuilds it
# whenever the configuration values it checks have changed.
global_client_object = None
29 import arvados.config as config
31 import arvados.retry as retry
class KeepLocator(object):
    """A parsed Keep block locator.

    A locator string has the form ``<md5>[+<size>][+<hint>]...``.  Every
    hint must match HINT_RE.  A hint starting with ``A`` is a permission
    hint (``A<signature>@<hex expiry>``) and is parsed into perm_sig and
    perm_expiry; all other hints are kept, in order, in self.hints.
    """
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        """Parse locator_str.

        Raises ValueError if the hash, the size, or any hint is malformed.
        """
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # Bare hash with no size component.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("unrecognized hint data {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        # Reassemble the locator, omitting components that are unset.
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        # (Runs at class-creation time, hence no self parameter.)
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} must be a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        """Permission expiry as a naive UTC datetime, or None."""
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        # value is the hex Unix timestamp taken from a permission hint.
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the ``A<sig>@<expiry>`` hint string, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Parse an ``A<sig>@<hex expiry>`` hint into perm_sig/perm_expiry.

        Raises ValueError if s has no ``@`` separator, or if either part
        is rejected by its property setter.
        """
        # str.split never raises IndexError, so a missing separator must be
        # detected explicitly to report it as a bad permission hint.
        sig, sep, expiry = s[1:].partition('@')
        if not sep:
            raise ValueError("bad permission hint {}".format(s))
        self.perm_sig = sig
        self.perm_expiry = expiry

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission signature has expired.

        perm_expiry is a naive UTC datetime, so as_of_dt should be a naive
        UTC datetime too; it defaults to the current UTC time.  Returns
        False when the locator carries no permission hint.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # perm_expiry comes from utcfromtimestamp(); comparing it with
            # local time (datetime.now()) is wrong on non-UTC hosts.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with
    your own API client.  The global KeepClient builds an API client from
    the current Arvados configuration, which may not match the one you
    built.
    """
    # Configuration snapshot the current global client was built under.
    _last_key = None

    @classmethod
    def global_client_object(cls):
        """Return the shared KeepClient, rebuilding it if config changed."""
        global global_client_object
        # KeepClient used to change behavior at runtime based on these
        # settings.  Emulate that by building a fresh client whenever any
        # of them differs from the snapshot taken last time.
        current_key = (
            config.get('ARVADOS_API_HOST'),
            config.get('ARVADOS_API_TOKEN'),
            config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
            config.get('ARVADOS_KEEP_PROXY'),
            config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
            os.environ.get('KEEP_LOCAL_STORE'),
        )
        needs_rebuild = ((global_client_object is None) or
                         (cls._last_key != current_key))
        if needs_rebuild:
            global_client_object = KeepClient()
            cls._last_key = current_key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Deprecated: fetch locator through the shared KeepClient."""
        client = Keep.global_client_object()
        return client.get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Deprecated: store data through the shared KeepClient."""
        client = Keep.global_client_object()
        return client.put(data, **kwargs)
147 class KeepClient(object):
148 class ThreadLimiter(object):
150 Limit the number of threads running at a given time to
151 {desired successes} minus {successes reported}. When successes
152 reported == desired, wake up the remaining threads and tell
155 Should be used in a "with" block.
157 def __init__(self, todo):
160 self._response = None
161 self._todo_lock = threading.Semaphore(todo)
162 self._done_lock = threading.Lock()
165 self._todo_lock.acquire()
168 def __exit__(self, type, value, traceback):
169 self._todo_lock.release()
171 def shall_i_proceed(self):
173 Return true if the current thread should do stuff. Return
174 false if the current thread should just stop.
176 with self._done_lock:
177 return (self._done < self._todo)
179 def save_response(self, response_body, replicas_stored):
181 Records a response body (a locator, possibly signed) returned by
182 the Keep server. It is not necessary to save more than
183 one response, since we presume that any locator returned
184 in response to a successful request is valid.
186 with self._done_lock:
187 self._done += replicas_stored
188 self._response = response_body
192 Returns the body from the response to a PUT request.
194 with self._done_lock:
195 return self._response
199 Return how many successes were reported.
201 with self._done_lock:
205 class KeepService(object):
206 # Make requests to a single Keep service, and track results.
207 HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
208 socket.error, ssl.SSLError)
210 def __init__(self, root, **headers):
212 self.last_result = None
213 self.success_flag = None
214 self.get_headers = {'Accept': 'application/octet-stream'}
215 self.get_headers.update(headers)
216 self.put_headers = headers
219 return self.success_flag is not False
222 return self.success_flag is not None
224 def last_status(self):
226 return int(self.last_result[0].status)
227 except (AttributeError, IndexError, ValueError):
230 def get(self, http, locator):
231 # http is an httplib2.Http object.
232 # locator is a KeepLocator object.
233 url = self.root + str(locator)
234 _logger.debug("Request: GET %s", url)
236 with timer.Timer() as t:
237 result = http.request(url.encode('utf-8'), 'GET',
238 headers=self.get_headers)
239 except self.HTTP_ERRORS as e:
240 _logger.debug("Request fail: GET %s => %s: %s",
241 url, type(e), str(e))
244 self.last_result = result
245 self.success_flag = retry.check_http_response_success(result)
247 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
248 self.last_status(), len(content), t.msecs,
249 (len(content)/(1024.0*1024))/t.secs)
250 if self.success_flag:
251 resp_md5 = hashlib.md5(content).hexdigest()
252 if resp_md5 == locator.md5sum:
254 _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
257 def put(self, http, hash_s, body):
258 url = self.root + hash_s
259 _logger.debug("Request: PUT %s", url)
261 result = http.request(url.encode('utf-8'), 'PUT',
262 headers=self.put_headers, body=body)
263 except self.HTTP_ERRORS as e:
264 _logger.debug("Request fail: PUT %s => %s: %s",
265 url, type(e), str(e))
268 self.last_result = result
269 self.success_flag = retry.check_http_response_success(result)
270 return self.success_flag
273 class KeepWriterThread(threading.Thread):
275 Write a blob of data to the given Keep server. On success, call
276 save_response() of the given ThreadLimiter to save the returned
279 def __init__(self, keep_service, **kwargs):
280 super(KeepClient.KeepWriterThread, self).__init__()
281 self.service = keep_service
283 self._success = False
289 with self.args['thread_limiter'] as limiter:
290 if not limiter.shall_i_proceed():
291 # My turn arrived, but the job has been done without
294 self.run_with_limiter(limiter)
296 def run_with_limiter(self, limiter):
297 if self.service.finished():
299 _logger.debug("KeepWriterThread %s proceeding %s %s",
300 str(threading.current_thread()),
301 self.args['data_hash'],
302 self.args['service_root'])
303 h = httplib2.Http(timeout=self.args.get('timeout', None))
304 self._success = bool(self.service.put(
305 h, self.args['data_hash'], self.args['data']))
306 status = self.service.last_status()
308 resp, body = self.service.last_result
309 _logger.debug("KeepWriterThread %s succeeded %s %s",
310 str(threading.current_thread()),
311 self.args['data_hash'],
312 self.args['service_root'])
313 # Tick the 'done' counter for the number of replica
314 # reported stored by the server, for the case that
315 # we're talking to a proxy or other backend that
316 # stores to multiple copies for us.
318 replicas_stored = int(resp['x-keep-replicas-stored'])
319 except (KeyError, ValueError):
321 limiter.save_response(body.strip(), replicas_stored)
322 elif status is not None:
323 _logger.debug("Request fail: PUT %s => %s %s",
324 self.args['data_hash'], status,
325 self.service.last_result[1])
328 def __init__(self, api_client=None, proxy=None, timeout=300,
329 api_token=None, local_store=None):
330 """Initialize a new KeepClient.
333 * api_client: The API client to use to find Keep services. If not
334 provided, KeepClient will build one from available Arvados
336 * proxy: If specified, this KeepClient will send requests to this
337 Keep proxy. Otherwise, KeepClient will fall back to the setting
338 of the ARVADOS_KEEP_PROXY configuration setting. If you want to
339 ensure KeepClient does not use a proxy, pass in an empty string.
340 * timeout: The timeout for all HTTP requests, in seconds. Default
342 * api_token: If you're not using an API client, but only talking
343 directly to a Keep proxy, this parameter specifies an API token
344 to authenticate Keep requests. It is an error to specify both
345 api_client and api_token. If you specify neither, KeepClient
346 will use one available from the Arvados configuration.
347 * local_store: If specified, this KeepClient will bypass Keep
348 services, and save data to the named directory. If unspecified,
349 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
350 environment variable. If you want to ensure KeepClient does not
351 use local storage, pass in an empty string. This is primarily
352 intended to mock a server for testing.
354 self.lock = threading.Lock()
356 proxy = config.get('ARVADOS_KEEP_PROXY')
357 if api_token is None:
358 api_token = config.get('ARVADOS_API_TOKEN')
359 elif api_client is not None:
361 "can't build KeepClient with both API client and token")
362 if local_store is None:
363 local_store = os.environ.get('KEEP_LOCAL_STORE')
366 self.local_store = local_store
367 self.get = self.local_store_get
368 self.put = self.local_store_put
370 self.timeout = timeout
371 self.cache_max = 256 * 1024 * 1024 # Cache is 256MiB
373 self._cache_lock = threading.Lock()
375 if not proxy.endswith('/'):
377 self.api_token = api_token
378 self.service_roots = [proxy]
379 self.using_proxy = True
380 self.static_service_roots = True
382 # It's important to avoid instantiating an API client
383 # unless we actually need one, for testing's sake.
384 if api_client is None:
385 api_client = arvados.api('v1')
386 self.api_client = api_client
387 self.api_token = api_client.api_token
388 self.service_roots = None
389 self.using_proxy = None
390 self.static_service_roots = False
392 def build_service_roots(self, force_rebuild=False):
393 if (self.static_service_roots or
394 (self.service_roots and not force_rebuild)):
398 keep_services = self.api_client.keep_services().accessible()
399 except Exception: # API server predates Keep services.
400 keep_services = self.api_client.keep_disks().list()
402 keep_services = keep_services.execute().get('items')
403 if not keep_services:
404 raise arvados.errors.NoKeepServersError()
406 self.using_proxy = (keep_services[0].get('service_type') ==
409 roots = (("http%s://%s:%d/" %
410 ('s' if f['service_ssl_flag'] else '',
413 for f in keep_services)
414 self.service_roots = sorted(set(roots))
415 _logger.debug(str(self.service_roots))
417 def shuffled_service_roots(self, hash, force_rebuild=False):
418 self.build_service_roots(force_rebuild)
420 # Build an ordering with which to query the Keep servers based on the
421 # contents of the hash.
422 # "hash" is a hex-encoded number at least 8 digits
425 # seed used to calculate the next keep server from 'pool'
426 # to be added to 'pseq'
429 # Keep servers still to be added to the ordering
430 pool = self.service_roots[:]
432 # output probe sequence
435 # iterate while there are servers left to be assigned
438 # ran out of digits in the seed
439 if len(pseq) < len(hash) / 4:
440 # the number of servers added to the probe sequence is less
441 # than the number of 4-digit slices in 'hash' so refill the
442 # seed with the last 4 digits and then append the contents
444 seed = hash[-4:] + hash
446 # refill the seed with the contents of 'hash'
449 # Take the next 8 digits (32 bytes) and interpret as an integer,
450 # then modulus with the size of the remaining pool to get the next
452 probe = int(seed[0:8], 16) % len(pool)
454 # Append the selected server to the probe sequence and remove it
456 pseq += [pool[probe]]
457 pool = pool[:probe] + pool[probe+1:]
459 # Remove the digits just used from the seed
461 _logger.debug(str(pseq))
464 class CacheSlot(object):
465 def __init__(self, locator):
466 self.locator = locator
467 self.ready = threading.Event()
474 def set(self, value):
479 if self.content == None:
482 return len(self.content)
485 '''Cap the cache size to self.cache_max'''
486 self._cache_lock.acquire()
488 self._cache = filter(lambda c: not (c.ready.is_set() and c.content == None), self._cache)
489 sm = sum([slot.size() for slot in self._cache])
490 while sm > self.cache_max:
492 sm = sum([slot.size() for a in self._cache])
494 self._cache_lock.release()
496 def reserve_cache(self, locator):
497 '''Reserve a cache slot for the specified locator,
498 or return the existing slot.'''
499 self._cache_lock.acquire()
501 # Test if the locator is already in the cache
502 for i in xrange(0, len(self._cache)):
503 if self._cache[i].locator == locator:
506 # move it to the front
508 self._cache.insert(0, n)
511 # Add a new cache slot for the locator
512 n = KeepClient.CacheSlot(locator)
513 self._cache.insert(0, n)
516 self._cache_lock.release()
518 def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
519 # roots_map is a dictionary, mapping Keep service root strings
520 # to KeepService objects. Poll for Keep services, and add any
521 # new ones to roots_map. Return the current list of local
523 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
524 local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
525 for root in local_roots:
526 if root not in roots_map:
527 roots_map[root] = self.KeepService(root, **headers)
531 def _check_loop_result(result):
532 # KeepClient RetryLoops should save results as a 2-tuple: the
533 # actual result of the request, and the number of servers available
534 # to receive the request this round.
535 # This method returns True if there's a real result, False if
536 # there are no more servers available, otherwise None.
537 if isinstance(result, Exception):
539 result, tried_server_count = result
540 if (result is not None) and (result is not False):
542 elif tried_server_count < 1:
543 _logger.info("No more Keep services to try; giving up")
548 def get(self, loc_s, num_retries=0):
549 """Get data from Keep.
551 This method fetches one or more blocks of data from Keep. It
552 sends a request each Keep service registered with the API
553 server (or the proxy provided when this client was
554 instantiated), then each service named in location hints, in
555 sequence. As soon as one service provides the data, it's
559 * loc_s: A string of one or more comma-separated locators to fetch.
560 This method returns the concatenation of these blocks.
561 * num_retries: The number of times to retry GET requests to
562 *each* Keep server if it returns temporary failures, with
563 exponential backoff. Note that, in each loop, the method may try
564 to fetch data from every available Keep service, along with any
565 that are named in location hints in the locator. Default 0.
568 return ''.join(self.get(x) for x in loc_s.split(','))
569 locator = KeepLocator(loc_s)
570 expect_hash = locator.md5sum
572 slot, first = self.reserve_cache(expect_hash)
577 # See #3147 for a discussion of the loop implementation. Highlights:
578 # * Refresh the list of Keep services after each failure, in case
579 # it's being updated.
580 # * Retry until we succeed, we're out of retries, or every available
581 # service has returned permanent failure.
582 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
583 for hint in locator.hints if hint.startswith('K@')]
584 # Map root URLs their KeepService objects.
585 roots_map = {root: self.KeepService(root) for root in hint_roots}
587 loop = retry.RetryLoop(num_retries, self._check_loop_result,
589 for tries_left in loop:
591 local_roots = self.map_new_services(
592 roots_map, expect_hash,
593 force_rebuild=(tries_left < num_retries))
594 except Exception as error:
595 loop.save_result(error)
598 # Query KeepService objects that haven't returned
599 # permanent failure, in our specified shuffle order.
600 services_to_try = [roots_map[root]
601 for root in (local_roots + hint_roots)
602 if roots_map[root].usable()]
603 http = httplib2.Http(timeout=self.timeout)
604 for keep_service in services_to_try:
605 blob = keep_service.get(http, locator)
608 loop.save_result((blob, len(services_to_try)))
610 # Always cache the result, then return it if we succeeded.
616 # No servers fulfilled the request. Count how many responded
617 # "not found;" if the ratio is high enough (currently 75%), report
618 # Not Found; otherwise a generic error.
619 # Q: Including 403 is necessary for the Keep tests to continue
620 # passing, but maybe they should expect KeepReadError instead?
621 not_founds = sum(1 for ks in roots_map.values()
622 if ks.last_status() in set([403, 404, 410]))
623 if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
624 raise arvados.errors.NotFoundError(loc_s)
626 raise arvados.errors.KeepReadError(loc_s)
628 def put(self, data, copies=2, num_retries=0):
629 """Save data in Keep.
631 This method will get a list of Keep services from the API server, and
632 send the data to each one simultaneously in a new thread. Once the
633 uploads are finished, if enough copies are saved, this method returns
634 the most recent HTTP response body. If requests fail to upload
635 enough copies, this method raises KeepWriteError.
638 * data: The string of data to upload.
639 * copies: The number of copies that the user requires be saved.
641 * num_retries: The number of times to retry PUT requests to
642 *each* Keep server if it returns temporary failures, with
643 exponential backoff. Default 0.
645 data_hash = hashlib.md5(data).hexdigest()
651 # Tell the proxy how many copies we want it to store
652 headers['X-Keep-Desired-Replication'] = str(copies)
654 thread_limiter = KeepClient.ThreadLimiter(copies)
655 loop = retry.RetryLoop(num_retries, self._check_loop_result,
657 for tries_left in loop:
659 local_roots = self.map_new_services(
660 roots_map, data_hash,
661 force_rebuild=(tries_left < num_retries), **headers)
662 except Exception as error:
663 loop.save_result(error)
667 for service_root, ks in roots_map.iteritems():
670 t = KeepClient.KeepWriterThread(
674 service_root=service_root,
675 thread_limiter=thread_limiter,
676 timeout=self.timeout)
681 loop.save_result((thread_limiter.done() >= copies, len(threads)))
684 return thread_limiter.response()
685 raise arvados.errors.KeepWriteError(
686 "Write fail for %s: wanted %d but wrote %d" %
687 (data_hash, copies, thread_limiter.done()))
689 def local_store_put(self, data):
690 md5 = hashlib.md5(data).hexdigest()
691 locator = '%s+%d' % (md5, len(data))
692 with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
694 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
695 os.path.join(self.local_store, md5))
698 def local_store_get(self, loc_s):
700 locator = KeepLocator(loc_s)
702 raise arvados.errors.NotFoundError(
703 "Invalid data locator: '%s'" % loc_s)
704 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
706 with open(os.path.join(self.local_store, locator.md5sum), 'r') as f: