3198: Async put, read prefetch via BlockManager. Added arvfile tests (forgot
[arvados.git] / sdk / python / arvados / keep.py
1 import gflags
2 import logging
3 import os
4 import pprint
5 import sys
6 import types
7 import subprocess
8 import json
9 import UserDict
10 import re
11 import hashlib
12 import string
13 import bz2
14 import zlib
15 import fcntl
16 import time
17 import threading
18 import timer
19 import datetime
20 import ssl
21 import socket
22 import requests
23
24 import arvados
25 import arvados.config as config
26 import arvados.errors
27 import arvados.retry as retry
28 import arvados.util
29
30 _logger = logging.getLogger('arvados.keep')
31 global_client_object = None
32
33 class KeepLocator(object):
34     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
35     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
36
37     def __init__(self, locator_str):
38         self.hints = []
39         self._perm_sig = None
40         self._perm_expiry = None
41         pieces = iter(locator_str.split('+'))
42         self.md5sum = next(pieces)
43         try:
44             self.size = int(next(pieces))
45         except StopIteration:
46             self.size = None
47         for hint in pieces:
48             if self.HINT_RE.match(hint) is None:
49                 raise ValueError("unrecognized hint data {}".format(hint))
50             elif hint.startswith('A'):
51                 self.parse_permission_hint(hint)
52             else:
53                 self.hints.append(hint)
54
55     def __str__(self):
56         return '+'.join(
57             str(s) for s in [self.md5sum, self.size,
58                              self.permission_hint()] + self.hints
59             if s is not None)
60
61     def _make_hex_prop(name, length):
62         # Build and return a new property with the given name that
63         # must be a hex string of the given length.
64         data_name = '_{}'.format(name)
65         def getter(self):
66             return getattr(self, data_name)
67         def setter(self, hex_str):
68             if not arvados.util.is_hex(hex_str, length):
69                 raise ValueError("{} must be a {}-digit hex string: {}".
70                                  format(name, length, hex_str))
71             setattr(self, data_name, hex_str)
72         return property(getter, setter)
73
74     md5sum = _make_hex_prop('md5sum', 32)
75     perm_sig = _make_hex_prop('perm_sig', 40)
76
77     @property
78     def perm_expiry(self):
79         return self._perm_expiry
80
81     @perm_expiry.setter
82     def perm_expiry(self, value):
83         if not arvados.util.is_hex(value, 1, 8):
84             raise ValueError(
85                 "permission timestamp must be a hex Unix timestamp: {}".
86                 format(value))
87         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
88
89     def permission_hint(self):
90         data = [self.perm_sig, self.perm_expiry]
91         if None in data:
92             return None
93         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
94         return "A{}@{:08x}".format(*data)
95
96     def parse_permission_hint(self, s):
97         try:
98             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
99         except IndexError:
100             raise ValueError("bad permission hint {}".format(s))
101
102     def permission_expired(self, as_of_dt=None):
103         if self.perm_expiry is None:
104             return False
105         elif as_of_dt is None:
106             as_of_dt = datetime.datetime.now()
107         return self.perm_expiry <= as_of_dt
108
109
110 class Keep(object):
111     """Simple interface to a global KeepClient object.
112
113     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
114     own API client.  The global KeepClient will build an API client from the
115     current Arvados configuration, which may not match the one you built.
116     """
117     _last_key = None
118
119     @classmethod
120     def global_client_object(cls):
121         global global_client_object
122         # Previously, KeepClient would change its behavior at runtime based
123         # on these configuration settings.  We simulate that behavior here
124         # by checking the values and returning a new KeepClient if any of
125         # them have changed.
126         key = (config.get('ARVADOS_API_HOST'),
127                config.get('ARVADOS_API_TOKEN'),
128                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
129                config.get('ARVADOS_KEEP_PROXY'),
130                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
131                os.environ.get('KEEP_LOCAL_STORE'))
132         if (global_client_object is None) or (cls._last_key != key):
133             global_client_object = KeepClient()
134             cls._last_key = key
135         return global_client_object
136
137     @staticmethod
138     def get(locator, **kwargs):
139         return Keep.global_client_object().get(locator, **kwargs)
140
141     @staticmethod
142     def put(data, **kwargs):
143         return Keep.global_client_object().put(data, **kwargs)
144
145 class KeepBlockCache(object):
146     # Default RAM cache is 256MiB
147     def __init__(self, cache_max=(256 * 1024 * 1024)):
148         self.cache_max = cache_max
149         self._cache = []
150         self._cache_lock = threading.Lock()
151
152     class CacheSlot(object):
153         def __init__(self, locator):
154             self.locator = locator
155             self.ready = threading.Event()
156             self.content = None
157
158         def get(self):
159             self.ready.wait()
160             return self.content
161
162         def set(self, value):
163             self.content = value
164             self.ready.set()
165
166         def size(self):
167             if self.content is None:
168                 return 0
169             else:
170                 return len(self.content)
171
172     def cap_cache(self):
173         '''Cap the cache size to self.cache_max'''
174         with self._cache_lock:
175             # Select all slots except those where ready.is_set() and content is
176             # None (that means there was an error reading the block).
177             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
178             sm = sum([slot.size() for slot in self._cache])
179             while len(self._cache) > 0 and sm > self.cache_max:
180                 for i in xrange(len(self._cache)-1, -1, -1):
181                     if self._cache[i].ready.is_set():
182                         del self._cache[i]
183                         break
184                 sm = sum([slot.size() for slot in self._cache])
185
186     def reserve_cache(self, locator):
187         '''Reserve a cache slot for the specified locator,
188         or return the existing slot.'''
189         with self._cache_lock:
190             # Test if the locator is already in the cache
191             for i in xrange(0, len(self._cache)):
192                 if self._cache[i].locator == locator:
193                     n = self._cache[i]
194                     if i != 0:
195                         # move it to the front
196                         del self._cache[i]
197                         self._cache.insert(0, n)
198                     return n, False
199
200             # Add a new cache slot for the locator
201             n = KeepBlockCache.CacheSlot(locator)
202             self._cache.insert(0, n)
203             return n, True
204
205 class KeepClient(object):
206
207     # Default Keep server connection timeout:  2 seconds
208     # Default Keep server read timeout:      300 seconds
209     # Default Keep proxy connection timeout:  20 seconds
210     # Default Keep proxy read timeout:       300 seconds
211     DEFAULT_TIMEOUT = (2, 300)
212     DEFAULT_PROXY_TIMEOUT = (20, 300)
213
214     class ThreadLimiter(object):
215         """
216         Limit the number of threads running at a given time to
217         {desired successes} minus {successes reported}. When successes
218         reported == desired, wake up the remaining threads and tell
219         them to quit.
220
221         Should be used in a "with" block.
222         """
223         def __init__(self, todo):
224             self._todo = todo
225             self._done = 0
226             self._response = None
227             self._todo_lock = threading.Semaphore(todo)
228             self._done_lock = threading.Lock()
229
230         def __enter__(self):
231             self._todo_lock.acquire()
232             return self
233
234         def __exit__(self, type, value, traceback):
235             self._todo_lock.release()
236
237         def shall_i_proceed(self):
238             """
239             Return true if the current thread should do stuff. Return
240             false if the current thread should just stop.
241             """
242             with self._done_lock:
243                 return (self._done < self._todo)
244
245         def save_response(self, response_body, replicas_stored):
246             """
247             Records a response body (a locator, possibly signed) returned by
248             the Keep server.  It is not necessary to save more than
249             one response, since we presume that any locator returned
250             in response to a successful request is valid.
251             """
252             with self._done_lock:
253                 self._done += replicas_stored
254                 self._response = response_body
255
256         def response(self):
257             """
258             Returns the body from the response to a PUT request.
259             """
260             with self._done_lock:
261                 return self._response
262
263         def done(self):
264             """
265             Return how many successes were reported.
266             """
267             with self._done_lock:
268                 return self._done
269
270
271     class KeepService(object):
272         # Make requests to a single Keep service, and track results.
273         HTTP_ERRORS = (requests.exceptions.RequestException,
274                        socket.error, ssl.SSLError)
275
276         def __init__(self, root, **headers):
277             self.root = root
278             self.last_result = None
279             self.success_flag = None
280             self.get_headers = {'Accept': 'application/octet-stream'}
281             self.get_headers.update(headers)
282             self.put_headers = headers
283
284         def usable(self):
285             return self.success_flag is not False
286
287         def finished(self):
288             return self.success_flag is not None
289
290         def last_status(self):
291             try:
292                 return self.last_result.status_code
293             except AttributeError:
294                 return None
295
296         def get(self, locator, timeout=None):
297             # locator is a KeepLocator object.
298             url = self.root + str(locator)
299             _logger.debug("Request: GET %s", url)
300             try:
301                 with timer.Timer() as t:
302                     result = requests.get(url.encode('utf-8'),
303                                           headers=self.get_headers,
304                                           timeout=timeout)
305             except self.HTTP_ERRORS as e:
306                 _logger.debug("Request fail: GET %s => %s: %s",
307                               url, type(e), str(e))
308                 self.last_result = e
309             else:
310                 self.last_result = result
311                 self.success_flag = retry.check_http_response_success(result)
312                 content = result.content
313                 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
314                              self.last_status(), len(content), t.msecs,
315                              (len(content)/(1024.0*1024))/t.secs)
316                 if self.success_flag:
317                     resp_md5 = hashlib.md5(content).hexdigest()
318                     if resp_md5 == locator.md5sum:
319                         return content
320                     _logger.warning("Checksum fail: md5(%s) = %s",
321                                     url, resp_md5)
322             return None
323
324         def put(self, hash_s, body, timeout=None):
325             url = self.root + hash_s
326             _logger.debug("Request: PUT %s", url)
327             try:
328                 result = requests.put(url.encode('utf-8'),
329                                       data=body,
330                                       headers=self.put_headers,
331                                       timeout=timeout)
332             except self.HTTP_ERRORS as e:
333                 _logger.debug("Request fail: PUT %s => %s: %s",
334                               url, type(e), str(e))
335                 self.last_result = e
336             else:
337                 self.last_result = result
338                 self.success_flag = retry.check_http_response_success(result)
339             return self.success_flag
340
341
342     class KeepWriterThread(threading.Thread):
343         """
344         Write a blob of data to the given Keep server. On success, call
345         save_response() of the given ThreadLimiter to save the returned
346         locator.
347         """
348         def __init__(self, keep_service, **kwargs):
349             super(KeepClient.KeepWriterThread, self).__init__()
350             self.service = keep_service
351             self.args = kwargs
352             self._success = False
353
354         def success(self):
355             return self._success
356
357         def run(self):
358             with self.args['thread_limiter'] as limiter:
359                 if not limiter.shall_i_proceed():
360                     # My turn arrived, but the job has been done without
361                     # me.
362                     return
363                 self.run_with_limiter(limiter)
364
365         def run_with_limiter(self, limiter):
366             if self.service.finished():
367                 return
368             _logger.debug("KeepWriterThread %s proceeding %s %s",
369                           str(threading.current_thread()),
370                           self.args['data_hash'],
371                           self.args['service_root'])
372             self._success = bool(self.service.put(
373                 self.args['data_hash'],
374                 self.args['data'],
375                 timeout=self.args.get('timeout', None)))
376             status = self.service.last_status()
377             if self._success:
378                 result = self.service.last_result
379                 _logger.debug("KeepWriterThread %s succeeded %s %s",
380                               str(threading.current_thread()),
381                               self.args['data_hash'],
382                               self.args['service_root'])
383                 # Tick the 'done' counter for the number of replica
384                 # reported stored by the server, for the case that
385                 # we're talking to a proxy or other backend that
386                 # stores to multiple copies for us.
387                 try:
388                     replicas_stored = int(result.headers['x-keep-replicas-stored'])
389                 except (KeyError, ValueError):
390                     replicas_stored = 1
391                 limiter.save_response(result.text.strip(), replicas_stored)
392             elif status is not None:
393                 _logger.debug("Request fail: PUT %s => %s %s",
394                               self.args['data_hash'], status,
395                               self.service.last_result.text)
396
397
398     def __init__(self, api_client=None, proxy=None,
399                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
400                  api_token=None, local_store=None, block_cache=None,
401                  num_retries=0):
402         """Initialize a new KeepClient.
403
404         Arguments:
405         * api_client: The API client to use to find Keep services.  If not
406           provided, KeepClient will build one from available Arvados
407           configuration.
408         * proxy: If specified, this KeepClient will send requests to this
409           Keep proxy.  Otherwise, KeepClient will fall back to the setting
410           of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
411           ensure KeepClient does not use a proxy, pass in an empty string.
412         * timeout: The timeout (in seconds) for HTTP requests to Keep
413           non-proxy servers.  A tuple of two floats is interpreted as
414           (connection_timeout, read_timeout): see
415           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
416           Default: (2, 300).
417         * proxy_timeout: The timeout (in seconds) for HTTP requests to
418           Keep proxies. A tuple of two floats is interpreted as
419           (connection_timeout, read_timeout). Default: (20, 300).
420         * api_token: If you're not using an API client, but only talking
421           directly to a Keep proxy, this parameter specifies an API token
422           to authenticate Keep requests.  It is an error to specify both
423           api_client and api_token.  If you specify neither, KeepClient
424           will use one available from the Arvados configuration.
425         * local_store: If specified, this KeepClient will bypass Keep
426           services, and save data to the named directory.  If unspecified,
427           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
428           environment variable.  If you want to ensure KeepClient does not
429           use local storage, pass in an empty string.  This is primarily
430           intended to mock a server for testing.
431         * num_retries: The default number of times to retry failed requests.
432           This will be used as the default num_retries value when get() and
433           put() are called.  Default 0.
434         """
435         self.lock = threading.Lock()
436         if proxy is None:
437             proxy = config.get('ARVADOS_KEEP_PROXY')
438         if api_token is None:
439             if api_client is None:
440                 api_token = config.get('ARVADOS_API_TOKEN')
441             else:
442                 api_token = api_client.api_token
443         elif api_client is not None:
444             raise ValueError(
445                 "can't build KeepClient with both API client and token")
446         if local_store is None:
447             local_store = os.environ.get('KEEP_LOCAL_STORE')
448
449         self.block_cache = block_cache if block_cache else KeepBlockCache()
450         self.timeout = timeout
451         self.proxy_timeout = proxy_timeout
452
453         if local_store:
454             self.local_store = local_store
455             self.get = self.local_store_get
456             self.put = self.local_store_put
457         else:
458             self.num_retries = num_retries
459             if proxy:
460                 if not proxy.endswith('/'):
461                     proxy += '/'
462                 self.api_token = api_token
463                 self._keep_services = [{
464                     'uuid': 'proxy',
465                     '_service_root': proxy,
466                     }]
467                 self.using_proxy = True
468                 self._static_services_list = True
469             else:
470                 # It's important to avoid instantiating an API client
471                 # unless we actually need one, for testing's sake.
472                 if api_client is None:
473                     api_client = arvados.api('v1')
474                 self.api_client = api_client
475                 self.api_token = api_client.api_token
476                 self._keep_services = None
477                 self.using_proxy = None
478                 self._static_services_list = False
479
480     def current_timeout(self):
481         """Return the appropriate timeout to use for this client: the proxy
482         timeout setting if the backend service is currently a proxy,
483         the regular timeout setting otherwise.
484         """
485         # TODO(twp): the timeout should be a property of a
486         # KeepService, not a KeepClient. See #4488.
487         return self.proxy_timeout if self.using_proxy else self.timeout
488
489     def build_services_list(self, force_rebuild=False):
490         if (self._static_services_list or
491               (self._keep_services and not force_rebuild)):
492             return
493         with self.lock:
494             try:
495                 keep_services = self.api_client.keep_services().accessible()
496             except Exception:  # API server predates Keep services.
497                 keep_services = self.api_client.keep_disks().list()
498
499             self._keep_services = keep_services.execute().get('items')
500             if not self._keep_services:
501                 raise arvados.errors.NoKeepServersError()
502
503             self.using_proxy = any(ks.get('service_type') == 'proxy'
504                                    for ks in self._keep_services)
505
506             # Precompute the base URI for each service.
507             for r in self._keep_services:
508                 r['_service_root'] = "{}://[{}]:{:d}/".format(
509                     'https' if r['service_ssl_flag'] else 'http',
510                     r['service_host'],
511                     r['service_port'])
512             _logger.debug(str(self._keep_services))
513
514     def _service_weight(self, data_hash, service_uuid):
515         """Compute the weight of a Keep service endpoint for a data
516         block with a known hash.
517
518         The weight is md5(h + u) where u is the last 15 characters of
519         the service endpoint's UUID.
520         """
521         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
522
523     def weighted_service_roots(self, data_hash, force_rebuild=False):
524         """Return an array of Keep service endpoints, in the order in
525         which they should be probed when reading or writing data with
526         the given hash.
527         """
528         self.build_services_list(force_rebuild)
529
530         # Sort the available services by weight (heaviest first) for
531         # this data_hash, and return their service_roots (base URIs)
532         # in that order.
533         sorted_roots = [
534             svc['_service_root'] for svc in sorted(
535                 self._keep_services,
536                 reverse=True,
537                 key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
538         _logger.debug(data_hash + ': ' + str(sorted_roots))
539         return sorted_roots
540
541     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
542         # roots_map is a dictionary, mapping Keep service root strings
543         # to KeepService objects.  Poll for Keep services, and add any
544         # new ones to roots_map.  Return the current list of local
545         # root strings.
546         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
547         local_roots = self.weighted_service_roots(md5_s, force_rebuild)
548         for root in local_roots:
549             if root not in roots_map:
550                 roots_map[root] = self.KeepService(root, **headers)
551         return local_roots
552
553     @staticmethod
554     def _check_loop_result(result):
555         # KeepClient RetryLoops should save results as a 2-tuple: the
556         # actual result of the request, and the number of servers available
557         # to receive the request this round.
558         # This method returns True if there's a real result, False if
559         # there are no more servers available, otherwise None.
560         if isinstance(result, Exception):
561             return None
562         result, tried_server_count = result
563         if (result is not None) and (result is not False):
564             return True
565         elif tried_server_count < 1:
566             _logger.info("No more Keep services to try; giving up")
567             return False
568         else:
569             return None
570
571     @retry.retry_method
572     def get(self, loc_s, num_retries=None):
573         """Get data from Keep.
574
575         This method fetches one or more blocks of data from Keep.  It
576         sends a request each Keep service registered with the API
577         server (or the proxy provided when this client was
578         instantiated), then each service named in location hints, in
579         sequence.  As soon as one service provides the data, it's
580         returned.
581
582         Arguments:
583         * loc_s: A string of one or more comma-separated locators to fetch.
584           This method returns the concatenation of these blocks.
585         * num_retries: The number of times to retry GET requests to
586           *each* Keep server if it returns temporary failures, with
587           exponential backoff.  Note that, in each loop, the method may try
588           to fetch data from every available Keep service, along with any
589           that are named in location hints in the locator.  The default value
590           is set when the KeepClient is initialized.
591         """
592         if ',' in loc_s:
593             return ''.join(self.get(x) for x in loc_s.split(','))
594         locator = KeepLocator(loc_s)
595         expect_hash = locator.md5sum
596
597         slot, first = self.block_cache.reserve_cache(expect_hash)
598         if not first:
599             v = slot.get()
600             return v
601
602         # See #3147 for a discussion of the loop implementation.  Highlights:
603         # * Refresh the list of Keep services after each failure, in case
604         #   it's being updated.
605         # * Retry until we succeed, we're out of retries, or every available
606         #   service has returned permanent failure.
607         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
608                       for hint in locator.hints if hint.startswith('K@')]
609         # Map root URLs their KeepService objects.
610         roots_map = {root: self.KeepService(root) for root in hint_roots}
611         blob = None
612         loop = retry.RetryLoop(num_retries, self._check_loop_result,
613                                backoff_start=2)
614         for tries_left in loop:
615             try:
616                 local_roots = self.map_new_services(
617                     roots_map, expect_hash,
618                     force_rebuild=(tries_left < num_retries))
619             except Exception as error:
620                 loop.save_result(error)
621                 continue
622
623             # Query KeepService objects that haven't returned
624             # permanent failure, in our specified shuffle order.
625             services_to_try = [roots_map[root]
626                                for root in (local_roots + hint_roots)
627                                if roots_map[root].usable()]
628             for keep_service in services_to_try:
629                 blob = keep_service.get(locator, timeout=self.current_timeout())
630                 if blob is not None:
631                     break
632             loop.save_result((blob, len(services_to_try)))
633
634         # Always cache the result, then return it if we succeeded.
635         slot.set(blob)
636         self.block_cache.cap_cache()
637         if loop.success():
638             return blob
639
640         # No servers fulfilled the request.  Count how many responded
641         # "not found;" if the ratio is high enough (currently 75%), report
642         # Not Found; otherwise a generic error.
643         # Q: Including 403 is necessary for the Keep tests to continue
644         # passing, but maybe they should expect KeepReadError instead?
645         not_founds = sum(1 for ks in roots_map.values()
646                          if ks.last_status() in set([403, 404, 410]))
647         if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
648             raise arvados.errors.NotFoundError(loc_s)
649         else:
650             raise arvados.errors.KeepReadError(loc_s)
651
652     @retry.retry_method
653     def put(self, data, copies=2, num_retries=None):
654         """Save data in Keep.
655
656         This method will get a list of Keep services from the API server, and
657         send the data to each one simultaneously in a new thread.  Once the
658         uploads are finished, if enough copies are saved, this method returns
659         the most recent HTTP response body.  If requests fail to upload
660         enough copies, this method raises KeepWriteError.
661
662         Arguments:
663         * data: The string of data to upload.
664         * copies: The number of copies that the user requires be saved.
665           Default 2.
666         * num_retries: The number of times to retry PUT requests to
667           *each* Keep server if it returns temporary failures, with
668           exponential backoff.  The default value is set when the
669           KeepClient is initialized.
670         """
671         data_hash = hashlib.md5(data).hexdigest()
672         if copies < 1:
673             return data_hash
674
675         headers = {}
676         if self.using_proxy:
677             # Tell the proxy how many copies we want it to store
678             headers['X-Keep-Desired-Replication'] = str(copies)
679         roots_map = {}
680         thread_limiter = KeepClient.ThreadLimiter(copies)
681         loop = retry.RetryLoop(num_retries, self._check_loop_result,
682                                backoff_start=2)
683         for tries_left in loop:
684             try:
685                 local_roots = self.map_new_services(
686                     roots_map, data_hash,
687                     force_rebuild=(tries_left < num_retries), **headers)
688             except Exception as error:
689                 loop.save_result(error)
690                 continue
691
692             threads = []
693             for service_root, ks in roots_map.iteritems():
694                 if ks.finished():
695                     continue
696                 t = KeepClient.KeepWriterThread(
697                     ks,
698                     data=data,
699                     data_hash=data_hash,
700                     service_root=service_root,
701                     thread_limiter=thread_limiter,
702                     timeout=self.current_timeout())
703                 t.start()
704                 threads.append(t)
705             for t in threads:
706                 t.join()
707             loop.save_result((thread_limiter.done() >= copies, len(threads)))
708
709         if loop.success():
710             return thread_limiter.response()
711         raise arvados.errors.KeepWriteError(
712             "Write fail for %s: wanted %d but wrote %d" %
713             (data_hash, copies, thread_limiter.done()))
714
715     # Local storage methods need no-op num_retries arguments to keep
716     # integration tests happy.  With better isolation they could
717     # probably be removed again.
718     def local_store_put(self, data, num_retries=0):
719         md5 = hashlib.md5(data).hexdigest()
720         locator = '%s+%d' % (md5, len(data))
721         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
722             f.write(data)
723         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
724                   os.path.join(self.local_store, md5))
725         return locator
726
727     def local_store_get(self, loc_s, num_retries=0):
728         try:
729             locator = KeepLocator(loc_s)
730         except ValueError:
731             raise arvados.errors.NotFoundError(
732                 "Invalid data locator: '%s'" % loc_s)
733         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
734             return ''
735         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
736             return f.read()