# Module-level singleton cache for the default KeepClient instance; starts as
# None and is rebound lazily by Keep.global_client_object() on first use.
# (NOTE: this chunk is a line-numbered listing with gaps; the leading integers
# are line numbers from the paste, not code.)
23 global_client_object = None
# Parse, validate, and re-serialize a Keep block locator of the form
# "<md5sum>[+hint+hint...]", e.g. "d41d8cd...+0+A<signature>@<hex timestamp>".
# NOTE(review): this is a numbered listing with gaps; several lines of each
# method are missing from this view.
30 class KeepLocator(object):
# Epoch reference used by permission_hint() to turn the expiry datetime back
# into an integer Unix timestamp.
31 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
# Split locator_str on '+': first piece is the md5 digest, remaining pieces
# are hints.  (The loop header over `pieces` and the size/other field
# initializations are among the missing lines 34-36/40.)
33 def __init__(self, locator_str):
37 self._perm_expiry = None
38 pieces = iter(locator_str.split('+'))
39 self.md5sum = next(pieces)
# 'A...' hints carry a permission signature; 'K...' hints are location hints.
41 if hint.startswith('A'):
42 self.parse_permission_hint(hint)
43 elif hint.startswith('K'):
44 self.loc_hint = hint # FIXME
# Any hint not matched above (presumably after a size branch in the missing
# lines 45-47 -- TODO confirm) is rejected.
48 raise ValueError("unrecognized hint data {}".format(hint))
# Fragment of __str__ (its def line is missing): rejoins the known fields
# back into locator form.
52 str(s) for s in [self.md5sum, self.size, self.loc_hint,
53 self.permission_hint()]
# Factory producing a validated hex-string property: the getter reads the
# backing '_<name>' attribute; the setter enforces an exact-length hex string.
56 def _make_hex_prop(name, length):
57 # Build and return a new property with the given name that
58 # must be a hex string of the given length.
59 data_name = '_{}'.format(name)
61 return getattr(self, data_name)
62 def setter(self, hex_str):
63 if not arvados.util.is_hex(hex_str, length):
64 raise ValueError("{} must be a {}-digit hex string: {}".
65 format(name, length, hex_str))
66 setattr(self, data_name, hex_str)
67 return property(getter, setter)
# md5sum must be exactly 32 hex digits; perm_sig exactly 40.
69 md5sum = _make_hex_prop('md5sum', 32)
70 perm_sig = _make_hex_prop('perm_sig', 40)
# perm_expiry property: stored as a naive UTC datetime; the setter accepts a
# 1..8 digit hex Unix timestamp string.
73 def perm_expiry(self):
74 return self._perm_expiry
77 def perm_expiry(self, value):
78 if not arvados.util.is_hex(value, 1, 8):
80 "permission timestamp must be a hex Unix timestamp: {}".
82 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
# Rebuild the 'A<sig>@<hex expiry>' hint from the stored signature/expiry.
84 def permission_hint(self):
85 data = [self.perm_sig, self.perm_expiry]
88 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
89 return "A{}@{:08x}".format(*data)
# Inverse of permission_hint(): split off the leading 'A' and parse
# '<sig>@<expiry>'.  A missing line between 93 and 95 presumably catches the
# split/validation failure before re-raising -- TODO confirm.
91 def parse_permission_hint(self, s):
93 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
95 raise ValueError("bad permission hint {}".format(s))
# True when the signature's expiry has passed.  NOTE(review): perm_expiry is
# derived via utcfromtimestamp (naive UTC) but compared against
# datetime.now() (naive LOCAL time) -- looks like it should be utcnow();
# confirm before relying on expiry checks near the boundary.
97 def permission_expired(self, as_of_dt=None):
98 if self.perm_expiry is None:
100 elif as_of_dt is None:
101 as_of_dt = datetime.datetime.now()
102 return self.perm_expiry <= as_of_dt
def global_client_object():
    """Return the process-wide default KeepClient, creating it on first use.

    Rebinds the module-level ``global_client_object`` (initialized to None at
    the top of the file) the first time a client is needed, then returns the
    cached instance on every later call.

    NOTE(review): in the original listing this appears to be a staticmethod
    of a ``Keep`` class whose header/decorator lines fall outside this view.
    """
    global global_client_object
    # PEP 8: test identity with `is None`, not equality (was `== None`).
    if global_client_object is None:
        global_client_object = KeepClient()
    return global_client_object
def get(locator, **kwargs):
    """Fetch the block named by *locator* using the shared KeepClient.

    All keyword arguments are forwarded unchanged to KeepClient.get().
    """
    client = Keep.global_client_object()
    return client.get(locator, **kwargs)
def put(data, **kwargs):
    """Store *data* in Keep using the shared KeepClient.

    All keyword arguments are forwarded unchanged to KeepClient.put().
    """
    client = Keep.global_client_object()
    return client.put(data, **kwargs)
121 class KeepClient(object):
# Gate for the writer threads: a Semaphore(todo) admits at most `todo`
# concurrent writers, and a shared success counter lets late threads bail out
# once enough replicas have been stored.  NOTE(review): numbered listing with
# gaps -- __enter__'s def line, the _todo/_done initializations, and the
# response()/done() def lines are missing from this view.
123 class ThreadLimiter(object):
125 Limit the number of threads running at a given time to
126 {desired successes} minus {successes reported}. When successes
127 reported == desired, wake up the remaining threads and tell
130 Should be used in a "with" block.
132 def __init__(self, todo):
135 self._response = None
# Semaphore bounds concurrency; the plain Lock guards _done/_response.
136 self._todo_lock = threading.Semaphore(todo)
137 self._done_lock = threading.Lock()
# (__enter__, line 139, is missing) -- acquires a semaphore slot on entry...
140 self._todo_lock.acquire()
# ...and releases it on exit regardless of outcome.
143 def __exit__(self, type, value, traceback):
144 self._todo_lock.release()
146 def shall_i_proceed(self):
148 Return true if the current thread should do stuff. Return
149 false if the current thread should just stop.
151 with self._done_lock:
152 return (self._done < self._todo)
# Record a successful PUT: bump the replica count and keep the (last)
# response body; any valid locator response is considered good enough.
154 def save_response(self, response_body, replicas_stored):
156 Records a response body (a locator, possibly signed) returned by
157 the Keep server. It is not necessary to save more than
158 one response, since we presume that any locator returned
159 in response to a successful request is valid.
161 with self._done_lock:
162 self._done += replicas_stored
163 self._response = response_body
# response() accessor (def line missing from this view):
167 Returns the body from the response to a PUT request.
169 with self._done_lock:
170 return self._response
# done() accessor (def line and return missing from this view):
174 Return how many successes were reported.
176 with self._done_lock:
# Worker thread that PUTs one block to one Keep service and reports success
# to the shared ThreadLimiter.  NOTE(review): numbered listing with gaps --
# the run() def line, try: headers, and second-request kwargs are missing
# from this view.
179 class KeepWriterThread(threading.Thread):
181 Write a blob of data to the given Keep server. On success, call
182 save_response() of the given ThreadLimiter to save the returned
# kwargs are stashed as self.args (assignment at missing line 187 -- TODO
# confirm); expected keys used below: data, data_hash, service_root,
# thread_limiter, using_proxy, want_copies.
185 def __init__(self, **kwargs):
186 super(KeepClient.KeepWriterThread, self).__init__()
188 self._success = False
# run() body: entering the limiter takes a semaphore slot; if enough
# replicas are already stored, this thread exits without doing work.
194 with self.args['thread_limiter'] as limiter:
195 if not limiter.shall_i_proceed():
196 # My turn arrived, but the job has been done without
199 logging.debug("KeepWriterThread %s proceeding %s %s" %
200 (str(threading.current_thread()),
201 self.args['data_hash'],
202 self.args['service_root']))
# NOTE(review): timeout is hard-coded to 60 here even though KeepClient
# accepts a `timeout` kwarg and passes it in self.args -- looks unintended.
203 h = httplib2.Http(timeout=60)
204 url = self.args['service_root'] + self.args['data_hash']
205 api_token = config.get('ARVADOS_API_TOKEN')
206 headers = {'Authorization': "OAuth2 %s" % api_token}
208 if self.args['using_proxy']:
209 # We're using a proxy, so tell the proxy how many copies we
211 headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
214 logging.debug("Uploading to {}".format(url))
215 resp, content = h.request(url.encode('utf-8'), 'PUT',
217 body=self.args['data'])
# Legacy-server fallback: a 401 "Timestamp verification failed" triggers a
# retry with the body wrapped in a dummy PGP envelope.
218 if (resp['status'] == '401' and
219 re.match(r'Timestamp verification failed', content)):
220 body = KeepClient.sign_for_old_server(
221 self.args['data_hash'],
224 resp, content = h.request(url.encode('utf-8'), 'PUT',
# Any 2xx status string counts as success.
227 if re.match(r'^2\d\d$', resp['status']):
229 logging.debug("KeepWriterThread %s succeeded %s %s" %
230 (str(threading.current_thread()),
231 self.args['data_hash'],
232 self.args['service_root']))
233 # Default (missing line) presumably sets replicas_stored = 1 -- confirm.
234 if 'x-keep-replicas-stored' in resp:
235 # Tick the 'done' counter for the number of replica
236 # reported stored by the server, for the case that
237 # we're talking to a proxy or other backend that
238 # stores to multiple copies for us.
240 replicas_stored = int(resp['x-keep-replicas-stored'])
243 return limiter.save_response(content.strip(), replicas_stored)
# Non-2xx responses and transport errors are logged and swallowed; the
# caller retries failed services from KeepClient.put().
245 logging.warning("Request fail: PUT %s => %s %s" %
246 (url, resp['status'], content))
247 except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
248 logging.warning("Request fail: PUT %s => %s: %s" %
249 (url, type(e), str(e)))
# KeepClient constructor: set up locks, the block cache bound, and defaults.
# NOTE(review): line 255 is missing from this numbered listing -- presumably
# it initializes the cache list (self._cache = []) used by cap_cache() and
# reserve_cache() below; confirm against the full source.
251 def __init__(self, **kwargs):
252 self.lock = threading.Lock()
# service_roots is populated lazily by shuffled_service_roots().
253 self.service_roots = None
254 self._cache_lock = threading.Lock()
256 # default 256 megabyte cache
257 self.cache_max = 256 * 1024 * 1024
258 self.using_proxy = False
# Per-request timeout in seconds, overridable via KeepClient(timeout=...).
259 self.timeout = kwargs.get('timeout', 60)
# Return a deterministic, hash-dependent probe order over the known Keep
# service URLs (rendezvous-style weighted shuffle driven by hex digits of
# `hash`).  NOTE(review): numbered listing with gaps; also `hash` shadows the
# builtin, and `== None`/`!= None` should be identity tests (PEP 8).
261 def shuffled_service_roots(self, hash):
# Discover services only once; cached in self.service_roots thereafter.
262 if self.service_roots == None:
265 # Override normal keep disk lookup with an explict proxy
267 keep_proxy_env = config.get("ARVADOS_KEEP_PROXY")
268 if keep_proxy_env != None and len(keep_proxy_env) > 0:
270 if keep_proxy_env[-1:] != '/':
271 keep_proxy_env += "/"
272 self.service_roots = [keep_proxy_env]
273 self.using_proxy = True
# No proxy override: ask the API server for keep services, falling back
# (missing lines suggest an except path -- confirm) to the older keep_disks
# endpoint.
277 keep_services = arvados.api().keep_services().accessible().execute()['items']
279 keep_services = arvados.api().keep_disks().list().execute()['items']
281 if len(keep_services) == 0:
282 raise arvados.errors.NoKeepServersError()
284 if 'service_type' in keep_services[0] and keep_services[0]['service_type'] == 'proxy':
285 self.using_proxy = True
# Build "http(s)://host:port/" roots, deduplicated and sorted for a stable
# base ordering before probing.
287 roots = (("http%s://%s:%d/" %
288 ('s' if f['service_ssl_flag'] else '',
291 for f in keep_services)
292 self.service_roots = sorted(set(roots))
293 logging.debug(str(self.service_roots))
297 # Build an ordering with which to query the Keep servers based on the
298 # contents of the hash.
299 # "hash" is a hex-encoded number at least 8 digits
302 # seed used to calculate the next keep server from 'pool'
303 # to be added to 'pseq'
306 # Keep servers still to be added to the ordering
307 pool = self.service_roots[:]
309 # output probe sequence
312 # iterate while there are servers left to be assigned
315 # ran out of digits in the seed
316 if len(pseq) < len(hash) / 4:
317 # the number of servers added to the probe sequence is less
318 # than the number of 4-digit slices in 'hash' so refill the
319 # seed with the last 4 digits and then append the contents
321 seed = hash[-4:] + hash
323 # refill the seed with the contents of 'hash'
326 # Take the next 8 digits (32 bytes) and interpret as an integer,
327 # then modulus with the size of the remaining pool to get the next
329 probe = int(seed[0:8], 16) % len(pool)
331 # Append the selected server to the probe sequence and remove it
333 pseq += [pool[probe]]
334 pool = pool[:probe] + pool[probe+1:]
336 # Remove the digits just used from the seed
338 logging.debug(str(pseq))
# One entry in KeepClient's block cache: holds a locator, its (eventual)
# content, and an Event that readers wait on until the content is set.
# NOTE(review): numbered listing with gaps -- the get() method, set()'s body
# (presumably self.content = value; self.ready.set()), and the size() def
# line are missing from this view; confirm against the full source.
341 class CacheSlot(object):
342 def __init__(self, locator):
343 self.locator = locator
344 self.ready = threading.Event()
351 def set(self, value):
# size() fragment: a slot whose content is still None contributes 0 bytes
# (`== None` should be `is None`, PEP 8).
356 if self.content == None:
359 return len(self.content)
# cap_cache body (its def line, 361, is missing from this numbered listing):
# drop failed slots, then evict from the tail until total size fits.
362 '''Cap the cache size to self.cache_max'''
363 self._cache_lock.acquire()
# A slot that is ready but has None content marks a failed fetch; discard it.
# (Py2 filter() returns a list here.)
365 self._cache = filter(lambda c: not (c.ready.is_set() and c.content == None), self._cache)
366 sm = sum([slot.size() for slot in self._cache])
# Evict until under the cap (the eviction statement itself, line 368, is
# missing -- presumably deletes the least-recently-used tail entry; confirm).
367 while sm > self.cache_max:
# BUG(review): this comprehension iterates `a` but sums `slot.size()`.  In
# Python 2 `slot` leaks from the line-366 comprehension, so this sums the
# LAST slot's size len(self._cache) times instead of the real total.
# Should read: sum([a.size() for a in self._cache]).
369 sm = sum([slot.size() for a in self._cache])
371 self._cache_lock.release()
# Return (slot, first) for `locator`: the existing slot moved to the front on
# a hit, or a new front slot on a miss.  NOTE(review): numbered listing with
# gaps -- the hit-path extraction of the slot and both return statements
# (presumably `return n, False` / `return n, True`) are missing; confirm.
373 def reserve_cache(self, locator):
374 '''Reserve a cache slot for the specified locator,
375 or return the existing slot.'''
376 self._cache_lock.acquire()
378 # Test if the locator is already in the cache
# Linear scan; self._cache is ordered most-recently-used first.
379 for i in xrange(0, len(self._cache)):
380 if self._cache[i].locator == locator:
383 # move it to the front
385 self._cache.insert(0, n)
388 # Add a new cache slot for the locator
389 n = KeepClient.CacheSlot(locator)
390 self._cache.insert(0, n)
# Release presumably sits in a finally: block (missing lines) -- confirm.
393 self._cache_lock.release()
# Fetch a block by locator: handle comma-joined locator lists recursively,
# honor KEEP_LOCAL_STORE for tests, then try cached content, the shuffled
# Keep services, and finally any +K@ location-hinted instances.
# NOTE(review): numbered listing with large gaps (cache-hit return, success
# paths storing the blob into the slot, cap_cache calls are not visible).
395 def get(self, locator):
396 #logging.debug("Keep.get %s" % (locator))
# A comma-separated locator means multiple blocks: fetch each and join.
398 if re.search(r',', locator):
399 return ''.join(self.get(x) for x in locator.split(','))
400 if 'KEEP_LOCAL_STORE' in os.environ:
401 return KeepClient.local_store_get(locator)
# Cache key is the bare md5, with all '+hints' stripped.
402 expect_hash = re.sub(r'\+.*', '', locator)
404 slot, first = self.reserve_cache(expect_hash)
405 #logging.debug("%s %s %s" % (slot, first, expect_hash))
# Probe the hash-ordered service list; each attempt verifies the md5.
412 for service_root in self.shuffled_service_roots(expect_hash):
413 url = service_root + locator
414 api_token = config.get('ARVADOS_API_TOKEN')
415 headers = {'Authorization': "OAuth2 %s" % api_token,
416 'Accept': 'application/octet-stream'}
417 blob = self.get_url(url, headers, expect_hash)
# Fallback: '+K@<instance>' hints point at a federated instance's public
# keep proxy (no auth headers sent there).
423 for location_hint in re.finditer(r'\+K@([a-z0-9]+)', locator):
424 instance = location_hint.group(1)
425 url = 'http://keep.' + instance + '.arvadosapi.com/' + locator
426 blob = self.get_url(url, {}, expect_hash)
# All sources exhausted.
438 raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash)
# GET one URL and return its body only if the md5 of the content matches
# expect_hash; any failure is logged and (per the except clause) swallowed so
# the caller can try the next service.  NOTE(review): numbered listing with
# gaps -- the Http object construction, try: header, digest update, and the
# success/None returns are missing from this view.
440 def get_url(self, url, headers, expect_hash):
443 logging.info("Request: GET %s" % (url))
# Timed request; t.secs feeds the throughput log line below.
444 with timer.Timer() as t:
445 resp, content = h.request(url.encode('utf-8'), 'GET',
447 logging.info("Received %s bytes in %s msec (%s MiB/sec)" % (len(content),
449 (len(content)/(1024*1024))/t.secs))
450 if re.match(r'^2\d\d$', resp['status']):
# Verify content integrity against the expected md5 before accepting.
451 m = hashlib.new('md5')
454 if md5 == expect_hash:
456 logging.warning("Checksum fail: md5(%s) = %s" % (url, md5))
# Broad catch keeps one bad service from aborting the whole probe sequence.
457 except Exception as e:
458 logging.info("Request fail: GET %s => %s: %s" %
459 (url, type(e), str(e)))
# Store `data` on `copies` distinct Keep services (default 2) using one
# KeepWriterThread per service, retrying failed services once, and return
# the (possibly signed) locator from a successful response.
# NOTE(review): numbered listing with gaps -- thread start()/join() loops,
# the data= kwarg to the writer thread, and the zero-copies early return are
# missing from this view.
462 def put(self, data, **kwargs):
463 if 'KEEP_LOCAL_STORE' in os.environ:
464 return KeepClient.local_store_put(data)
465 m = hashlib.new('md5')
467 data_hash = m.hexdigest()
469 want_copies = kwargs.get('copies', 2)
470 if not (want_copies > 0):
# The limiter lets threads run until `want_copies` replicas are confirmed.
473 thread_limiter = KeepClient.ThreadLimiter(want_copies)
474 for service_root in self.shuffled_service_roots(data_hash):
475 t = KeepClient.KeepWriterThread(
478 service_root=service_root,
479 thread_limiter=thread_limiter,
480 timeout=self.timeout,
481 using_proxy=self.using_proxy,
# When talking to a proxy, one request can yield all desired replicas;
# otherwise each service stores exactly one.
482 want_copies=(want_copies if self.using_proxy else 1))
# First pass fell short: re-run the writer threads that did not succeed.
487 if thread_limiter.done() < want_copies:
488 # Retry the threads (i.e., services) that failed the first
493 logging.warning("Retrying: PUT %s %s" % (
494 t.args['service_root'],
495 t.args['data_hash']))
496 retry_with_args = t.args.copy()
497 t_retry = KeepClient.KeepWriterThread(**retry_with_args)
499 threads_retry += [t_retry]
500 for t in threads_retry:
502 have_copies = thread_limiter.done()
503 # If we're done, return the response from Keep
504 if have_copies >= want_copies:
505 return thread_limiter.response()
# Still short after retries: report how many replicas actually landed.
506 raise arvados.errors.KeepWriteError(
507 "Write fail for %s: wanted %d but wrote %d" %
508 (data_hash, want_copies, have_copies))
def sign_for_old_server(data_hash, data):
    """Wrap *data* in the dummy clear-signed PGP envelope that pre-timestamp
    Keep servers expect.

    The envelope header carries the current Unix time and the block's md5
    hash; the "signature" section is intentionally empty.
    """
    envelope_header = ("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n"
                       "-----BEGIN PGP SIGNATURE-----\n\n"
                       "-----END PGP SIGNATURE-----\n"
                       % (int(time.time()), data_hash))
    return envelope_header + data
# Test/dev backend: write `data` into the KEEP_LOCAL_STORE directory under
# its md5 name (via a .tmp file + rename for atomicity) instead of talking
# to real Keep servers.  NOTE(review): numbered listing with gaps -- the
# digest update/hexdigest lines, f.write(data), and the final
# `return locator` are missing from this view; confirm against full source.
516 def local_store_put(data):
517 m = hashlib.new('md5')
# Locator format matches real Keep: "<md5>+<size>".
520 locator = '%s+%d' % (md5, len(data))
521 with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'), 'w') as f:
# Rename makes the block visible only once fully written.
523 os.rename(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'),
524 os.path.join(os.environ['KEEP_LOCAL_STORE'], md5))
# Test/dev backend: read a block back from the KEEP_LOCAL_STORE directory by
# the 32+ hex-digit hash at the front of `locator`.  NOTE(review): numbered
# listing with gaps, and the method is cut off here -- the empty-block early
# return and the final f.read() fall outside this view.
528 def local_store_get(locator):
529 r = re.search('^([0-9a-f]{32,})', locator)
# No leading hex digest means the locator is malformed.
531 raise arvados.errors.NotFoundError(
532 "Invalid data locator: '%s'" % locator)
# The well-known empty-block locator is answered without touching disk.
533 if r.group(0) == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
535 with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f: