import hashlib
import httplib
import httplib2
import logging
import os
import re
import threading
import time

import arvados
import arvados.errors
import config  # intra-package module (Python 2 implicit relative import)
import timer   # intra-package module

global_client_object = None


class Keep:
    @staticmethod
    def global_client_object():
        global global_client_object
        if global_client_object is None:
            global_client_object = KeepClient()
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)
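# Minimal usage sketch (assumes an API host and ARVADOS_API_TOKEN are
# configured; the locator shown is the md5+size form Keep returns):
#
#   locator = Keep.put("foo")   # e.g. "acbd18db4cc2f85cedef654fccc4a4d8+3"
#   data = Keep.get(locator)    # -> "foo"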
class KeepClient(object):

    class ThreadLimiter(object):
        """
        Limit the number of threads running at a given time to
        {desired successes} minus {successes reported}. When successes
        reported == desired, wake up the remaining threads and tell
        them to quit.

        Should be used in a "with" block.
        """
        def __init__(self, todo):
            self._todo = todo
            self._done = 0
            self._response = None
            self._todo_lock = threading.Semaphore(todo)
            self._done_lock = threading.Lock()

        def __enter__(self):
            self._todo_lock.acquire()
            return self

        def __exit__(self, type, value, traceback):
            self._todo_lock.release()

        def shall_i_proceed(self):
            """
            Return True if the current thread should do stuff. Return
            False if the current thread should just stop.
            """
            with self._done_lock:
                return (self._done < self._todo)

        def save_response(self, response_body, replicas_stored):
            """
            Record a response body (a locator, possibly signed) returned by
            the Keep server. It is not necessary to save more than
            one response, since we presume that any locator returned
            in response to a successful request is valid.
            """
            with self._done_lock:
                self._done += replicas_stored
                self._response = response_body

        def response(self):
            """
            Return the body from the response to a PUT request.
            """
            with self._done_lock:
                return self._response

        def done(self):
            """
            Return how many successes were reported.
            """
            with self._done_lock:
                return self._done
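    # Sketch of the intended pattern (hypothetical worker body): at most
    # 'todo' writers hold the semaphore at once, and late arrivals bail out
    # once enough replicas have already been reported:
    #
    #   limiter = KeepClient.ThreadLimiter(todo=2)
    #   # in each writer thread:
    #   with limiter as l:
    #       if l.shall_i_proceed():
    #           l.save_response('<locator>', 1)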
    class KeepWriterThread(threading.Thread):
        """
        Write a blob of data to the given Keep server. On success, call
        save_response() of the given ThreadLimiter to save the returned
        locator.
        """
        def __init__(self, **kwargs):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.args = kwargs

        def run(self):
            with self.args['thread_limiter'] as limiter:
                if not limiter.shall_i_proceed():
                    # My turn arrived, but the job has been done
                    # without me.
                    return
                logging.debug("KeepWriterThread %s proceeding %s %s" %
                              (str(threading.current_thread()),
                               self.args['data_hash'],
                               self.args['service_root']))
                h = httplib2.Http()
                url = self.args['service_root'] + self.args['data_hash']
                api_token = config.get('ARVADOS_API_TOKEN')
                headers = {'Authorization': "OAuth2 %s" % api_token}

                if self.args['using_proxy']:
                    # We're using a proxy, so tell the proxy how many
                    # copies we want it to store.
                    headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])

                try:
                    logging.debug("Uploading to {}".format(url))
                    resp, content = h.request(url.encode('utf-8'), 'PUT',
                                              headers=headers,
                                              body=self.args['data'])
                    if (resp['status'] == '401' and
                        re.match(r'Timestamp verification failed', content)):
                        # Retry with a signed body for old Keep servers.
                        body = KeepClient.sign_for_old_server(
                            self.args['data_hash'],
                            self.args['data'])
                        h = httplib2.Http()
                        resp, content = h.request(url.encode('utf-8'), 'PUT',
                                                  headers=headers,
                                                  body=body)
                    if re.match(r'^2\d\d$', resp['status']):
                        logging.debug("KeepWriterThread %s succeeded %s %s" %
                                      (str(threading.current_thread()),
                                       self.args['data_hash'],
                                       self.args['service_root']))
                        replicas_stored = 1
                        if 'x-keep-replicas-stored' in resp:
                            # Tick the 'done' counter by the number of
                            # replicas the server reports it stored, for the
                            # case that we're talking to a proxy or other
                            # backend that stores multiple copies for us.
                            try:
                                replicas_stored = int(resp['x-keep-replicas-stored'])
                            except ValueError:
                                pass
                        return limiter.save_response(content.strip(), replicas_stored)
                    logging.warning("Request fail: PUT %s => %s %s" %
                                    (url, resp['status'], content))
                except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
                    logging.warning("Request fail: PUT %s => %s: %s" %
                                    (url, type(e), str(e)))
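    # Wire exchange sketch for a single write (values illustrative):
    #
    #   PUT {service_root}{data_hash}
    #   Authorization: OAuth2 <token>
    #   X-Keep-Desired-Replication: 2    (only when going through a proxy)
    #
    # A 2xx response body is the (possibly signed) locator to store.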
    def __init__(self):
        self.lock = threading.Lock()
        self.service_roots = None
        self._cache_lock = threading.Lock()
        self._cache = []
        # default 256 megabyte cache
        self.cache_max = 256 * 1024 * 1024
        self.using_proxy = False
    def shuffled_service_roots(self, hash):
        if self.service_roots is None:
            self.lock.acquire()
            try:
                # Override normal keep disk lookup with an explicit proxy
                # configuration.
                keep_proxy_env = config.get("ARVADOS_KEEP_PROXY")
                if keep_proxy_env is not None and len(keep_proxy_env) > 0:
                    if keep_proxy_env[-1:] != '/':
                        keep_proxy_env += "/"
                    self.service_roots = [keep_proxy_env]
                    self.using_proxy = True
                else:
                    # Prefer the keep_services endpoint; fall back to the
                    # older keep_disks API if that fails.
                    try:
                        keep_services = arvados.api().keep_services().accessible().execute()['items']
                    except Exception:
                        keep_services = arvados.api().keep_disks().list().execute()['items']

                    if len(keep_services) == 0:
                        raise arvados.errors.NoKeepServersError()

                    if 'service_type' in keep_services[0] and keep_services[0]['service_type'] == 'proxy':
                        self.using_proxy = True

                    roots = (("http%s://%s:%d/" %
                              ('s' if f['service_ssl_flag'] else '',
                               f['service_host'],
                               f['service_port']))
                             for f in keep_services)
                    self.service_roots = sorted(set(roots))
                    logging.debug(str(self.service_roots))
            finally:
                self.lock.release()
        # Build an ordering with which to query the Keep servers based on the
        # contents of the hash.
        # "hash" is a hex-encoded number at least 8 digits (32 bits) long.

        # seed used to calculate the next keep server from 'pool'
        # to be added to 'pseq'
        seed = hash

        # Keep servers still to be added to the ordering
        pool = self.service_roots[:]

        # output probe sequence
        pseq = []

        # iterate while there are servers left to be assigned
        while len(pool) > 0:
            if len(seed) < 8:
                # ran out of digits in the seed
                if len(pseq) < len(hash) / 4:
                    # the number of servers added to the probe sequence is
                    # less than the number of 4-digit slices in 'hash', so
                    # refill the seed with the last 4 digits followed by the
                    # contents of 'hash'.
                    seed = hash[-4:] + hash
                else:
                    # refill the seed with the contents of 'hash'
                    seed += hash

            # Take the next 8 digits (32 bits) and interpret as an integer,
            # then modulus with the size of the remaining pool to get the
            # next selected server.
            probe = int(seed[0:8], 16) % len(pool)

            # Append the selected server to the probe sequence and remove it
            # from the pool.
            pseq += [pool[probe]]
            pool = pool[:probe] + pool[probe+1:]

            # Remove the digits just used from the seed.
            seed = seed[8:]
        logging.debug(str(pseq))
        return pseq
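    # Worked example (hypothetical pool; hash = md5("foo") =
    # "acbd18db4cc2f85cedef654fccc4a4d8"):
    #   pool = [A, B, C]
    #   int("acbd18db", 16) % 3 == 1  -> pick B, pool = [A, C]
    #   int("4cc2f85c", 16) % 2 == 0  -> pick A, pool = [C]
    #   int("edef654f", 16) % 1 == 0  -> pick C
    # Every client therefore probes servers for this hash in the same
    # order [B, A, C], which keeps lookups and placement consistent.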
    class CacheSlot(object):
        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)
    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        self._cache_lock.acquire()
        try:
            # Drop slots that finished with no content, then evict from the
            # tail until the cached bytes fit under the cap.
            self._cache = filter(lambda c: not (c.ready.is_set() and c.content is None), self._cache)
            sm = sum([slot.size() for slot in self._cache])
            while sm > self.cache_max:
                del self._cache[-1]
                sm = sum([slot.size() for slot in self._cache])
        finally:
            self._cache_lock.release()
    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        self._cache_lock.acquire()
        try:
            # Test if the locator is already in the cache
            for i in xrange(0, len(self._cache)):
                if self._cache[i].locator == locator:
                    n = self._cache[i]
                    if i != 0:
                        # move it to the front
                        del self._cache[i]
                        self._cache.insert(0, n)
                    return n, False

            # Add a new cache slot for the locator
            n = KeepClient.CacheSlot(locator)
            self._cache.insert(0, n)
            return n, True
        finally:
            self._cache_lock.release()
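    # Reserve/fill protocol (a sketch of the intended use): the first caller
    # for a locator gets (slot, True) and must fetch the data and slot.set()
    # it; concurrent callers get (slot, False) and just slot.get(), which
    # blocks until the first caller's set() fires the Event.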
    def get(self, locator):
        #logging.debug("Keep.get %s" % (locator))

        if re.search(r',', locator):
            return ''.join(self.get(x) for x in locator.split(','))
        if 'KEEP_LOCAL_STORE' in os.environ:
            return KeepClient.local_store_get(locator)
        expect_hash = re.sub(r'\+.*', '', locator)

        slot, first = self.reserve_cache(expect_hash)
        #logging.debug("%s %s %s" % (slot, first, expect_hash))

        if not first:
            # Another thread is already fetching this block; wait for it.
            return slot.get()

        try:
            for service_root in self.shuffled_service_roots(expect_hash):
                url = service_root + locator
                api_token = config.get('ARVADOS_API_TOKEN')
                headers = {'Authorization': "OAuth2 %s" % api_token,
                           'Accept': 'application/octet-stream'}
                blob = self.get_url(url, headers, expect_hash)
                if blob:
                    slot.set(blob)
                    self.cap_cache()
                    return blob

            # Fall back to any location hints (+K@instance) embedded in the
            # locator.
            for location_hint in re.finditer(r'\+K@([a-z0-9]+)', locator):
                instance = location_hint.group(1)
                url = 'http://keep.' + instance + '.arvadosapi.com/' + locator
                blob = self.get_url(url, {}, expect_hash)
                if blob:
                    slot.set(blob)
                    self.cap_cache()
                    return blob
        finally:
            # Unblock any threads waiting on this slot, even on failure.
            if not slot.ready.is_set():
                slot.set(None)
                self.cap_cache()

        raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash)
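    # Locator forms handled above (hash values illustrative):
    #   "acbd18db4cc2f85cedef654fccc4a4d8+3"          plain hash+size
    #   "acbd18db4cc2f85cedef654fccc4a4d8+3+K@qr1hi"  with a location hint
    #   "<locator1>,<locator2>"                       blocks fetched and
    #                                                 concatenated in order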
    def get_url(self, url, headers, expect_hash):
        h = httplib2.Http()
        try:
            logging.info("Request: GET %s" % (url))
            with timer.Timer() as t:
                resp, content = h.request(url.encode('utf-8'), 'GET',
                                          headers=headers)
            logging.info("Received %s bytes in %s msec (%s MiB/sec)" %
                         (len(content),
                          t.msecs,
                          (len(content)/(1024*1024))/t.secs))
            if re.match(r'^2\d\d$', resp['status']):
                m = hashlib.new('md5')
                m.update(content)
                md5 = m.hexdigest()
                if md5 == expect_hash:
                    return content
                logging.warning("Checksum fail: md5(%s) = %s" % (url, md5))
        except Exception as e:
            logging.info("Request fail: GET %s => %s: %s" %
                         (url, type(e), str(e)))
        return None
    def put(self, data, **kwargs):
        if 'KEEP_LOCAL_STORE' in os.environ:
            return KeepClient.local_store_put(data)
        m = hashlib.new('md5')
        m.update(data)
        data_hash = m.hexdigest()
        have_copies = 0
        want_copies = kwargs.get('copies', 2)
        if not (want_copies > 0):
            return data_hash
        threads = []
        thread_limiter = KeepClient.ThreadLimiter(want_copies)
        for service_root in self.shuffled_service_roots(data_hash):
            t = KeepClient.KeepWriterThread(data=data,
                                            data_hash=data_hash,
                                            service_root=service_root,
                                            thread_limiter=thread_limiter,
                                            using_proxy=self.using_proxy,
                                            want_copies=(want_copies if self.using_proxy else 1))
            t.start()
            threads += [t]
        for t in threads:
            t.join()
        have_copies = thread_limiter.done()
        # If we're done, return the response from Keep
        if have_copies >= want_copies:
            return thread_limiter.response()
        raise arvados.errors.KeepWriteError(
            "Write fail for %s: wanted %d but wrote %d" %
            (data_hash, want_copies, have_copies))
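    # Replication sketch: against individual Keep servers each writer thread
    # asks for one copy and the ThreadLimiter counts successes across
    # servers; through a proxy a single request carries
    # X-Keep-Desired-Replication and the proxy fans out the copies.
    # Hypothetical call:
    #
    #   locator = KeepClient().put("foo", copies=3)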
    @staticmethod
    def sign_for_old_server(data_hash, data):
        return (("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n"
                 "-----BEGIN PGP SIGNATURE-----\n\n"
                 "-----END PGP SIGNATURE-----\n"
                 % (int(time.time()), data_hash)) + data)
    @staticmethod
    def local_store_put(data):
        m = hashlib.new('md5')
        m.update(data)
        md5 = m.hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'),
                  os.path.join(os.environ['KEEP_LOCAL_STORE'], md5))
        return locator
    @staticmethod
    def local_store_get(locator):
        r = re.search('^([0-9a-f]{32,})', locator)
        if r is None:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % locator)
        if r.group(0) == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return ''
        with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f:
            return f.read()
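# Local-store test mode (a sketch; the directory path is arbitrary and must
# already exist):
#
#   os.environ['KEEP_LOCAL_STORE'] = '/tmp/keep_local'
#   loc = Keep.put("foo")   # writes /tmp/keep_local/acbd18db...
#   Keep.get(loc)           # reads it back with no network access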