# Module-level logger used by the Keep client code for diagnostics.
_logger = logging.getLogger('arvados.keep')
# Shared KeepClient handed out by the deprecated Keep wrapper class; it
# starts as None and is (re)assigned by Keep.global_client_object().
global_client_object = None
28 import arvados.config as config
32 class KeepLocator(object):
33 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
34 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
36 def __init__(self, locator_str):
39 self._perm_expiry = None
40 pieces = iter(locator_str.split('+'))
41 self.md5sum = next(pieces)
43 self.size = int(next(pieces))
47 if self.HINT_RE.match(hint) is None:
48 raise ValueError("unrecognized hint data {}".format(hint))
49 elif hint.startswith('A'):
50 self.parse_permission_hint(hint)
52 self.hints.append(hint)
56 str(s) for s in [self.md5sum, self.size,
57 self.permission_hint()] + self.hints
60 def _make_hex_prop(name, length):
61 # Build and return a new property with the given name that
62 # must be a hex string of the given length.
63 data_name = '_{}'.format(name)
65 return getattr(self, data_name)
66 def setter(self, hex_str):
67 if not arvados.util.is_hex(hex_str, length):
68 raise ValueError("{} must be a {}-digit hex string: {}".
69 format(name, length, hex_str))
70 setattr(self, data_name, hex_str)
71 return property(getter, setter)
# Validated hex-string attributes built by _make_hex_prop: the md5sum must
# be 32 hex digits and the permission signature 40 hex digits.
md5sum = _make_hex_prop('md5sum', 32)
perm_sig = _make_hex_prop('perm_sig', 40)
def perm_expiry(self):
    """Return the stored permission expiry (None until one is assigned)."""
    return self._perm_expiry
81 def perm_expiry(self, value):
82 if not arvados.util.is_hex(value, 1, 8):
84 "permission timestamp must be a hex Unix timestamp: {}".
86 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
88 def permission_hint(self):
89 data = [self.perm_sig, self.perm_expiry]
92 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
93 return "A{}@{:08x}".format(*data)
95 def parse_permission_hint(self, s):
97 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
99 raise ValueError("bad permission hint {}".format(s))
def permission_expired(self, as_of_dt=None):
    """Report whether this locator's permission hint has expired.

    Arguments:
    * as_of_dt: naive datetime, expressed in UTC, to compare the expiry
      against.  Defaults to the current UTC time.

    Returns True when the expiry is at or before as_of_dt.
    """
    if self.perm_expiry is None:
        # NOTE(review): this branch's body was not visible in the reviewed
        # excerpt; a locator with no recorded expiry is treated as not
        # expired -- confirm against the full file.
        return False
    if as_of_dt is None:
        # perm_expiry is built with utcfromtimestamp(), i.e. it is a naive
        # UTC value, so "now" must be taken in UTC as well.  Local time
        # would skew the answer by the machine's UTC offset.
        as_of_dt = datetime.datetime.utcnow()
    return self.perm_expiry <= as_of_dt
110 """Simple interface to a global KeepClient object.
112 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
113 own API client. The global KeepClient will build an API client from the
114 current Arvados configuration, which may not match the one you built.
119 def global_client_object(cls):
120 global global_client_object
121 # Previously, KeepClient would change its behavior at runtime based
122 # on these configuration settings. We simulate that behavior here
123 # by checking the values and returning a new KeepClient if any of
125 key = (config.get('ARVADOS_API_HOST'),
126 config.get('ARVADOS_API_TOKEN'),
127 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
128 config.get('ARVADOS_KEEP_PROXY'),
129 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
130 os.environ.get('KEEP_LOCAL_STORE'))
131 if (global_client_object is None) or (cls._last_key != key):
132 global_client_object = KeepClient()
134 return global_client_object
def get(locator, **kwargs):
    """Fetch `locator` through the shared global KeepClient.

    Keyword arguments are forwarded unchanged to that client's get().
    """
    client = Keep.global_client_object()
    return client.get(locator, **kwargs)
def put(data, **kwargs):
    """Store `data` through the shared global KeepClient.

    Keyword arguments are forwarded unchanged to that client's put().
    """
    client = Keep.global_client_object()
    return client.put(data, **kwargs)
145 class KeepClient(object):
146 class ThreadLimiter(object):
148 Limit the number of threads running at a given time to
149 {desired successes} minus {successes reported}. When successes
150 reported == desired, wake up the remaining threads and tell
153 Should be used in a "with" block.
155 def __init__(self, todo):
158 self._response = None
159 self._todo_lock = threading.Semaphore(todo)
160 self._done_lock = threading.Lock()
163 self._todo_lock.acquire()
def __exit__(self, type, value, traceback):
    # Hand the semaphore slot back so another waiting thread may enter.
    # Nothing is returned, so an exception raised inside the "with" body
    # is not suppressed.
    self._todo_lock.release()
def shall_i_proceed(self):
    """
    Tell the calling thread whether it still has work to do: true
    while fewer successes have been recorded than were requested,
    false once that target has been reached.
    """
    self._done_lock.acquire()
    try:
        still_needed = self._todo > self._done
    finally:
        self._done_lock.release()
    return still_needed
def save_response(self, response_body, replicas_stored):
    """
    Remember the body (a locator, possibly signed) that a Keep server
    returned, and add replicas_stored to the running success count.
    Only the most recent body is kept: any locator returned for a
    successful request is presumed valid, so one is enough.
    """
    self._done_lock.acquire()
    try:
        self._done = self._done + replicas_stored
        self._response = response_body
    finally:
        self._done_lock.release()
190 Returns the body from the response to a PUT request.
192 with self._done_lock:
193 return self._response
197 Return how many successes were reported.
199 with self._done_lock:
203 class KeepWriterThread(threading.Thread):
205 Write a blob of data to the given Keep server. On success, call
206 save_response() of the given ThreadLimiter to save the returned
209 def __init__(self, api_token, **kwargs):
210 super(KeepClient.KeepWriterThread, self).__init__()
211 self._api_token = api_token
213 self._success = False
219 with self.args['thread_limiter'] as limiter:
220 if not limiter.shall_i_proceed():
221 # My turn arrived, but the job has been done without
224 self.run_with_limiter(limiter)
226 def run_with_limiter(self, limiter):
227 _logger.debug("KeepWriterThread %s proceeding %s %s",
228 str(threading.current_thread()),
229 self.args['data_hash'],
230 self.args['service_root'])
231 h = httplib2.Http(timeout=self.args.get('timeout', None))
232 url = self.args['service_root'] + self.args['data_hash']
233 headers = {'Authorization': "OAuth2 %s" % (self._api_token,)}
235 if self.args['using_proxy']:
236 # We're using a proxy, so tell the proxy how many copies we
238 headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
241 _logger.debug("Uploading to {}".format(url))
242 resp, content = h.request(url.encode('utf-8'), 'PUT',
244 body=self.args['data'])
245 if re.match(r'^2\d\d$', resp['status']):
247 _logger.debug("KeepWriterThread %s succeeded %s %s",
248 str(threading.current_thread()),
249 self.args['data_hash'],
250 self.args['service_root'])
252 if 'x-keep-replicas-stored' in resp:
253 # Tick the 'done' counter for the number of replicas
254 # reported stored by the server, for the case that
255 # we're talking to a proxy or other backend that
256 # stores to multiple copies for us.
258 replicas_stored = int(resp['x-keep-replicas-stored'])
261 limiter.save_response(content.strip(), replicas_stored)
263 _logger.debug("Request fail: PUT %s => %s %s",
264 url, resp['status'], content)
265 except (httplib2.HttpLib2Error,
266 httplib.HTTPException,
268 # When using https, timeouts look like ssl.SSLError from here.
269 # "SSLError: The write operation timed out"
270 _logger.debug("Request fail: PUT %s => %s: %s",
271 url, type(e), str(e))
274 def __init__(self, api_client=None, proxy=None, timeout=60,
275 api_token=None, local_store=None):
276 """Initialize a new KeepClient.
279 * api_client: The API client to use to find Keep services. If not
280 provided, KeepClient will build one from available Arvados
282 * proxy: If specified, this KeepClient will send requests to this
283 Keep proxy. Otherwise, KeepClient will fall back to the setting
284 of the ARVADOS_KEEP_PROXY configuration setting. If you want to
285 ensure KeepClient does not use a proxy, pass in an empty string.
286 * timeout: The timeout for all HTTP requests, in seconds. Default
288 * api_token: If you're not using an API client, but only talking
289 directly to a Keep proxy, this parameter specifies an API token
290 to authenticate Keep requests. It is an error to specify both
291 api_client and api_token. If you specify neither, KeepClient
292 will use one available from the Arvados configuration.
293 * local_store: If specified, this KeepClient will bypass Keep
294 services, and save data to the named directory. If unspecified,
295 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
296 environment variable. If you want to ensure KeepClient does not
297 use local storage, pass in an empty string. This is primarily
298 intended to mock a server for testing.
300 self.lock = threading.Lock()
302 proxy = config.get('ARVADOS_KEEP_PROXY')
303 if api_token is None:
304 api_token = config.get('ARVADOS_API_TOKEN')
305 elif api_client is not None:
307 "can't build KeepClient with both API client and token")
308 if local_store is None:
309 local_store = os.environ.get('KEEP_LOCAL_STORE')
312 self.local_store = local_store
313 self.get = self.local_store_get
314 self.put = self.local_store_put
316 self.timeout = timeout
317 self.cache_max = 256 * 1024 * 1024 # Cache is 256MiB
319 self._cache_lock = threading.Lock()
321 if not proxy.endswith('/'):
323 self.api_token = api_token
324 self.service_roots = [proxy]
325 self.using_proxy = True
327 # It's important to avoid instantiating an API client
328 # unless we actually need one, for testing's sake.
329 if api_client is None:
330 api_client = arvados.api('v1')
331 self.api_client = api_client
332 self.api_token = api_client.api_token
333 self.service_roots = None
334 self.using_proxy = None
336 def shuffled_service_roots(self, hash):
337 if self.service_roots is None:
340 keep_services = self.api_client.keep_services().accessible()
341 except Exception: # API server predates Keep services.
342 keep_services = self.api_client.keep_disks().list()
344 keep_services = keep_services.execute().get('items')
345 if not keep_services:
346 raise arvados.errors.NoKeepServersError()
348 self.using_proxy = (keep_services[0].get('service_type') ==
351 roots = (("http%s://%s:%d/" %
352 ('s' if f['service_ssl_flag'] else '',
355 for f in keep_services)
356 self.service_roots = sorted(set(roots))
357 _logger.debug(str(self.service_roots))
359 # Build an ordering with which to query the Keep servers based on the
360 # contents of the hash.
361 # "hash" is a hex-encoded number at least 8 digits
364 # seed used to calculate the next keep server from 'pool'
365 # to be added to 'pseq'
368 # Keep servers still to be added to the ordering
369 pool = self.service_roots[:]
371 # output probe sequence
374 # iterate while there are servers left to be assigned
377 # ran out of digits in the seed
378 if len(pseq) < len(hash) / 4:
379 # the number of servers added to the probe sequence is less
380 # than the number of 4-digit slices in 'hash' so refill the
381 # seed with the last 4 digits and then append the contents
383 seed = hash[-4:] + hash
385 # refill the seed with the contents of 'hash'
388 # Take the next 8 digits (32 bits) and interpret as an integer,
389 # then modulus with the size of the remaining pool to get the next
391 probe = int(seed[0:8], 16) % len(pool)
393 # Append the selected server to the probe sequence and remove it
395 pseq += [pool[probe]]
396 pool = pool[:probe] + pool[probe+1:]
398 # Remove the digits just used from the seed
400 _logger.debug(str(pseq))
403 class CacheSlot(object):
404 def __init__(self, locator):
405 self.locator = locator
406 self.ready = threading.Event()
413 def set(self, value):
418 if self.content == None:
421 return len(self.content)
424 '''Cap the cache size to self.cache_max'''
425 self._cache_lock.acquire()
427 self._cache = filter(lambda c: not (c.ready.is_set() and c.content == None), self._cache)
428 sm = sum([slot.size() for slot in self._cache])
429 while sm > self.cache_max:
431 sm = sum([slot.size() for a in self._cache])
433 self._cache_lock.release()
435 def reserve_cache(self, locator):
436 '''Reserve a cache slot for the specified locator,
437 or return the existing slot.'''
438 self._cache_lock.acquire()
440 # Test if the locator is already in the cache
441 for i in xrange(0, len(self._cache)):
442 if self._cache[i].locator == locator:
445 # move it to the front
447 self._cache.insert(0, n)
450 # Add a new cache slot for the locator
451 n = KeepClient.CacheSlot(locator)
452 self._cache.insert(0, n)
455 self._cache_lock.release()
457 def get(self, loc_s):
459 return ''.join(self.get(x) for x in loc_s.split(','))
460 locator = KeepLocator(loc_s)
461 expect_hash = locator.md5sum
463 slot, first = self.reserve_cache(expect_hash)
470 for service_root in self.shuffled_service_roots(expect_hash):
471 url = service_root + loc_s
472 headers = {'Authorization': "OAuth2 %s" % (self.api_token,),
473 'Accept': 'application/octet-stream'}
474 blob = self.get_url(url, headers, expect_hash)
480 for hint in locator.hints:
481 if not hint.startswith('K@'):
483 url = 'http://keep.' + hint[2:] + '.arvadosapi.com/' + loc_s
484 blob = self.get_url(url, {}, expect_hash)
496 raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash)
498 def get_url(self, url, headers, expect_hash):
501 _logger.debug("Request: GET %s", url)
502 with timer.Timer() as t:
503 resp, content = h.request(url.encode('utf-8'), 'GET',
505 _logger.info("Received %s bytes in %s msec (%s MiB/sec)",
506 len(content), t.msecs,
507 (len(content)/(1024*1024))/t.secs)
508 if re.match(r'^2\d\d$', resp['status']):
509 md5 = hashlib.md5(content).hexdigest()
510 if md5 == expect_hash:
512 _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
513 except Exception as e:
514 _logger.debug("Request fail: GET %s => %s: %s",
515 url, type(e), str(e))
518 def put(self, data, copies=2):
519 data_hash = hashlib.md5(data).hexdigest()
522 if not (want_copies > 0):
525 thread_limiter = KeepClient.ThreadLimiter(want_copies)
526 for service_root in self.shuffled_service_roots(data_hash):
527 t = KeepClient.KeepWriterThread(
531 service_root=service_root,
532 thread_limiter=thread_limiter,
533 timeout=self.timeout,
534 using_proxy=self.using_proxy,
535 want_copies=(want_copies if self.using_proxy else 1))
540 if thread_limiter.done() < want_copies:
541 # Retry the threads (i.e., services) that failed the first
546 _logger.debug("Retrying: PUT %s %s",
547 t.args['service_root'],
549 retry_with_args = t.args.copy()
550 t_retry = KeepClient.KeepWriterThread(self.api_token,
553 threads_retry += [t_retry]
554 for t in threads_retry:
556 have_copies = thread_limiter.done()
557 # If we're done, return the response from Keep
558 if have_copies >= want_copies:
559 return thread_limiter.response()
560 raise arvados.errors.KeepWriteError(
561 "Write fail for %s: wanted %d but wrote %d" %
562 (data_hash, want_copies, have_copies))
def local_store_put(self, data, copies=1):
    """Save `data` as a file in the local store directory.

    This method is installed in place of put() when a local store is
    configured, so it accepts the same `copies` argument as put().  The
    value is ignored because only one local file is ever written.

    Arguments:
    * data: the block contents, as bytes.
    * copies: accepted for signature compatibility with put(); unused.

    Returns the locator string '<md5 hex digest>+<size in bytes>'.
    """
    md5 = hashlib.md5(data).hexdigest()
    locator = '%s+%d' % (md5, len(data))
    final_path = os.path.join(self.local_store, md5)
    tmp_path = final_path + '.tmp'
    # Write to a temporary name and rename afterwards so a partially
    # written block never appears under its final name.  Binary mode keeps
    # the bytes on disk identical to the bytes that were hashed.
    # NOTE(review): the write and return statements were not visible in
    # the reviewed excerpt and are reconstructed here -- confirm.
    with open(tmp_path, 'wb') as f:
        f.write(data)
    os.rename(tmp_path, final_path)
    return locator
574 def local_store_get(self, loc_s):
576 locator = KeepLocator(loc_s)
578 raise arvados.errors.NotFoundError(
579 "Invalid data locator: '%s'" % loc_s)
580 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
582 with open(os.path.join(self.local_store, locator.md5sum), 'r') as f: